当前位置 : 首页 » 文章分类 :  开发  »  Spring-Context-Scheduling

Spring-Context-Scheduling

springframework.scheduling 包相关笔记
所属模块:spring-context


ThreadPoolTaskExecutor线程池

Spring 中的 ThreadPoolTaskExecutor 是借助于JDK并发包中的 java.util.concurrent.ThreadPoolExecutor 来实现的。
建议在 Spring 项目中使用 ThreadPoolTaskExecutor , 对 ThreadPoolExecutor 做了很多封装,使用简单。

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {}

我们可以看到他继承了 ExecutorConfigurationSupport 类, 此类实现了 BeanNameAware, InitializingBean, DisposableBean 这三个接口主要是做一些初始化和销毁资源处理操作, 给 ThreadPoolTaskExecutor 进行自动初始化和销毁赋能。

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean{}

初始化过程分析

由于 ThreadPoolTaskExecutor 的父类 ExecutorConfigurationSupport 实现了 InitializingBean 接口,所以只要重写其中的 afterPropertiesSet() 方法即可定制初始化动作。

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean {
  @Override
  public void afterPropertiesSet() {
    initialize();
  }

  public void initialize() {
        if (logger.isInfoEnabled()) {
            logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (!this.threadNamePrefixSet && this.beanName != null) {
            setThreadNamePrefix(this.beanName + "-");
        }
        this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

  protected abstract ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
}

最终通过抽象方法 initializeExecutor 来初始化 executor, 在 ThreadPoolTaskExecutor 中的实现:

@Override
protected ExecutorService initializeExecutor(
    ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

  BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

  ThreadPoolExecutor executor;
  if (this.taskDecorator != null) {
    executor = new ThreadPoolExecutor(
        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
        queue, threadFactory, rejectedExecutionHandler) {
      @Override
      public void execute(Runnable command) {
        super.execute(taskDecorator.decorate(command));
      }
    };
  }
  else {
    executor = new ThreadPoolExecutor(
        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
        queue, threadFactory, rejectedExecutionHandler);

  }

  if (this.allowCoreThreadTimeOut) {
    executor.allowCoreThreadTimeOut(true);
  }

  this.threadPoolExecutor = executor;
  return executor;
}

可以看到就是用参数 new 一个 ThreadPoolExecutor

销毁过程分析

由于 ThreadPoolTaskExecutor 的父类 ExecutorConfigurationSupport 实现了 DisposableBean 接口,所以只要重写其中的 destroy() 方法即可定制销毁动作。

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean {
    @Override
      public void destroy() {
          shutdown();
      }

    public void shutdown() {
          if (logger.isInfoEnabled()) {
              logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
          }
          if (this.executor != null) {
              if (this.waitForTasksToCompleteOnShutdown) {
                  this.executor.shutdown();
              }
              else {
                  this.executor.shutdownNow();
              }
              awaitTerminationIfNecessary(this.executor);
          }
      }
}

主要是用了 ExecutorService 的 shutdown 做一些资源的处理

Spring框架中对线程池的使用和封装ThreadPoolTaskExecutor
https://my.oschina.net/kipeng/blog/1795538


使用

最常用方式就是做为 bean 注入到容器中。
线程池 config, 暴露 bean 到 spring 上下文中:

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
  ThreadPoolTaskExecutor poolExecutor = new ThreadPoolTaskExecutor();
  // 核心线程数
  poolExecutor.setCorePoolSize(5);
  // 最大线程数
  poolExecutor.setMaxPoolSize(15);
  // 队列大小
  poolExecutor.setQueueCapacity(100);
  // 线程最大空闲时间
  poolExecutor.setKeepAliveSeconds(300);
  // 拒绝策略
  poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  // 线程名称前缀
  poolExecutor.setThreadNamePrefix("my-pool-");

  return poolExecutor;
}

新任务提交时:若当前线运行的程数量小于核心线程数,则创建一条新线程;
若已经超过核心线程数,则先放入队列中; 队列满后,创建新线程;
当线程总数等于最大线程数时,则执行拒绝策略

使用

@Autowired
private ThreadPoolTaskExecutor taskExecutor;

public void testExecutor(final String str) {
  taskExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + "--" + str));
}

SchedulingConfigurer

Spring 中,创建定时任务除了使用 @Scheduled 注解外,还可以使用 SchedulingConfigurer
@Schedule 注解有一个缺点,其定时的时间不能动态的改变,而基于 SchedulingConfigurer 接口的方式可以做到。

SchedulingConfigurer 源码

只有一个方法,用于添加定时任务

package org.springframework.scheduling.annotation;

import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@FunctionalInterface
public interface SchedulingConfigurer {
    void configureTasks(ScheduledTaskRegistrar taskRegistrar);
}

addFixedRateTask

addFixedDelayTask

addCronTask

addTriggerTask


@Scheduled 定时任务

Spring提供了两种任务调度方法:
1、同步任务,@Scheduled,通过 @EnableScheduling 启用
2、异步任务,@Async,通过 @EnableAsync 启用


cron 表达式

cron 表达式,指定任务在特定时间执行;
@Scheduled(cron="*/5 * * * * *") 通过cron表达式定义规则

每隔 5s 就会来询问是否可以执行下一个任务,如果前一个任务没有执行完成,则后面的任务需要继续等待 5s 后再来问,看看是否可以执行。

fixedDelay/fixedDelayString

fixedDelay 表示上一次任务执行完成后多久再次执行,参数类型为 long,单位 ms
fixedDelayString 与 fixedDelay 含义一样,只是参数类型变为 String

@Scheduled(fixedDelay = 60 * 60 * 1000L, initialDelay = 1 * 60 * 1000L) 固定 60 分钟执行一次,第一次 1 分钟后触发
@Scheduled(fixedDelay = 5000) 上一次执行完毕时间点之后5秒再执行

fixedRate/fixedRateString

fixedRate 表示按一定的频率执行任务,参数类型为 long,单位 ms
fixedRateString 与 fixedRate 的含义一样,只是将参数类型变为 String

如果前一个任务执行时间(这个时间是累计的)超过执行周期,则后一个任务在前一个任务完成后立即执行,否则等待到指定周期时刻执行

@Scheduled(fixedRate = 5000) 上一次开始执行时间点之后5秒再执行
@Scheduled(initialDelay=1000, fixedRate=5000) 第一次延迟1秒后执行,之后按fixedRate的规则每5秒执行一次

initialDelay 表示延迟多久再第一次执行任务,参数类型为 long,单位ms
initialDelayString 与 initialDelay 的含义一样,只是将参数类型变为 String
这两个参数都要配合 fixedDelayfixedRate 一起使用。

@Component
public class ScheduledTasks {
    public final static long ONE_DAY = 24 * 60 * 60 * 1000;
    public final static long ONE_HOUR =  60 * 60 * 1000;

    @Scheduled(fixedRate = ONE_DAY)
    public void scheduledTask() {
       System.out.println(" 我是一个每隔一天就会执行一次的调度任务");
    }

    @Scheduled(fixedDelay = ONE_HOURS)
    public void scheduleTask2() {
       System.out.println(" 我是一个执行完后,隔一小时就会执行的任务");
    }

    @Scheduled(initialDelay=1000, fixedRate=5000)
    public void doSomething() {
       System.out.println("我是一个第一次延迟1秒执行,之后每5秒执行一次的任务");
    }

    @Scheduled(cron = "0 0/1 * * * ? ")
    public void ScheduledTask3() {
       System.out.println(" 我是一个每隔一分钟就就会执行的任务");
    }
}

@Scheduled 定时任务停止执行排查

spring @Scheduled原理解析
https://juejin.im/post/6844903924936212494


@EnableScheduling 启用定时任务

在Spring Boot的主类中加入 @EnableScheduling 注解,启用定时任务的配置
@EnableScheduling 注解的作用是发现注解 @Scheduled 的任务并由后台执行。没有它的话将无法执行定时任务。

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

读取配置文件中的cron时间配置

第二种在你的类前面添加

@PropertySource("classpath:root/test.props")
@Scheduled(cron="${jobs.schedule}")

最后 test.props 添加 jobs.schedule = 0/5 * * * * ?

cron 表达式从左到右依次为:
second
minute
hour
day of month
month
day of week

Spring使用@Scheduled进行定时任务,定的时间可否变?
https://www.oschina.net/question/578674_84037


避免集群环境下任务重复调度

当 @Scheduled 定时任务部署在集群环境下时,会出现任务多次被调度执行的情况,因为 @Scheduled 只是个轻量级的本地定时任务,本身不提供集群调度功能。
为了避免集群环境下任务重复调度,可以采用如下方法;
1、调度方法加redis分布式锁,防止同一个任务被多次调度。
2、改用Quartz定时任务框架,Quartz提供基于数据库持久化的集群调度功能。

如何用Spring实现集群环境下的定时任务
https://blog.csdn.net/caomiao2006/article/details/52750569


根据条件启用定时任务

@Service
@ConditionalOnProperty("yourConditionPropery")
public class SchedulingService {

  @Scheduled
  public void task1() {...}

  @Scheduled
  public void task2() {...}

}

一次@Scheduled任务未执行问题排查

背景:
springboot 项目中配了好几个 @Scheduled 定时任务,其中有个 fixedDelay 自动登录保持 session 有效的任务,在某次执行后就没再执行过。

@Scheduled(fixedDelay = 10 * 60 * 1000L)
private String login() {
    ...
}

排查方向:
1、因为是 fixedDelay 任务,上一次任务结束后才会有下一次调用,先看下是否上一次任务卡住了没返回。
发现每次 login 都马上返回了。

2、看下项目中其他 @Scheduled 定时任务是否还在不断触发,主要是排查下定时任务线程池是否还在工作。
排查发现另外一个 10 分钟执行一次的定时任务也停了,继续排查发现有个通过 SchedulingConfigurer 配置的定时任务跑了几个小时,一直到当前还在执行,自从这个定时任务开始,所有其他定时任务都不执行了。

原因:
spring 中的定时任务,不论是通过 @Scheduled 注解开启的,还是通过 SchedulingConfigurer 代码配置的,都是放到同一个定时任务调度线程池中执行的,这个线程池默认是单线程的,源码在 org.springframework.scheduling.config.ScheduledTaskRegistrar 中:

protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

即如果不指定 taskScheduler 默认使用 Executors.newSingleThreadScheduledExecutor() 是单线程的线程池,且拒绝策略是默认的 AbortPolicy 拒绝执行。
所以如果某个定时任务出现死循环一直跑,则所有其他定时任务都无法执行了。

解决:
配置 spring 定时任务线程池的线程个数,改为多线程。

@Scheduled多线程

spring 中的定时任务线程池默认是单线程的,定时任务触发时如果当前有其他定时任务在跑,定时任务线程池就会抛出 RejectedExecutionException 异常导致任务无法执行
可以实现 SchedulingConfigurer 个性化配置定时任务线程池,设置 taskScheduler 为多线程线程池。

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //设定一个长度10的定时任务线程池
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }
}

@Scheduled源码

package org.springframework.scheduling.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
    String cron() default "";

    String zone() default "";

    long fixedDelay() default -1L;

    String fixedDelayString() default "";

    long fixedRate() default -1L;

    String fixedRateString() default "";

    long initialDelay() default -1L;

    String initialDelayString() default "";
}

@Async 异步方法

@EnableAsync启用异步方法

有时候我们会调用一些特殊的任务,任务会比较耗时,重要的是,我们不管他返回的后果。这时候我们就需要用这类的异步任务啦,调用后就让他去跑,不堵塞主线程,我们继续干别的。

在Spring Boot中,我们只需要通过使用@Async注解就能简单的将原来的同步方法变为异步方法。

@Async 标注的方法将在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。

为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@Async异步方法

public void AsyncTask(){
    @Async
    public void doSomeHeavyBackgroundTask(int sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Async
    public Future<String> doSomeHeavyBackgroundTask() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void printLog() {
        System.out.println("  i print a log  ,time=" + System.currentTimeMillis());
    }
 }

测试类:

@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@ContextConfiguration(classes = AsycnTaskConfig.class) //要声明@EnableASync
public class AsyncTaskTest {
    @Autowired
    AsyncTask asyncTask;

    @Test
    public void AsyncTaskTest() throws InterruptedException {
        if (asyncTask != null) {
            asyncTask.doSomeHeavyBackgroundTask(4000);
            asyncTask.printLog();
            Thread.sleep(5000);
        }
    }
}

@Async 所修饰的函数不要定义为static类型,这样异步调用不会生效

Spring Boot中使用@Async实现异步调用
http://blog.didispace.com/springbootasync/

Spring的两种任务调度Scheduled和Async
https://sanjay-f.github.io/2015/08/24/Spring%E7%9A%84%E4%B8%A4%E7%A7%8D%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6Scheduled%E5%92%8CAsync/


异步异常处理

Spring的两种任务调度Scheduled和Async
https://sanjay-f.github.io/2015/08/24/Spring%E7%9A%84%E4%B8%A4%E7%A7%8D%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6Scheduled%E5%92%8CAsync/

springboot异步调用@Async
https://segmentfault.com/a/1190000010142962


上一篇 Java-Bean Validation

下一篇 Spring-MyBatis

阅读
评论
2,792
阅读预计12分钟
创建日期 2018-06-28
修改日期 2020-09-13
类别

页面信息

location:
protocol:
host:
hostname:
origin:
pathname:
href:
document:
referrer:
navigator:
platform:
userAgent:

评论