SpringBoot项目使用多线程实现动态定时任务


BaseTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13

@Getter
@Setter
@DynamicUpdate
@DynamicInsert
@MappedSuperclass
@EntityListeners(AuditingEntityListener.class)
public abstract class BaseTask extends BaseEntity {

private String cron;

}

MyTask.java

实体类(根据项目需求自定义)

1
2
3
4
5
6
7
@Getter
@Setter
@Entity
@Table(name = "my_task")
public class MyTask extends BaseTask {
...
}

SchedulerThreadPoolConfig.java

定时任务线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class SchedulerThreadPoolConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 定时任务执行线程池核心线程数
taskScheduler.setPoolSize(10);
// 设置任务注册器的调度器
taskScheduler.setRemoveOnCancelPolicy(true);
// 设置线程名称前缀
taskScheduler.setThreadNamePrefix("SchedulerThreadPool-");
return taskScheduler;
}
}

ScheduledTask.java

ScheduledFuture包装类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class ScheduledTask {

volatile ScheduledFuture<?> future;

/**
* 取消定时任务
*/
public void cancel() {
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
}
}
}

SchedulingRunnable.java

Runnable接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TaskSchedulingRunnable implements Runnable {

private final BaseTask task;

private final BaseTaskService<? extends BaseTask> taskService;

public TaskSchedulingRunnable(BaseTask task, BaseTaskService<? extends BaseTask> taskService) {
this.task = task;
this.taskService = taskService;
}

@Override
public void run() {
//执行任务
taskService.execute(task);
}
}

CronTaskRegistrar.java

定时任务注册类,用于增加、删除定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

@Component
public class CronTaskRegistrar implements DisposableBean {

private final Map<Integer, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);

@Autowired
private TaskScheduler taskScheduler;

public TaskScheduler getScheduler() {
return this.taskScheduler;
}

public void addCronTask(Integer cronId,Runnable task, String cronExpression) {
addCronTask(cronId,new CronTask(task, cronExpression));
}

public void addCronTask(Integer cronId,CronTask cronTask) {
if (cronTask != null) {
if (this.scheduledTasks.containsKey(cronId)) {
removeCronTask(cronId);
}
this.scheduledTasks.put(cronId, scheduleCronTask(cronTask));
}
}

public void removeCronTask(Integer cronId) {
ScheduledTask scheduledTask = this.scheduledTasks.remove(cronId);
if (scheduledTask != null) {
scheduledTask.cancel();
}
}

public ScheduledTask scheduleCronTask(CronTask cronTask) {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}


@Override
public void destroy() {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
}

ScheduleConfig.java

实现CommandLineRunner,SpringBoot项目启动时自动执行run方法(从cron表中取出符合条件的数据加入定时任务线程池)

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class ScheduleConfig implements CommandLineRunner {

@Autowired
private MyTaskService taskService;

@Override
public void run(String... args) throws Exception {
taskService.run();
}
}

MyTaskRepository

Base类前往SpringDataJPA之Base抽象类+范型快速实现增删查改

1
2
3
public interface MyTaskRepository extends BaseRepository<MyTask> {
}

BaseTaskService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public abstract class BaseTaskService<Task extends BaseTask>
extends BaseService<Task>
{

private final BaseRepository<Task> repository;
private final CronTaskRegistrar cronTaskRegistrar;

protected BaseTaskService(BaseRepository<Task> repository, CronTaskRegistrar cronTaskRegistrar) {
super(repository);
this.repository = repository;
this.cronTaskRegistrar = cronTaskRegistrar;
}

public List<Task> findAll() {
return repository.findAll();
}
// 任务内容
public void execute(BaseTask task) {
System.out.println(task.toString());
}

public void run() {
List<Task> Tasks = findAll();
for (Task task : Tasks) {
TaskSchedulingRunnable taskSchedulingRunnable = new TaskSchedulingRunnable(task, this);
cronTaskRegistrar.addCronTask(task.getId(), taskSchedulingRunnable, task.getCron());
}
}
}

MyTaskService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class MyTaskService extends BaseTaskService<MyTask> {

protected MyTaskService(MyTaskRepository repository, CronTaskRegistrar cronTaskRegistrar) {
super(repository, cronTaskRegistrar);
}

@Override
public void execute(BaseTask myTask) {
System.out.println(myTask.toString());
}
}


← Prev Mac环境下Docker拉取Nginx镜像部署到无网络服务器 | 布隆过滤器详解(开发中) Next →