/**
 * Scheduled-task service implementation.
 *
 * <p>Schedules tasks described by {@link ScheduleConfig} on a
 * {@link ThreadPoolTaskScheduler} and keeps the resulting futures in an
 * in-memory map keyed by task name, so tasks can later be stopped,
 * restarted, or replaced.
 *
 * <p>Only {@link #start(String)} is serialized by a lock; the other
 * operations rely on the atomicity of individual {@link ConcurrentHashMap}
 * calls.
 */
@Service
public class ScheduledTaskServiceImpl implements ScheduledTaskService {

    /** Class logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskServiceImpl.class);

    /** Guards {@link #start(String)} so concurrent callers cannot start the same task twice. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Futures of the tasks that have been scheduled, keyed by task name. */
    private final Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    @Resource
    private ScheduledTaskDao taskDao;

    /** Scheduler on which every task is run. */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /**
     * Converts the first character of a string to lower case.
     *
     * <p>The input is returned unchanged when it is {@code null}, empty, or
     * does not start with an upper-case letter. (Blindly adding 32 to the
     * first character, as was done previously, failed on empty input and
     * corrupted names that did not start with an ASCII capital.)
     *
     * @param str the string to convert, may be {@code null}
     * @return {@code str} with its first character lower-cased
     */
    public static String lowerFirstCapse(String str) {
        if (str == null || str.isEmpty()) {
            return str;
        }
        char first = str.charAt(0);
        if (!Character.isUpperCase(first)) {
            return str;
        }
        char[] chars = str.toCharArray();
        chars[0] = Character.toLowerCase(first);
        return String.valueOf(chars);
    }

    /**
     * Creates the scheduler bean used to run the tasks.
     *
     * <p>The method is {@code static} so the container can create the
     * scheduler without first instantiating this service. This service
     * injects that very scheduler, so a non-static factory method would make
     * the two beans depend on each other.
     *
     * @return a scheduler with 20 threads
     */
    @Bean
    public static ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        LOGGER.info("Create a timed task scheduling thread pool start");
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // Pool capacity.
        scheduler.setPoolSize(20);
        // Thread name prefix.
        scheduler.setThreadNamePrefix("taskExecutor-");
        // Drop a task from the scheduler's queue as soon as it is cancelled,
        // so stopped or replaced tasks do not linger until their next fire time.
        scheduler.setRemoveOnCancelPolicy(true);
        // On shutdown, let already-submitted tasks finish ...
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        // ... waiting at most 60 seconds for them.
        scheduler.setAwaitTerminationSeconds(60);
        LOGGER.info("Create a timed task scheduling thread pool end");
        return scheduler;
    }

    /**
     * Lists all tasks, with each task's start flag reflecting the current
     * in-memory state.
     *
     * @return every task returned by the DAO (no paging), or an empty list
     *         when there are none
     */
    @Override
    public List<ScheduleConfig> taskList() {
        LOGGER.info(">>>>>> Get the task list to start >>>>>> ");
        List<ScheduleConfig> taskBeanList = taskDao.getAllTask();
        if (CollectionUtils.isEmpty(taskBeanList)) {
            return new ArrayList<>();
        }
        for (ScheduleConfig taskBean : taskBeanList) {
            taskBean.setStartFlag(this.isStart(taskBean.getTaskName()));
        }
        return taskBeanList;
    }

    /**
     * Starts the task with the given name.
     *
     * @param taskName name of the task to start
     * @return {@code true} if the task was scheduled; {@code false} if it is
     *         already running, no configuration exists for the name, or its
     *         trigger produced no execution time
     */
    @Override
    public Boolean start(String taskName) {
        LOGGER.info(">>>>>> Starting tasks {} Begin >>>>>>", taskName);
        // Only one thread may start tasks at a time, so the "already started"
        // check and the scheduling below act as one step.
        lock.lock();
        try {
            if (this.isStart(taskName)) {
                LOGGER.info(">>>>>> The current task is already started and there is no need to start it again!");
                return false;
            }
            ScheduleConfig scheduledTask = taskDao.getByName(taskName);
            if (scheduledTask == null) {
                // Unknown task name: nothing to schedule.
                LOGGER.warn("No configuration found for task {}, nothing was started", taskName);
                return false;
            }
            return this.initTask(scheduledTask);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops the task with the given name.
     *
     * <p>The future is removed from the map and cancelled, interrupting the
     * task if it is currently executing.
     *
     * @param taskName name of the task to stop
     * @return {@code true} if a scheduled task was found and cancelled,
     *         {@code false} if no task with that name was scheduled
     */
    @Override
    public Boolean stop(String taskName) {
        LOGGER.info(">>>>>> Enter the stop task {} >>>>>>", taskName);
        // A single atomic remove avoids the containsKey/get race with other
        // removers and keeps cancelled futures out of the map.
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(taskName);
        if (scheduledFuture == null) {
            return false;
        }
        scheduledFuture.cancel(true);
        return true;
    }

    /**
     * Restarts the task with the given name: stops it, then starts it again.
     *
     * @param taskName name of the task to restart
     * @return the result of {@link #start(String)}
     */
    @Override
    public Boolean restart(String taskName) {
        LOGGER.info(">>>>>> Enter the restart task {} >>>>>>", taskName);
        this.stop(taskName);
        return this.start(taskName);
    }

    /**
     * Schedules every task in the given list that is not already running.
     *
     * @param scheduledTaskBeanList tasks to schedule; {@code null} or empty
     *                              is a no-op
     */
    @Override
    public void initAllTask(List<ScheduleConfig> scheduledTaskBeanList) {
        // Check for null/empty before touching the list, so a null argument
        // is a no-op rather than a NullPointerException.
        if (CollectionUtils.isEmpty(scheduledTaskBeanList)) {
            LOGGER.info("init All Task begin !size={}", 0);
            return;
        }
        LOGGER.info("init All Task begin !size={}", scheduledTaskBeanList.size());
        for (ScheduleConfig scheduledTask : scheduledTaskBeanList) {
            if (this.isStart(scheduledTask.getTaskName())) {
                continue;
            }
            this.initTask(scheduledTask);
        }
    }

    /**
     * Schedules one task and records its future under the task name.
     *
     * @param scheduleConfig configuration of the task to schedule
     * @return {@code true} if the task was scheduled, {@code false} if the
     *         scheduler returned no future
     */
    @SneakyThrows
    private boolean initTask(ScheduleConfig scheduleConfig) {
        ScheduledFuture<?> future =
                threadPoolTaskScheduler.schedule(getRunnable(scheduleConfig), getTrigger(scheduleConfig));
        if (future == null) {
            // The scheduler returns null when the trigger never fires; a
            // ConcurrentHashMap rejects null values, so it must not be stored.
            LOGGER.warn("Task {} was not scheduled: its trigger produced no execution time",
                    scheduleConfig.getTaskName());
            return false;
        }
        scheduledFutureMap.put(scheduleConfig.getTaskName(), future);
        return true;
    }

    /**
     * Pauses a task by removing it from the map and cancelling it.
     *
     * @param Name name of the task to pause
     * @return {@code true} if a scheduled task was found and cancelled,
     *         {@code false} otherwise
     */
    public boolean pauseeTask(String Name) {
        ScheduledFuture<?> toBeRemovedFuture = scheduledFutureMap.remove(Name);
        if (toBeRemovedFuture == null) {
            return false;
        }
        toBeRemovedFuture.cancel(true);
        return true;
    }

    /**
     * Replaces a task: cancels the currently scheduled instance, if any, and
     * schedules it again from the given configuration.
     *
     * @param scheduleConfig the new configuration of the task
     */
    public void updateTask(ScheduleConfig scheduleConfig) {
        ScheduledFuture<?> toBeRemovedFuture = scheduledFutureMap.remove(scheduleConfig.getTaskName());
        if (toBeRemovedFuture != null) {
            toBeRemovedFuture.cancel(true);
        }
        initTask(scheduleConfig);
    }

    /**
     * Builds the runnable that executes a task.
     *
     * <p>On each run it loads the configured class, looks up a bean named
     * after the class's simple name with a lower-cased first letter, and
     * invokes the configured no-argument method on that bean.
     *
     * @param config configuration naming the class and method to invoke
     * @return the runnable to schedule
     */
    private Runnable getRunnable(ScheduleConfig config) {
        return () -> {
            try {
                Class<?> clazz = Class.forName(config.getClassName());
                String beanName = lowerFirstCapse(clazz.getSimpleName());
                Object bean = ApplicationContextHelper.getBean(beanName);
                if (bean == null) {
                    LOGGER.error("invoke class: {},method: {} has an error: bean {} not found",
                            config.getClassName(), config.getMethod(), beanName);
                    return;
                }
                Method method = ReflectionUtils.findMethod(bean.getClass(), config.getMethod());
                if (method == null) {
                    // findMethod returns null when no such method exists.
                    LOGGER.error("invoke class: {},method: {} has an error: method not found",
                            config.getClassName(), config.getMethod());
                    return;
                }
                ReflectionUtils.invokeMethod(method, bean);
            } catch (ClassNotFoundException e) {
                // Pass the exception as the last argument so the stack trace is logged.
                LOGGER.error("invoke class: {},method: {} has an error ",
                        config.getClassName(), config.getMethod(), e);
            }
        };
    }

    /**
     * Builds the trigger that decides when a task runs next.
     *
     * <p>The cron expression is read from {@code scheduleConfig} every time
     * the next execution time is requested, not once up front.
     *
     * @param scheduleConfig configuration holding the cron expression
     * @return the trigger to schedule with
     */
    private Trigger getTrigger(ScheduleConfig scheduleConfig) {
        return new Trigger() {
            @Override
            public Date nextExecutionTime(TriggerContext triggerContext) {
                return new CronTrigger(scheduleConfig.getTaskCron()).nextExecutionTime(triggerContext);
            }
        };
    }

    /**
     * Tells whether a task is currently started.
     *
     * @param taskName name of the task
     * @return {@code true} if a future is recorded for the name and it has
     *         not been cancelled
     */
    private boolean isStart(String taskName) {
        // A single lookup: the entry may be removed concurrently between a
        // containsKey() and a get().
        ScheduledFuture<?> future = scheduledFutureMap.get(taskName);
        return future != null && !future.isCancelled();
    }

    /**
     * Logs and returns a JSON dump of the map of scheduled tasks.
     *
     * @return the JSON representation of the task map
     */
    @Override
    public String printlnTask() {
        String printlnTask = JSON.toJSONString(scheduledFutureMap);
        LOGGER.info("current Running Task {},size ={}", printlnTask, scheduledFutureMap.size());
        return printlnTask;
    }
}
完整的代码实现如下