前提
在做定时任务时,我们会精确到某一秒去执行我们的任务,那么XXL-Job是如何帮助我们精准执行呢?
利用单独的一个线程(ringThread),该线程的作用是从缓存中读取出当前时间该执行的定时任务,然后去执行,该缓存中存的是执行时间超前5秒内的定时任务,该缓存的数据结构是Map<Integer, List<QuartzJob>> ringData = new ConcurrentHashMap<>();(QuartzJob是我自定义的类,可以根据业务而定),这样的结构可以处理某一秒中多个任务的执行,相同时间的任务,放到了同一个list里面。
关键逻辑
从数据库中取到执行时间超前5秒内的定时任务存到该map中,不断的循环从缓存中取,这个map的key值范围是0-59,正好对应我们一分钟内的60秒,每秒对应map中的key值,例如,32秒的时候,取到的数据就是map中key值为32的数据,然后对其value进行操作(对list进行遍历,远程调用你的定时任务具体执行的代码或者是调用执行器,因业务而定)
存缓存代码
private volatile static Map<Integer, List<QuartzJob>> ringData = new ConcurrentHashMap<>();
// 获得当前秒
int ringSecond = (int) ((quartzJob.getTriggerNextTime() / 1000) % 60);
// 存到map中
pushTimeRing(ringSecond, quartzJob);
private void pushTimeRing(int ringSecond, QuartzJob quartzJob) {
// 向对应得key值中得list添加数据
List<QuartzJob> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<QuartzJob>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(quartzJob);
Logit.debugLog(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData));
}
取缓存代码
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
Logit.errorLog(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
List<QuartzJob> ringItemData = new ArrayList<>();
// 得到当前时间(秒)
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<QuartzJob> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
Logit.debugLog(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
if (ringItemData.size() > 0) {
// do trigger
for (QuartzJob quartzJob : ringItemData) {
// do trigger 调用JobTriggerPoolHelper.trigger,交给线程池去执行任务的具体的代码(可远程调用,看具体业务而定)
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
Logit.errorLog(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
Logit.errorLog(e.getMessage(), e);
}
}
}
Logit.debugLog(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});