Job Scheduling简介

定时任务,不陌生。

一、JDK 原生

java.util.Timer

1. 提供了API 用来提交任务

2.使用while 循环来执行 任务

3. 任务保存在数组中,如果没有任务执行,则 wait

java.util.TimerTask

一个抽象 线程类,用来保证 任务

示例:

Timer timer = new Timer();

timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.printf("TimerTask exe:"+Thread.currentThread().getName());
            }
        }, 0, 10 * 1000);  // 单位是毫秒

二.  Spring 提供的 定时任务

只需要在 方法上加上 

@Scheduled

注解,就OK了。 这个用起来非常便捷。

org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration

在spring boot 项目启动时,这个类会 创建 必要的bean。

org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler

会创建一个   ScheduledExecutorService 

org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor

后置处理器:

1.  处理  Scheduled 注解的方法

2. 使用 ScheduledExecutorService 执行 task

注意:

线程池默认是 1 个线程,如果要增加线程池个数,可以用配置:

spring.task.scheduling.pool.size

根据自动配置类中
@Bean
	@ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
	public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
		return builder.build();
	}

也可以自定义定时执行器:

@Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(2);
        return taskScheduler;
    }

三.  xxl-job


https://github.com/xuxueli/xxl-job.git/
 

1. xxl-job 有一个控制台,需要部署之后,用来注册Job。 需要使用到 mysql 数据库

2. 具体的Job 方法写完之后,spring boot项目 使用 

@XxlJob

注解标识这个方法,就OK了,也不复杂。

1.  控制台部署

xxljob 依赖 数据库 保存 job的数据信息,所以 首先需要 建库建表。所需要的建表语句 再源码包中已经提供。

数据库配置完成后,直接 部署 控制服务就可以了。就是一个 spring boot 的应用。

默认用户名密码: admin/123456

2. 应用配置

1. 引入依赖包:

        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
        </dependency>

2. 添加 注册器bean

首先,将xxljob需要用到的配置 使用配置化的方式进行配置,方便修改

xxl:
  job:
    accessToken: default_token
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
    executor:
      address: ''
      appname: xxl-job-executor-sample
      ip: ''
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
      port: 9999

注册xxljob Bean


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

XxlJobSpringExecutor 是一个 SmartInitializingSingleton 实现类,实例化后执行 afterSingletonsInstantiated方法。

1. 在spring容器中找到所有 标准 @XxlJob注解的方法

public @interface XxlJob {

    /**
     * jobhandler name
     */
    String value();

    /**
     * init handler, invoked when JobThread init
     */
    String init() default "";

    /**
     * destroy handler, invoked when JobThread destroy
     */
    String destroy() default "";

}

value 就是 执行这个xxljob 的 处理器的名字。

如果value为空,会报错。 如果 value重复,也会报错。

注册 value 和 com.xxl.job.core.handler.impl.MethodJobHandler 的映射关系。执行时就是调用这个方法

 @Override
    public ReturnT<String> execute(String param) throws Exception {
        return (ReturnT<String>) method.invoke(target, new Object[]{param});
    }

2. com.xxl.job.core.executor.XxlJobExecutor#start

    public void start() throws Exception {

        // init logpath
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
    }

初始化日志目录,如果不配置,则默认是logBasePath = "/data/applogs/xxl-job/jobhandler";

初始化客户端,这里会注册 xxljob应用端,并提供回调。

需要 xxljob控制台进行通信的,所以需要xxljob控制台地址 和 token

public class AdminBizClient implements AdminBiz {

    public AdminBizClient() {
    }
    public AdminBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }

}

启动日志文件自动清理线程,必须超过 3 才启动,否则 不启动日志文件清理线程

public void start(final long logRetentionDays){

        // limit min value
        if (logRetentionDays < 3 ) {
            return;
        }

启动callback线程,应该是将执行结果 通知到 xxljob控制台

 public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

启动客户端 的http端口,用来xxljob控制台 进行xxljob任务调度 调的地址

涉及到 address ip port appName accessToken

确认客户端端口:如果传了端口,就用传入的端口,否则 自动生成,默认是 9999

 port = port>0?port: NetUtil.findAvailablePort(9999);

 确认客户端IP:如果传了IP,则用传入的,否则 自动获取

 ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

确认客户端ip地址:

首先,如果 address  参数不为空,则使用 address (需要是http://ip:port 或者 http://域名:port) 的形式, 如果 为空,则使用  http:ip(上面解析的ip):port(上面解析的port)

所以, address 参数 和  ip、port 参数 ,用来提供 一个 可被调用的 地址,完成 xxljob 任务的调用

启动server,使用的是 Netty

ChannelFuture future = bootstrap.bind(port).sync();

这时,server 已启动

进行注册, 将 appName 和 address 注册到 xxljob 控制台

RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);

@Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

如图:

总结来说

必须配置:

1.  需要配置 xxljob 控制台 地址 和 token

2. 需要配置 定时任务的 执行器

配置完后,可以开始编写任务:

    @XxlJob("doLoadJob")
    public void doLoadJob(){

        System.out.printf("doLoadJob");

    }

功能点:

1. 服务启动后,自动向 控制台注册自己的 地址

2. 控制台注册 任务时,可以选择一种路由策略: 分片广播

分片广播的原理:

拿到 注册的所有的 服务IP,遍历执行 http请求。

对于分片广播,需要 再编写 Job时,完成 分片处理。xxlJob 不会给你的任务做分片,它只是 触发每台服务执行一下任务而已。

对于分片方法,可以自己定义:

任务数%机器数 == 当前机器编号 时,执行

3. 服务接收到任务请求后,会把任务 放在队列中,由 其他的线程异步执行。

四.Quartz

官网: 

http://www.quartz-scheduler.org/

在编写代码时,稍微复杂一下,需要实现Job接口,创建 JobDetail、Trigger、Scheduler 等来协助完成调度你的任务。

五、Elastic-Job 

需要用到 zookeeper来协助完成。目前为止没有用过。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值