异步注解 @Async + 自定义线程池

异步注解 @Async

1、启动类开启异步线程

启动类 上添加或者 自定义线程池 上添加注解:@EnableAsync

2、配置线程池

示例:

/**
 * 全局异步任务配置类
 *
 * @author chenmeng
 */
@Slf4j
@EnableAsync
@Configuration
public class GlobalAsyncConfig implements AsyncConfigurer {

    /**
     * 创建自定义异步线程池
     *
     * @return Executor
     */
    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        // 1、核心线程数
        int corePoolSize = 10;
        // 2、最大线程数
        int maxPoolSize = 50;
        // 3、非核心线程空闲存活时间
        long keepAliveTime = 60L;
        // 4、时间单位: 秒
        TimeUnit unit = TimeUnit.SECONDS;
        // 5、工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        // 6、设置线程工厂(线程名称格式)
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("async-pool-%d")
                .build();
        // 7、设置拒绝策略
        ThreadPoolExecutor.CallerRunsPolicy rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                rejectedExecutionHandler
        );

        // 允许核心线程超时
        executor.allowCoreThreadTimeOut(true);

        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        // 返回自定义线程池
        return asyncExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // 设置异步执行时的异常处理器
        return new GlobalAsyncUncaughtExceptionHandler();
    }

    public static class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable ex, Method method, @NotNull Object... params) {
            log.error("[handleUncaughtException][method({}) params({}) 发生异常]", method.getName(), Arrays.deepToString(params), ex);
        }
    }
}

在这个配置类中,我们完成了以下内容:

  1. 通过 @Configuration 注解标注这个类是一个配置类。
  2. 通过 @EnableAsync 注解开启异步支持。
  3. 实现 AsyncConfigurer 接口,以自定义异步执行器和异常处理器。
  4. 创建了一个 ThreadPoolExecutor 作为自定义线程池,配置了核心线程数、最大线程数、空闲存活时间、工作队列等参数。
  5. 使用 ThreadFactoryBuilder 创建了一个命名的线程工厂。
  6. 设置了拒绝策略为 CallerRunsPolicy,这意味着如果线程池达到饱和状态,任务将由调用者线程执行。
  7. 实现了自定义的异常处理器 SimpleAsyncUncaughtExceptionHandler,用于处理异步方法执行中的未捕获异常。

3、方法体上添加异步注解

示例

@Async
public void test() {
  
}

注意事项

  • 在内部类之间的方法调用,此注解无效,不会有异步线程
  • 需要外部的类来调用这个异步方法才会开启异步线程

4、不使用自定义线程池可能引起的问题

在 Spring 框架中,@Async 注解可以用于异步执行方法。

当你使用 @Async 注解时,如果不自定义线程池,Spring 会为你自动创建一个默认的线程池。这个默认线程池是由 SimpleAsyncTaskExecutor 管理的,它为每个任务创建一个新的线程。

虽然这可以工作,但可能会遇到以下问题:

  1. 无限制的线程创建SimpleAsyncTaskExecutor 会为每个任务创建一个新的线程,而没有最大线程数的限制。如果异步任务的数量非常多,这可能导致大量的线程被创建,消耗大量的系统资源,最终可能导致 OutOfMemoryError 或降低系统性能。

  2. 线程管理:由于每次调用都会创建新线程,没有线程复用,这可能会导致线程管理上的开销,尤其是在高并发场景下。

  3. 调试和监控困难:默认线程池创建的线程名称没有明确的命名规则,这可能会使得在日志中或监控工具中跟踪异步任务变得困难。

  4. 资源竞争:大量的线程可能会引起CPU和内存资源的激烈竞争,尤其是在JVM和操作系统层面上的上下文切换。

  5. 安全性问题:如果异步任务执行的时间过长,而默认线程池没有适当的管理策略,可能会因为线程过多而影响到系统的稳定性和安全性。

为了避免这些问题,建议自定义线程池。自定义线程池可以提供以下优势:

  • 可控的线程数量:通过配置最大线程数来限制并发执行的任务数量,避免过度创建线程。
  • 线程复用:线程池可以重用已存在的线程来执行新的任务,减少了线程创建和销毁的开销。
  • 优雅的关闭:可以配置线程池的关闭策略,确保在应用程序关闭时,正在执行的任务能够优雅地完成或中断。
  • 更好的监控和调试:自定义线程池可以设置有意义的线程名称,便于在日志和监控系统中识别和跟踪。

在 Spring 中自定义线程池通常涉及到配置一个 ThreadPoolTaskExecutor 或者实现一个 AsyncConfigurer 接口,并定义线程池的各种参数,如核心线程数、最大线程数、队列容量等。这样可以更好地控制异步任务的执行和资源的使用。

如何自定义一个适合我项目的线程池?

自定义线程池以适应您的项目需求,需要考虑以下几个关键因素:

  1. 核心线程数(Core Pool Size)

    • 核心线程是线程池中始终保持活跃的线程,即使它们处于空闲状态。通常,核心线程数取决于任务的类型(CPU密集型、IO密集型或混合型)以及系统的并发需求。
  2. 最大线程数(Maximum Pool Size)

    • 最大线程数是线程池中允许的最大线程数。当所有核心线程都在忙碌时,并且任务队列已满,线程池会创建新的线程,直到达到这个数值。
  3. 工作队列(Work Queue)

    • 工作队列用于存储等待执行的任务。队列的大小可以是有限或无限的。选择合适的队列类型(如 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue 等)对于控制线程池的行为至关重要。
  4. 线程存活时间(Keep-Alive Time)

    • 当线程池中的线程数超过核心线程数时,非核心线程在空闲状态下可以存活的时间。这个参数可以用来控制线程的生命周期,以避免长时间保持不必要的线程。
  5. 时间单位(Time Unit)

    • 设置线程存活时间的单位
  6. 线程工厂(Thread Factory)

    • 线程工厂用于创建新线程。通过自定义线程工厂,可以设置线程的名称、优先级和其他属性,这有助于调试和监控。
  7. 拒绝策略(Rejected Execution Handler)

    • 当任务太多,无法被线程池及时处理时,拒绝策略决定了如何处理这些额外的任务。常见的拒绝策略包括 ThreadPoolExecutor.AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy
  8. 任务类型

    • 根据任务的特性(CPU密集型、IO密集型或混合型)来调整线程池的参数。CPU密集型任务通常需要较少的线程,而IO密集型任务可能需要更多的线程。

    • // 示例
      	@Override
          public Executor getAsyncExecutor() {
              ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
              executor.setCorePoolSize(determineCorePoolSize()); // 根据任务类型确定核心线程数
              executor.setMaxPoolSize(determineMaxPoolSize()); // 根据任务类型确定最大线程数
              executor.setQueueCapacity(determineQueueCapacity()); // 根据任务类型确定队列容量
              executor.setThreadNamePrefix("async-task-"); // 线程名称前缀
              executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
              executor.initialize();
              return executor;
          }
      
          private int determineCorePoolSize() {
              // 根据任务类型和系统资源确定核心线程数
              // 例如,对于CPU密集型任务,可以返回Runtime.getRuntime().availableProcessors();
              // 对于IO密集型任务,可以返回更高的值
          }
      
          private int determineMaxPoolSize() {
              // 根据任务类型和系统资源确定最大线程数
          }
      
          private int determineQueueCapacity() {
              // 根据任务类型确定队列容量
          }
      

为什么需要自定义线程池?

@Async 默认使用 SimpleAsyncTaskExecutor

  • SimpleAsyncTaskExecutor 的问题
    • @Async 默认使用 无界线程池每次请求都会创建新线程,可能会导致线程资源耗尽,最终引发 OOM
    • 没有线程复用机制,对高并发业务不友好。

自定义线程池的好处

  1. 解决默认线程池的局限性
    • 自定义线程池可以控制线程数量防止资源耗尽,提高系统的吞吐量和稳定性
  2. 提高 @Async 任务执行的可控性
    • 默认 @Async 线程池可能会无限创建线程,自定义线程池后,可以设定最大线程数,防止应用崩溃

线程池参数的调优建议

参数作用推荐值(取决于业务场景)
corePoolSize核心线程数,长期存活CPU 密集型:CPU 核心数IO 密集型:2x CPU 核心数
maxPoolSize最大线程数corePoolSize * 2 或更多(高并发场景)
queueCapacity任务队列大小corePoolSize * 10(视情况而定)
keepAliveTime非核心线程存活时间60s
ThreadFactory自定义线程名称方便排查问题
RejectedExecutionHandler线程池满时的策略CallerRunsPolicy(让调用线程执行任务)

更多拒绝策略相关的内容:线程池详解 | 沉梦听雨的编程指南

学习参考

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值