Async 默认线程池真坑!

文章内容已经收录在《高级技术专家成长笔记》,欢迎订阅专栏!

从原理出发,直击面试难点,实现更高维度的降维打击!

目录

  • [前言]
  • [@Async 使用]
    • [自定义线程池]
  • [开启异步]
  • [后置处理器]
    • [创建 Advisor]
    • [后置处理逻辑]
  • [@Async 注解方法的拦截]
    • [1、获取异步任务执行器]
    • [2、将方法封装为异步任务]
    • [3、提交异步任务]
  • [@Async 原理总结]

@Async 原理分析

前言

@Async 由 Spring 框架提供,被该注解标注的类或方法会由 异步线程 执行。

因此,在项目中,需要异步执行的地方通过 @Async 指定非常方便,但正是因为这种便利性,让开发人员对底层流程没有任何感知。

如果不了解底层原理,直接使用 @Async ,那就可能会导致在未来的某一时刻出现服务不可用的情况。因为 @Async 默认使用 Spring 的线程池会为每一个异步任务创建一个线程执行,有 1w 个任务就会创建 1w 个线程,不会对线程进行复用,开销巨大。

在初期代码上线时可能不会出现问题,某个时间当并发量突然增加,就会导致应用负载增加,出现部分接口超时的问题。

@Async 使用

@Async 注解的使用需要两个步骤:

  1. 在启动类上添加注解 @EnableAsync ,开启异步任务。
  2. 在需要异步执行的方法或类上添加注解 @Async

@EnableAsync 注解开启了 Spring 异步执行器,@Async 注解只是起一个标记作用,用来标识哪些方法需要异步执行。 接下来 @Async 原理分析会以 @EnableAsync 注解为入口进行分析。

自定义线程池

@Async 注解中可以指定执行异步任务的线程池。如果不指定线程池,就会使用 Spring 默认的线程池 SimpleAsyncTaskExecutor ,会存在很多问题。

因此在使用 @Async 注解时务必要指定自定义的线程池。

@Async 注解标注方法的返回值被限制为 void 或者 Future ,其他类型的返回值在方法执行完毕之后无法获取。使用示例如下:

// 不指定线程池
@Async
public void testAsync() {}

// 指定线程池 & 返回值为 void
@Async("myThreadPool")
public void testAsync() {}

// 指定线程池 & 返回值为 Future
@Async("myThreadPool")
public Future testAsync() {}

开启异步

使用 @Async 之前,需要在启动类上添加 @EnableAsync 来开启异步,@EnableAsync 注解如下:

// 省略其他注解 ...
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync { /* ... */ }

@EnableAsync 注解上通过 @Import 注解引入了 AsyncConfigurationSelector ,因此 Spring 会去加载通过 @Import 注解引入的类。

AsyncConfigurationSelector 类实现了 ImportSelector 接口,因此在该类中会重写 selectImports() 方法来自定义加载 Bean 的逻辑,如下:

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
            // 基于 JDK 代理织入的通知
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
            // 基于 AspectJ 织入的通知
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

selectImports() 方法中,会根据通知的不同类型来选择加载不同的类,其中 adviceMode 默认值为 PROXY

这里以基于 JDK 代理的通知为例,此时会加载 ProxyAsyncConfiguration 类,如下:

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		// ...
        
        // 加载后置处理器
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();

        // ...
		return bpp;
	}
}

后置处理器

ProxyAsyncConfiguration 类中,会通过 @Bean 注解加载一个后置处理器 AsyncAnnotationBeanPostProcessor ,这个后置处理器是使 @Async 注解起作用的关键。

如果某一个类或者方法上使用了 @Async 注解,AsyncAnnotationBeanPostProcessor 处理器就会为该类创建一个动态代理。

该类的方法在执行时,会被代理对象的拦截器所拦截,其中被 @Async 注解标记的方法会异步执行。

AsyncAnnotationBeanPostProcessor 代码如下:

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);
	·	
        // 创建 Advisor
		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}
}

AsyncAnnotationBeanPostProcessor 的父类实现了 BeanFactoryAware 接口,因此在该类中重写了 setBeanFactory() 方法作为扩展点,来加载 AsyncAnnotationAdvisor

创建 Advisor

AdvisorSpring AOPAdvicePointcut 的抽象。Advice 为执行的通知逻辑,Pointcut 为通知执行的切入点。

在后置处理器 AsyncAnnotationBeanPostProcessor 中会去创建 AsyncAnnotationAdvisor , 在它的构造方法中,会构建对应的 AdvicePointcut ,如下:

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

	private Advice advice;
	private Pointcut pointcut;

    // 构造方法
	public AsyncAnnotationAdvisor(/* 参数省略 */) {
		// 1、构建 Advice
		this.advice = buildAdvice(executor, exceptionHandler);
        // 2、构建 Pointcut
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

    // 构建 Advice
	protected Advice buildAdvice(/* 参数省略 */) {
		// 创建 AnnotationAsyncExecutionInterceptor 
		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}

	// 构建 Pointcut
	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            // 1、类匹配(类上有对应注解的话,该类的所有方法都匹配)
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            // 2、方法匹配(只有方法上有对应注解才匹配)
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
            // 3、使用 union 取 cpc 和 mpc 两个 Pointcut 的并集。
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}

}

AsyncAnnotationAdvisor 的核心在于构建 AdvicePointcut

  • 构建 Advice :会创建 AnnotationAsyncExecutionInterceptor 拦截器,在拦截器的 invoke() 方法中会执行通知的逻辑。
  • 构建 Pointcut :由 ClassFilterMethodMatcher 组成,用于匹配哪些方法需要执行通知( Advice )的逻辑。
后置处理逻辑

AsyncAnnotationBeanPostProcessor 后置处理器中实现的 postProcessAfterInitialization() 方法在其父类 AbstractAdvisingBeanPostProcessor 中,在 Bean 初始化之后,会进入到 postProcessAfterInitialization() 方法进行后置处理。

在后置处理方法中,会判断 Bean 是否符合后置处理器中 Advisor 通知的条件,如果符合,则创建代理对象。如下:

// AbstractAdvisingBeanPostProcessor
public Object postProcessAfterInitialization(Object bean, String beanName) {
	if (this.advisor == null || bean instanceof AopInfrastructureBean) {
		return bean;
	}
	if (bean instanceof Advised) {
		Advised advised = (Advised) bean;
		if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
			if (this.beforeExistingAdvisors) {
				advised.addAdvisor(0, this.advisor);
			}
			else {
				advised.addAdvisor(this.advisor);
			}
			return bean;
		}
	}
    // 判断给定的 Bean 是否符合后置处理器中 Advisor 通知的条件,符合的话,就创建代理对象。
	if (isEligible(bean, beanName)) {
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
        // 添加 Advisor。
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);
        // 返回代理对象。
		return proxyFactory.getProxy(getProxyClassLoader());
	}
	return bean;
}

@Async 注解方法的拦截

@Async 注解方法的执行会在 AnnotationAsyncExecutionInterceptor 中被拦截,在 invoke() 方法中执行拦截器的逻辑。此时会将 @Async 注解标注的方法封装为异步任务,交给执行器来执行。

invoke() 方法在 AnnotationAsyncExecutionInterceptor 的父类 AsyncExecutionInterceptor 中定义,如下:

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
	@Override
	@Nullable
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // 1、确定异步任务执行器
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		
        // 2、将要执行的方法封装为 Callable 异步任务
		Callable<Object> task = () -> {
			try {
                // 2.1、执行方法
				Object result = invocation.proceed();
                // 2.2、如果方法返回值是 Future 类型,阻塞等待结果
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		// 3、提交任务
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}
}

invoke() 方法中,主要有 3 个步骤:

  1. 确定执行异步任务的执行器。
  2. @Async 注解标注的方法封装为 Callable 异步任务。
  3. 将任务提交给执行器执行。
1、获取异步任务执行器

determineAsyncExecutor() 方法中,会获取异步任务的执行器(即执行异步任务的 线程池 )。代码如下:

// 确定异步任务的执行器
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    // 1、先从缓存中获取。
	AsyncTaskExecutor executor = this.executors.get(method);
	if (executor == null) {
		Executor targetExecutor;
        // 2、获取执行器的限定符。
		String qualifier = getExecutorQualifier(method);
		if (StringUtils.hasLength(qualifier)) {
            // 3、根据限定符获取对应的执行器。
			targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
		}
		else {
            // 4、如果没有限定符,则使用默认的执行器。即 Spring 提供的默认线程池:SimpleAsyncTaskExecutor。
			targetExecutor = this.defaultExecutor.get();
		}
		if (targetExecutor == null) {
			return null;
		}
        // 5、将执行器包装为 TaskExecutorAdapter 适配器。
        // TaskExecutorAdapter 是 Spring 对于 JDK 线程池做的一层抽象,还是继承自 JDK 的线程池 Executor。这里可以不用管太多,只要知道它是线程池就可以了。
		executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
				(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
		this.executors.put(method, executor);
	}
	return executor;
}

determineAsyncExecutor() 方法中确定了异步任务的执行器(线程池),主要是通过 @Async 注解的 value 值来获取执行器的限定符,根据限定符再去 BeanFactory 中查找对应的执行器就可以了。

如果在 @Async 注解中没有指定线程池,则使用 Spring 提供的默认线程池:SimpleAsyncTaskExecutor ,该线程池参数如下:

  • 核心线程数:8
  • 最大线程数:Integer.MAX_VALUE
  • 任务队列:LinkedBlockingQueue

默认线程池 SimpleAsyncTaskExecutor 的任务队列是无界队列。如果任务过多,超出核心线程的处理能力,任务就会堆压在任务队列,可能会造成内存 OOM。并且 SimpleAsyncTaskExecutor 线程池不重用线程,每次执行任务都会创建新线程,因此不推荐使用。

建议:在使用 @Async 时需要自己指定线程池,避免黑盒操作带来的风险。

@Async 注解中的 value 指定了线程池的限定符,根据限定符可以获取 自定义的线程池 。获取限定符的代码如下:

// AnnotationAsyncExecutionInterceptor
protected String getExecutorQualifier(Method method) {
	// 获取 Async 注解。
	Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
	if (async == null) {
		async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
	}
    // 获取注解的 value 值。
	return (async != null ? async.value() : null);
}
2、将方法封装为异步任务

invoke() 方法获取执行器之后,会将方法封装为异步任务,代码如下:

// 2、将要执行的方法封装为 Callable 异步任务
Callable<Object> task = () -> {
    try {
        // 2.1、执行方法
        Object result = invocation.proceed();
        // 2.2、如果方法返回值是 Future 类型,阻塞等待结果
        if (result instanceof Future) {
            return ((Future<?>) result).get();
        }
    }
    catch (ExecutionException ex) {
        handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    }
    catch (Throwable ex) {
        handleError(ex, userDeclaredMethod, invocation.getArguments());
    }
    return null;
};

相比于 RunnableCallable 可以返回结果,并且抛出异常。

invocation.proceed() 的执行(原方法的执行)封装为 Callable 异步任务。这里仅仅当 result (方法返回值)类型为 Future 才返回,如果是其他类型则直接返回 null

因此使用 @Async 注解标注的方法如果使用 Future 类型之外的返回值,则无法获取方法的执行结果。

3、提交异步任务

AsyncExecutionInterceptor # invoke() 中将要执行的方法封装为 Callable 任务之后,就会将任务交给执行器来执行。提交相关的代码如下:

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // 方法返回值是 CompletableFuture 类型
	if (CompletableFuture.class.isAssignableFrom(returnType)) {
		return CompletableFuture.supplyAsync(() -> {
			try {
				return task.call();
			}
			catch (Throwable ex) {
				throw new CompletionException(ex);
			}
		}, executor);
	}
    // 方法返回值是 ListenableFuture 类型
	else if (ListenableFuture.class.isAssignableFrom(returnType)) {
		return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
	}
    // 方法返回值是 Future 类型
	else if (Future.class.isAssignableFrom(returnType)) {
		return executor.submit(task);
	}
    // 方法返回值是 void 
	else {
		executor.submit(task);
		return null;
	}
}

doSubmit() 方法中,会根据 @Async 注解标注方法的返回值不同,来选择不同的任务提交方式,最后任务会由执行器(线程池)执行。

@Async 原理总结

Async原理总结

理解 @Async 原理的核心在于理解 @EnableAsync 注解,该注解开启了异步任务的功能。

主要流程如上图,会通过后置处理器来创建代理对象,之后代理对象中 @Async 方法的执行会走到 Advice 内部的拦截器中,之后将方法封装为异步任务,并提交线程池进行处理。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

11来了

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值