文章目录
真实系统应用——责任链模式保证异步线程池中的日志追踪完整保障上的应用
引言
在现代企业级分布式系统中,异步任务的处理是一个非常重要的技术挑战。无论是日志记录、任务执行,还是跨服务调用,如何高效且无缝地传递信息成为了设计系统时必须考虑的要素。尤其是 多线程 和 异步执行 场景中,如何传递日志上下文(例如请求 ID 或 trace ID)来进行全链路追踪是一个非常实际且复杂的问题。
为了解决这个问题,责任链模式(Chain of Responsibility)作为一种经典的设计模式,提供了一种灵活而高效的方式来组织复杂的任务流,尤其是在异步执行中,它能够解耦上下文的传递和任务的执行。今天,我们将深入分析一个真实项目中,如何应用责任链模式解决异步线程池中的日志上下文传递问题,进一步探讨其实际应用和优化。
什么是责任链模式?
责任链模式(Chain of Responsibility Pattern)是一种 行为型设计模式,旨在通过将多个处理者连接成一条链来处理请求。在这个链中,每个处理者对象都有两个责任:一是处理当前请求,二是将请求传递给链中的下一个处理者。这个模式允许请求沿着链的传递,直到某个处理者对请求作出响应或者链的末尾没有处理者为止。
责任链模式的结构与工作原理
责任链模式通常由以下几部分组成:
- 抽象处理者(Handler):定义了一个请求处理方法和指向下一个处理者的引用。处理者不需要知道请求的具体内容,而是将请求传递给链中的下一个处理者。
- 具体处理者(ConcreteHandler):具体的请求处理者,决定是否处理当前请求。如果当前处理者不能处理请求,它会将请求传递给链中的下一个处理者。
- 客户端(Client):发送请求的对象,它通过构造请求链,将请求发送到链中的第一个处理者。
在责任链模式中,客户端并不关心请求是由哪个具体处理者处理的,只需要将请求传递给链上的第一个处理者,剩下的交给责任链去处理。处理者自己决定是否处理请求,或是将请求转发给下一个处理者。
责任链模式的关键特点
优点:
- 松耦合:请求的发送者并不关心处理者的具体实现,而是通过链的结构将请求发送给第一个处理者。每个处理者只关心它自身是否能处理请求,剩下的交给责任链的下一个处理者。
- 灵活扩展:责任链模式的一个最大优点是它能轻松扩展。在责任链中添加新的处理者非常简单,无需修改现有的请求处理流程,只需要在链中插入一个新的处理者即可。这样一来,系统具有很好的扩展性和灵活性。
- 动态决定责任:每个处理者都可以决定是否继续将请求传递给下一个处理者,或是直接处理该请求。这样做的好处是能够根据具体条件动态决定请求的处理逻辑,而无需硬编码每个请求的处理流程。
- 责任的分担:在传统的程序设计中,处理请求的职责通常由一个单一的处理者负责,而责任链模式将这个责任分担到多个处理者中。每个处理者可以独立处理特定的任务,提升了系统的可维护性。
缺点:
- 调试困难:责任链模式可能导致请求的处理过程变得不可见或较难追踪,特别是当责任链非常长时,调试和日志记录会变得更加复杂。
- 性能问题:如果责任链过长或层级过深,可能会带来一定的性能开销。每次请求都需要经过多个处理者的判断和传递,可能会影响效率。
- 过度设计:对于一些简单的请求处理,责任链模式可能过于复杂,反而增加了不必要的设计和代码量。
责任链模式的应用场景
责任链模式适用于以下几种场景:
- 多个对象处理同一请求:当一个请求需要多个对象按照顺序来处理时,可以使用责任链模式。例如,日志处理系统,日志可能要经过多个过滤器(如级别过滤、内容过滤等),每个过滤器决定是否处理该日志信息。
- 动态处理流程:当请求的处理流程并不确定,且每个处理节点是否处理请求都不确定时,责任链模式提供了一种灵活的机制。在这种情况下,链中的每个处理者可以根据自己的需求决定是否处理请求,或者将请求传递给下一个处理者。
- 请求的处理顺序不固定:当多个处理者需要按不同的顺序处理请求时,责任链模式非常有效。可以灵活地改变链中处理者的顺序或添加新的处理者。
- 责任传递需要解耦:如果某个请求的处理需要不同的模块共同完成,且模块之间的耦合度较低时,责任链模式能很好地解耦这些模块,使得每个模块专注于自己的任务。
常见的实际应用场景如下:
- 事件处理系统:多个处理器顺序处理用户的事件请求,如 GUI 中的鼠标事件处理。
- 日志处理:日志依次经过过滤、格式化、输出等多个处理阶段。
- 权限控制:请求权限验证按层次进行,例如普通用户、管理员等角色的权限校验。
- 审批流程:多级审批流程中的每个环节负责自己的审批逻辑,直至最终结果。
- 错误处理:错误在多个处理器中逐步处理,可能包括日志记录、异常通知和回滚等。
- 过滤器链:Web 请求经过多个过滤器链,如认证、权限检查等。
- 工作流引擎:任务处理在多个节点之间传递,每个节点执行特定的处理任务。
- 请求处理流程:请求在多个处理器中逐步处理,例如 Web 服务中的数据验证和格式转换。
- 策略模式与责任链结合:结合策略模式动态选择处理逻辑,使用责任链模式组织处理顺序。
- 数据处理管道:数据经过多个处理阶段,如 ETL 任务中的数据清洗和转换。
系统开发中的问题背景
在实际应用中,系统经常需要在多线程环境下执行异步任务。以 @Async
注解为例,Spring 提供了异步方法执行的功能,它能够自动将方法执行移到独立的线程中。然而,MDC(Mapped Diagnostic Context)是 SLF4J 提供的工具,它只在当前线程有效,而异步线程中无法继承父线程的 MDC 上下文。这就导致了日志记录和请求追踪的丢失,尤其是在跨线程的场景中,如何保证子线程能够继承父线程的日志上下文,成为了一个棘手的问题。
比如说下面的日志:
2024/11/30 14:34:34.101 INFO 10 --- [0880-thread-196] c.m.m.c.i.d.f.UserInfoProviderFilter : UserInfoFilterProvider:{"traceId":"abcd1234","bossType":null,"loginId":"ocs_admin",...}
2024/11/30 14:34:34.101 INFO 10 --- [0880-thread-196] m.m.c.i.d.f.MessageSourcesProviderFilter : MessageSourcesProviderFilter获取国际化语言为空
2024/11/30 14:34:34.117 INFO 10 --- [ importPool_0] c.m.m.t.i.m.S.queryListByUser : ==> Preparing: select S.STAR_RATING_USER_ID, S.USER_ID, ...
2024/11/30 14:34:34.130 DEBUG 10 --- [ importPool_0] c.m.m.t.i.m.S.queryListByUser : <== Total: 6
2024/11/30 14:34:34.139 ERROR 10 --- [ importPool_0] c.m.m.t.i.service.StarRatingUserService : 客户信息导入失败,导入数据:{"traceId":"abcd1234", "errMsg":"该客户已存在该级"}
由于线程不一样,当再多线程日志混合时更加难以区分,如果能应用到一个上下文关联上两个线程的责任链,然后打印出来就更加明显了,例如我们加上了线程间传递的traceLogId,然后打印如[ 1656]
,我们在日志中就能知道这几个是子线程出来的,便于排查问题所在
2024/11/30 14:34:34.101 INFO 10 --- [ 1656] [0880-thread-196] c.m.m.c.i.d.f.UserInfoProviderFilter : UserInfoFilterProvider:{"traceId":"abcd1234","bossType":null,"loginId":"ocs_admin",...}
2024/11/30 14:34:34.101 INFO 10 --- [ 1656] [0880-thread-196] m.m.c.i.d.f.MessageSourcesProviderFilter : MessageSourcesProviderFilter获取国际化语言为空
2024/11/30 14:34:34.117 INFO 10 --- [ 1656] [ importPool_0] c.m.m.t.i.m.S.queryListByUser : ==> Preparing: select S.STAR_RATING_USER_ID, S.USER_ID, ...
2024/11/30 14:34:34.130 DEBUG 10 --- [ 1656] [ importPool_0] c.m.m.t.i.m.S.queryListByUser : <== Total: 6
2024/11/30 14:34:34.139 ERROR 10 --- [ 1656] [ importPool_0] c.m.m.t.i.service.StarRatingUserService : 客户信息导入失败,导入数据:{"traceId":"abcd1234", "errMsg":"该客户已存在该级"}
责任链模式应用
为了优雅地解决这个问题,我们通过 责任链模式 来传递日志上下文。这里的 “责任链” 体现在异步任务执行的过程:当父线程触发异步任务时,它将上下文信息(如 traceLogId)传递给子线程,而子线程只需要根据上下文决定是否继续执行。
具体实现上,考虑到异步任务和日志上下文传递的实际需求,我们可以实现一个自定义的 ThreadPoolTaskExecutor
,使得线程池能够在执行任务时将 MDC
上下文传递到子线程中。这种实现方式正是责任链模式的一个应用。
- 自定义线程池:我们通过扩展
ThreadPoolTaskExecutor
来实现一个自定义的线程池,重写submit
方法,在提交任务时获取父线程的MDC
上下文,并将其传递到子线程中。 - 传递上下文:当异步任务执行时,我们确保子线程能够继承父线程的
traceLogId
等上下文信息。如果父线程没有传递这些信息,子线程可以自动生成新的traceLogId
。 - 清理上下文:在任务执行完成后,我们清理
MDC
,确保线程池中的线程不会保留不必要的上下文信息,从而避免影响其他任务的执行。
责任链模式实现
-
MDC上下文传递
在多线程环境下,
MDC
是用来存储和传递日志上下文的工具,但它只在当前线程有效。因此,我们必须手动将父线程的 MDC 上下文传递给子线程。为此,我们通过扩展ThreadPoolTaskExecutor
来创建一个自定义线程池MdcThreadPoolTaskExecutor
,并重写其submit
方法:@Slf4j public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { @Override public <T> Future<T> submit(Callable<T> task) { // 获取父线程的MDC上下文 Map<String, String> context = MDC.getCopyOfContextMap(); log.info("----MDC content:{}", JSONObject.toJSONString(context)); return super.submit(() -> { // 将父线程的traceLogId传递给子线程 if (!CollectionUtils.isEmpty(context) && !StringUtilsExt.isEmpty(context.get(Constants.MDC_KEY_TRACE_LOG_ID))) { MDC.put(Constants.MDC_KEY_TRACE_LOG_ID, context.get(Constants.MDC_KEY_TRACE_LOG_ID)); } else { MDC.put(Constants.MDC_KEY_TRACE_LOG_ID, UUID.randomUUID().toString()); // 生成新的traceLogId } T result = null; try { result = task.call(); // 执行子线程任务 } finally { MDC.clear(); // 清理MDC,防止线程池中的线程重用 } return result; }); } }
-
日志上下文的继承与清理
在异步任务中,我们首先获取父线程的
MDC
上下文,然后将其复制到子线程中。具体来说,如果父线程中有traceLogId
,则将其传递给子线程;否则,自动生成一个新的traceLogId
。任务执行完毕后,必须清理MDC
,以防止线程池中的线程在重用时带入不必要的上下文。这种处理方式正是责任链模式的一种体现,父线程的请求(即日志上下文)被传递给子线程,子线程只需要执行处理逻辑,而不需要关心上下文的传递。
-
线程池配置
另外,异步任务的执行还需要配置一个合适的线程池。在 Spring 中,我们可以通过
ThreadPoolTaskExecutor
来配置线程池,并为其设置核心线程数、最大线程数和队列容量等参数。以下是配置线程池的代码:@Configuration @Slf4j public class TestImportExecutePool { @Bean("testImportPool") public Executor importPool() { log.info("start importPool "); MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // 配置核心线程数 executor.setMaxPoolSize(20); // 配置最大线程数 executor.setQueueCapacity(1000); // 配置队列容量 executor.setKeepAliveSeconds(60); // 设置线程活跃时间 executor.setThreadFactory( new ThreadFactoryBuilder() .setNameFormat("importPool_%d") .setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable)) .build()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置拒绝策略 executor.initialize(); return executor; } }
-
异步任务调用
在异步任务的执行中,我们通过
CompletableFuture
来提交任务,并确保任务能够在独立线程中执行,且能够正确地传递上下文:@Resource(name = "testImportPool") MdcThreadPoolTaskExecutor importPool; @Override public CompletableFuture<Map<Integer, String>> importQuestion(List<QuestionReq> toImportList) { return CompletableFuture.supplyAsync(() -> questionService.importQuestion(toImportList), importPool); }
这样,
importPool
作为执行线程池,能够确保任务执行的过程中traceLogId
被正确地传递,从而实现日志追踪的完整性。