1. 自定义线程池
package net.genesysinfo.digitalreport.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
*
*一、ThreadPoolExecutor核心参数说明
* 1、corePoolSize:核心线程数
* * 核心线程会一直存活,及时没有任务需要执行
* * 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
* * 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
*
* 2、queueCapacity:任务队列容量(阻塞队列)
* * 当核心线程数达到最大时,新任务会放在队列中排队等待执行
*
* 3、maxPoolSize:最大线程数
* * 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
* * 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
*
* 4、 keepAliveTime:线程空闲时间
* * 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
* * 如果allowCoreThreadTimeout=true,则会直到线程数量=0
*
* 5、allowCoreThreadTimeout:允许核心线程超时
* 6、rejectedExecutionHandler:任务拒绝处理器
* * 两种情况会拒绝处理任务:
* - 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
* - 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
* * 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
* * ThreadPoolExecutor类有几个内部实现类来处理这类情况:
* - AbortPolicy 丢弃任务,抛运行时异常
* - CallerRunsPolicy 执行任务
* - DiscardPolicy 忽视,什么都不会发生
* - DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
* * 实现RejectedExecutionHandler接口,可自定义处理器
*
*二、ThreadPoolExecutor执行顺序
* 线程池按以下行为执行任务
* 1. 当线程数小于核心线程数时,创建线程。
* 2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
* 3. 当线程数大于等于核心线程数,且任务队列已满
* -1 若线程数小于最大线程数,创建线程
* -2 若线程数等于最大线程数,抛出异常,拒绝任务
*
*三、ThreadPoolExecutor如何设置参数
* 1、默认值
* * corePoolSize=1
* * queueCapacity=Integer.MAX_VALUE
* * maxPoolSize=Integer.MAX_VALUE
* * keepAliveTime=60s
* * allowCoreThreadTimeout=false
* * rejectedExecutionHandler=AbortPolicy()
*
* 2、如何来设置
* * 需要根据几个值来决定
* - tasks :每秒的任务数,假设为1000
* - taskcost:每个任务花费时间,假设为0.1s
* - responsetime:系统允许容忍的最大响应时间,假设为1s
* * 做几个计算
* - corePoolSize = 每秒需要多少个线程处理?
* * 一颗CPU核心同一时刻只能执行一个线程,然后操作系统切换上下文,核心开始执行另一个线程的代码,以此类推,超过cpu核心数,就会放入队列,如果队列也满了,就另起一个新的线程执行,所有推荐:corePoolSize = ((cpu核心数 * 2) + 有效磁盘数),java可以使用Runtime.getRuntime().availableProcessors()获取cpu核心数
* - queueCapacity = (coreSizePool/taskcost)*responsetime
* * 计算可得 queueCapacity = corePoolSize/0.1*1。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
* * 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
* - maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
* * 计算可得 maxPoolSize = (1000-corePoolSize)/10,即(每秒并发数-corePoolSize大小) / 10
* * (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
* - rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
* - keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
*
* 1.计算密集型线程池
*
* cpu使用率较高(也就是一些复杂运算,逻辑处理),所以线程数一般只需要cpu核数的线程就可以了。 这一类型的在开发中多出现的一些业务复杂计算和逻辑处理过程中。
*
* 数量一般为 N+1个 N为CPU核心数
*
* 2.IO密集型
*
* cpu使用率较低,程序中会存在大量I/O操作占据时间,导致线程空余时间出来,所以通常就需要开cpu核数的两倍的线程, 当线程进行I/O操作cpu空暇时启用其他线程继续使用cpu,提高cpu使用率 通过上述可以总结出:线程的最佳数量: 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目 线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。这一类型在开发中主要出现在一些读写操作频繁的业务逻辑中。
*
* 数量一般为:2N + 1个 N为CPU核心数
*
*
* 注意事项
* @Async注解会在以下几个场景失效,也就是说明明使用了@Async注解,但就没有走多线程。
*
* 异步方法使用static关键词修饰;
* 异步类不是一个Spring容器的bean(一般使用注解@Component和@Service,并且能被Spring扫描到);
* SpringBoot应用中没有添加@EnableAsync注解;
* 在同一个类中,一个方法调用另外一个有@Async注解的方法,注解不会生效。原因是@Async注解的方法,是在代理类中执行的。
* 需要注意的是: 异步方法使用注解@Async的返回值只能为void或者Future及其子类,当返回结果为其他类型时,方法还是会异步执行,但是返回值都是null
*
* @author coco
* @date 2021/11/16
*/
@Configuration
@EnableAsync
public class AsyncConfiguration {
@Bean("taskExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(10);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(20);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(500);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("task-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 初始化线程
executor.initialize();
return executor;
}
}
2. 在需要使用多线程的方法加上@Async注解,并指定使用自定义线程池
// 指定使用beanname为doSomethingExecutor的线程池
@Async("taskExecutor")
public String doSomething(String message) {
log.info("do something, message={}", message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("do something error: ", e);
}
return message;
}