使用mysql实现定时任务队列
1、首先创建一个叫MysqlQueue的类封装对mysql队列的操作。这个类其实比较简单,提供了take方法和pull方法来加载以及释放mysql中的记录。
@Component
public class MysqlQueue {
@Autowired
private DataSetService dataSetService;
@Autowired
private SyncLogService syncLogService;
@Transactional
public DatasetTask take() {
//加载mysql定时队列的任务
QueryWrapper queryWrapper = new QueryWrapper();
queryWrapper.in("sync_state", "等待运行的状态");
//根据sync_next_time进行排序,获取最近要执行的任务
queryWrapper.orderBy(true, true, "sync_next_time");
//使用排它锁,确保不会被其他加锁线程拿到
queryWrapper.last("limit 1 for update");
DhDataSet one = dataSetService.findOne(queryWrapper);
if (one == null)
return null;
Integer oldState = one.getSyncState();
one.setSyncState("就绪状态");
//更新并释放行级锁
dataSetService.updateById(one);
//把数据库中的记录封装成task
return new DatasetTask(one, dataSetService, oldState, dbDatasetSyncLog.getVersion(), dbDatasetSyncLog.getTimingTime(), syncLogService);
}
//把任务放回到MySQL队列(实际是修改记录的执行状态为等待状态)
public void pull(DatasetTask task) {
task.pull();
}
}
接下来看看封装的Task任务类,这个类实现Runnable接口。run方法就是线程池里要执行的业务方法。getDelay方法用于获取任务的延时时间,schedule方法则是在任务执行完以后,重新计算下一次的执行时间。
@Slf4j
@Getter
public class DatasetTask implements Runnable {
private DataSetService dataSetService;
private DhDataSet dhDataSet;
private Integer oldState;
public DatasetTask(DhDataSet dhDataSet, DataSetService dataSetService, Integer oldState) {
this.dhDataSet = dhDataSet;
this.dataSetService = dataSetService;
this.oldState = oldState;
}
@Override
public void run() {
try {
//执行定时任务
dhDataSet = dataSetService.excute(dhDataSet.getResourceId());
} catch (Exception e) {
log.error("同步定时任务失败{}:{}", dhDataSet.getResourceId(), e.getMessage(), e);
}
schedule(dhDataSet);
}
/**
* 计算下次执行时间
*
* @param dhDataSet
*/
protected void schedule(DhDataSet dhDataSet) {
//拿到spring的corn表达式用于计算下次执行时间
String corn = dhDataSet.getSyncCorn();
if (Objects.equals(corn, CornUtil.PAUSE)) {
dhDataSet.setSyncState(SyncState.stop.getCode());
}
CronTrigger trigger = new CronTrigger(corn);
TriggerContext triggerContext = new SimpleTriggerContext();
//计算下次执行时间
Date scheduledExecutionTime = trigger.nextExecutionTime(triggerContext);
if (scheduledExecutionTime == null) {
throw new WarnException("同步时间设置失败");
}
dhDataSet.setSyncUpdateTime(LocalDateTime.now());
dhDataSet.setSyncNextTime(scheduledExecutionTime);
//不更新同步状态字段
dhDataSet.setSyncState(null);
dataSetService.updateById(dhDataSet);
}
public Long getDelay() {
Date syncNextTime = dhDataSet.getSyncNextTime();
//拿到下次执行时间,减去当前时间,得到延时时间
return dhDataSet.getSyncNextTime().getTime() - new Date().getTime();
}
public void pull() {
//没到执行时间,把状态改回去
DhDataSet dhDataSet = getDhDataSet();
dhDataSet.setSyncState(getOldState());
dataSetService.updateById(dhDataSet);
}
}
接下来就是定时任务的线程池部分,代码也不是很复杂就不作太多解析。
/**
* 数据集定时任务
*/
public class DhThreadPoolTaskScheduler {
private volatile int poolSize = 1;//核心线程数,默认是1
private MysqlQueue mysqlQueue;//mysql队列
private Set<Thread> threads;
private AtomicInteger alive = new AtomicInteger(0);
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
public DhThreadPoolTaskScheduler(MysqlQueue mysqlQueue) {
this(null, mysqlQueue);
}
public DhThreadPoolTaskScheduler(Integer poolSize, MysqlQueue mysqlQueue) {
this.mysqlQueue = mysqlQueue;
if (poolSize != null)
this.poolSize = poolSize;
initThreads();
}
/**
* 初始化线程
*/
private void initThreads() {
threads = new HashSet<>(poolSize);
for (int i = 0; i < poolSize; i++) {
Thread thread = new Thread(new TimingTask());
threads.add(thread);
thread.start();
alive.incrementAndGet();
}
}
/**
* 唤醒线程
*
* @throws InterruptedException
*/
public void signal() throws InterruptedException {
try {
lock.lockInterruptibly();
//核心线程不满则创建新线程
if (alive.get() < poolSize) {
Thread thread = new Thread(new TimingTask());
threads.add(thread);
thread.start();
alive.incrementAndGet();
//清除已经停止的线程
Set<Thread> notAlive = new HashSet<>();
threads.forEach(t -> {
if (!t.isAlive()) {
notAlive.add(t);
}
});
threads.removeAll(notAlive);
} else {
available.signal();
}
} finally {
lock.unlock();
}
}
class TimingTask implements Runnable {
@SneakyThrows
@Override
public void run() {
try {
lock.lockInterruptibly();
for (; ; ) {
//take需使用行锁保证原子性
DatasetTask task = mysqlQueue.take();
if (task == null) {
//没有可执行任务,阻塞
available.await();
} else {
Long delay = task.getDelay();
//有延时时间就睡眠等待
if (delay > 0 || delay == -1l) {
//放回队列
mysqlQueue.pull(task);
available.await(delay, TimeUnit.MILLISECONDS);
//线程被唤醒后,从队列获取新任务重新计算睡眠时间
continue;
}
//执行业务逻辑
task.run();
}
}
} catch (InterruptedException e) {
} finally {
try {
alive.decrementAndGet();
signal();
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
}
}
}