使用mysql实现定时任务队列

使用mysql实现定时任务队列


1、首先创建一个叫MysqlQueue的类封装对mysql队列的操作。这个类其实比较简单,提供了take方法和pull方法来加载以及释放mysql中的记录。

@Component
public class MysqlQueue {
    @Autowired
    private DataSetService dataSetService;
    @Autowired
    private SyncLogService syncLogService;

    @Transactional
    public DatasetTask take() {
        //加载mysql定时队列的任务
        QueryWrapper queryWrapper = new QueryWrapper();
        queryWrapper.in("sync_state", "等待运行的状态");
        //根据sync_next_time进行排序,获取最近要执行的任务
        queryWrapper.orderBy(true, true, "sync_next_time");
        //使用排它锁,确保不会被其他加锁线程拿到
        queryWrapper.last("limit 1 for update");
        DhDataSet one = dataSetService.findOne(queryWrapper);
        if (one == null)
            return null;
        Integer oldState = one.getSyncState();
        one.setSyncState("就绪状态");

        //更新并释放行级锁
        dataSetService.updateById(one);
        //把数据库中的记录封装成task
        return new DatasetTask(one, dataSetService, oldState, dbDatasetSyncLog.getVersion(), dbDatasetSyncLog.getTimingTime(), syncLogService);
    }

    //把任务放回到MySQL队列(实际是修改记录的执行状态为等待状态)
    public void pull(DatasetTask task) {
        task.pull();
    }
}

接下来看看封装的Task任务类,这个类实现Runnable接口。run方法就是线程池里要执行的业务方法。getDelay方法用于获取任务的延时时间,schedule方法则是在任务执行完以后,重新计算下一次的执行时间。

@Slf4j
@Getter
public class DatasetTask implements Runnable {
    private DataSetService dataSetService;
    private DhDataSet dhDataSet;
    private Integer oldState;

    public DatasetTask(DhDataSet dhDataSet, DataSetService dataSetService, Integer oldState) {
        this.dhDataSet = dhDataSet;
        this.dataSetService = dataSetService;
        this.oldState = oldState;
    }

    @Override
    public void run() {
        try {
        //执行定时任务
            dhDataSet = dataSetService.excute(dhDataSet.getResourceId());
        } catch (Exception e) {
            log.error("同步定时任务失败{}:{}", dhDataSet.getResourceId(), e.getMessage(), e);
        }
        schedule(dhDataSet);
    }


    /**
     * 计算下次执行时间
     *
     * @param dhDataSet
     */
    protected void schedule(DhDataSet dhDataSet) {
        //拿到spring的corn表达式用于计算下次执行时间
        String corn = dhDataSet.getSyncCorn();
        if (Objects.equals(corn, CornUtil.PAUSE)) {
            dhDataSet.setSyncState(SyncState.stop.getCode());
        }

        CronTrigger trigger = new CronTrigger(corn);
        TriggerContext triggerContext = new SimpleTriggerContext();
        //计算下次执行时间
        Date scheduledExecutionTime = trigger.nextExecutionTime(triggerContext);
        if (scheduledExecutionTime == null) {
            throw new WarnException("同步时间设置失败");
        }
        dhDataSet.setSyncUpdateTime(LocalDateTime.now());
        dhDataSet.setSyncNextTime(scheduledExecutionTime);

        //不更新同步状态字段
        dhDataSet.setSyncState(null);
        dataSetService.updateById(dhDataSet);

    }

    public Long getDelay() {
        Date syncNextTime = dhDataSet.getSyncNextTime();
        //拿到下次执行时间,减去当前时间,得到延时时间
        return dhDataSet.getSyncNextTime().getTime() - new Date().getTime();
    }

    public void pull() {
        //没到执行时间,把状态改回去
        DhDataSet dhDataSet = getDhDataSet();
        dhDataSet.setSyncState(getOldState());
        dataSetService.updateById(dhDataSet);
    }
}

接下来就是定时任务的线程池部分,代码也不是很复杂就不作太多解析。

/**
 * 数据集定时任务
 */
public class DhThreadPoolTaskScheduler {
    private volatile int poolSize = 1;//核心线程数,默认是1
    private MysqlQueue mysqlQueue;//mysql队列
    private Set<Thread> threads;
    private AtomicInteger alive = new AtomicInteger(0);
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();

    public DhThreadPoolTaskScheduler(MysqlQueue mysqlQueue) {
        this(null, mysqlQueue);
    }

    public DhThreadPoolTaskScheduler(Integer poolSize, MysqlQueue mysqlQueue) {
        this.mysqlQueue = mysqlQueue;
        if (poolSize != null)
            this.poolSize = poolSize;
        initThreads();
    }


    /**
     * 初始化线程
     */
    private void initThreads() {
        threads = new HashSet<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            Thread thread = new Thread(new TimingTask());
            threads.add(thread);
            thread.start();
            alive.incrementAndGet();
        }
    }

    /**
     * 唤醒线程
     *
     * @throws InterruptedException
     */
    public void signal() throws InterruptedException {
        try {
            lock.lockInterruptibly();
            //核心线程不满则创建新线程
            if (alive.get() < poolSize) {
                Thread thread = new Thread(new TimingTask());
                threads.add(thread);
                thread.start();
                alive.incrementAndGet();
                //清除已经停止的线程
                Set<Thread> notAlive = new HashSet<>();
                threads.forEach(t -> {
                    if (!t.isAlive()) {
                        notAlive.add(t);
                    }
                });

                threads.removeAll(notAlive);
            } else {
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    class TimingTask implements Runnable {

        @SneakyThrows
        @Override
        public void run() {

            try {
                lock.lockInterruptibly();
                for (; ; ) {
                    //take需使用行锁保证原子性
                    DatasetTask task = mysqlQueue.take();
                    if (task == null) {
                        //没有可执行任务,阻塞
                        available.await();
                    } else {
                        Long delay = task.getDelay();
                        //有延时时间就睡眠等待
                        if (delay > 0 || delay == -1l) {
                            //放回队列
                            mysqlQueue.pull(task);
                            available.await(delay, TimeUnit.MILLISECONDS);
                            //线程被唤醒后,从队列获取新任务重新计算睡眠时间
                            continue;
                        }
                        //执行业务逻辑
                        task.run();
                    }
                }
            } catch (InterruptedException e) {

            } finally {
                try {
                    alive.decrementAndGet();
                    signal();
                } catch (InterruptedException e) {
                } finally {
                    lock.unlock();
                }

            }

        }
    }


}
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值