定时任务,不陌生。
一、JDK 原生
java.util.Timer
1. 提供了API 用来提交任务
2.使用while 循环来执行 任务
3. 任务保存在数组中,如果没有任务执行,则 wait
java.util.TimerTask
一个抽象 线程类,用来保证 任务
示例:
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.printf("TimerTask exe:"+Thread.currentThread().getName());
}
}, 0, 10 * 1000); // 单位是毫秒
二. Spring 提供的 定时任务
只需要在 方法上加上
@Scheduled
注解,就OK了。 这个用起来非常便捷。
org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration
在spring boot 项目启动时,这个类会 创建 必要的bean。
org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
会创建一个 ScheduledExecutorService
org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor
后置处理器:
1. 处理 Scheduled 注解的方法
2. 使用 ScheduledExecutorService 执行 task
注意:
线程池默认是 1 个线程,如果要增加线程池个数,可以用配置:
spring.task.scheduling.pool.size
根据自动配置类中
@Bean
@ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
return builder.build();
}
也可以自定义定时执行器:
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(2);
return taskScheduler;
}
三. xxl-job
https://github.com/xuxueli/xxl-job.git/
1. xxl-job 有一个控制台,需要部署之后,用来注册Job。 需要使用到 mysql 数据库
2. 具体的Job 方法写完之后,spring boot项目 使用
@XxlJob
注解标识这个方法,就OK了,也不复杂。
1. 控制台部署
xxljob 依赖 数据库 保存 job的数据信息,所以 首先需要 建库建表。所需要的建表语句 再源码包中已经提供。
数据库配置完成后,直接 部署 控制服务就可以了。就是一个 spring boot 的应用。
默认用户名密码: admin/123456
2. 应用配置
1. 引入依赖包:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
2. 添加 注册器bean
首先,将xxljob需要用到的配置 使用配置化的方式进行配置,方便修改
xxl:
job:
accessToken: default_token
admin:
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
address: ''
appname: xxl-job-executor-sample
ip: ''
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
port: 9999
注册xxljob Bean
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XxlJobSpringExecutor 是一个 SmartInitializingSingleton 实现类,实例化后执行 afterSingletonsInstantiated方法。
1. 在spring容器中找到所有 标准 @XxlJob注解的方法
public @interface XxlJob {
/**
* jobhandler name
*/
String value();
/**
* init handler, invoked when JobThread init
*/
String init() default "";
/**
* destroy handler, invoked when JobThread destroy
*/
String destroy() default "";
}
value 就是 执行这个xxljob 的 处理器的名字。
如果value为空,会报错。 如果 value重复,也会报错。
注册 value 和 com.xxl.job.core.handler.impl.MethodJobHandler 的映射关系。执行时就是调用这个方法
@Override
public ReturnT<String> execute(String param) throws Exception {
return (ReturnT<String>) method.invoke(target, new Object[]{param});
}
2. com.xxl.job.core.executor.XxlJobExecutor#start
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
初始化日志目录,如果不配置,则默认是logBasePath = "/data/applogs/xxl-job/jobhandler";
初始化客户端,这里会注册 xxljob应用端,并提供回调。
需要 xxljob控制台进行通信的,所以需要xxljob控制台地址 和 token
public class AdminBizClient implements AdminBiz {
public AdminBizClient() {
}
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
private int timeout = 3;
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
}
启动日志文件自动清理线程,必须超过 3 才启动,否则 不启动日志文件清理线程
public void start(final long logRetentionDays){
// limit min value
if (logRetentionDays < 3 ) {
return;
}
启动callback线程,应该是将执行结果 通知到 xxljob控制台
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
启动客户端 的http端口,用来xxljob控制台 进行xxljob任务调度 调的地址
涉及到 address ip port appName accessToken
确认客户端端口:如果传了端口,就用传入的端口,否则 自动生成,默认是 9999
port = port>0?port: NetUtil.findAvailablePort(9999);
确认客户端IP:如果传了IP,则用传入的,否则 自动获取
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
确认客户端ip地址:
首先,如果 address 参数不为空,则使用 address (需要是http://ip:port 或者 http://域名:port) 的形式, 如果 为空,则使用 http:ip(上面解析的ip):port(上面解析的port)
所以, address 参数 和 ip、port 参数 ,用来提供 一个 可被调用的 地址,完成 xxljob 任务的调用
启动server,使用的是 Netty
ChannelFuture future = bootstrap.bind(port).sync();
这时,server 已启动
进行注册, 将 appName 和 address 注册到 xxljob 控制台
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
如图:
总结来说
必须配置:
1. 需要配置 xxljob 控制台 地址 和 token
2. 需要配置 定时任务的 执行器
配置完后,可以开始编写任务:
@XxlJob("doLoadJob")
public void doLoadJob(){
System.out.printf("doLoadJob");
}
功能点:
1. 服务启动后,自动向 控制台注册自己的 地址
2. 控制台注册 任务时,可以选择一种路由策略: 分片广播
分片广播的原理:
拿到 注册的所有的 服务IP,遍历执行 http请求。
对于分片广播,需要 再编写 Job时,完成 分片处理。xxlJob 不会给你的任务做分片,它只是 触发每台服务执行一下任务而已。
对于分片方法,可以自己定义:
任务数%机器数 == 当前机器编号 时,执行
3. 服务接收到任务请求后,会把任务 放在队列中,由 其他的线程异步执行。
四.Quartz
官网:
http://www.quartz-scheduler.org/
在编写代码时,稍微复杂一下,需要实现Job接口,创建 JobDetail、Trigger、Scheduler 等来协助完成调度你的任务。
五、Elastic-Job
需要用到 zookeeper来协助完成。目前为止没有用过。