真实系统应用——责任链模式保证异步线程池中的日志追踪完整保障上的应用

真实系统应用——责任链模式保证异步线程池中的日志追踪完整保障上的应用

引言

在现代企业级分布式系统中,异步任务的处理是一个非常重要的技术挑战。无论是日志记录、任务执行,还是跨服务调用,如何高效且无缝地传递信息成为了设计系统时必须考虑的要素。尤其是 多线程异步执行 场景中,如何传递日志上下文(例如请求 ID 或 trace ID)来进行全链路追踪是一个非常实际且复杂的问题。

为了解决这个问题,责任链模式(Chain of Responsibility)作为一种经典的设计模式,提供了一种灵活而高效的方式来组织复杂的任务流,尤其是在异步执行中,它能够解耦上下文的传递和任务的执行。今天,我们将深入分析一个真实项目中,如何应用责任链模式解决异步线程池中的日志上下文传递问题,进一步探讨其实际应用和优化。

什么是责任链模式?

责任链模式(Chain of Responsibility Pattern)是一种 行为型设计模式,旨在通过将多个处理者连接成一条链来处理请求。在这个链中,每个处理者对象都有两个责任:一是处理当前请求,二是将请求传递给链中的下一个处理者。这个模式允许请求沿着链的传递,直到某个处理者对请求作出响应或者链的末尾没有处理者为止。

责任链模式的结构与工作原理

责任链模式通常由以下几部分组成:

  • 抽象处理者(Handler):定义了一个请求处理方法和指向下一个处理者的引用。处理者不需要知道请求的具体内容,而是将请求传递给链中的下一个处理者。
  • 具体处理者(ConcreteHandler):具体的请求处理者,决定是否处理当前请求。如果当前处理者不能处理请求,它会将请求传递给链中的下一个处理者。
  • 客户端(Client):发送请求的对象,它通过构造请求链,将请求发送到链中的第一个处理者。

在责任链模式中,客户端并不关心请求是由哪个具体处理者处理的,只需要将请求传递给链上的第一个处理者,剩下的交给责任链去处理。处理者自己决定是否处理请求,或是将请求转发给下一个处理者。

责任链模式的关键特点

优点

  1. 松耦合:请求的发送者并不关心处理者的具体实现,而是通过链的结构将请求发送给第一个处理者。每个处理者只关心它自身是否能处理请求,剩下的交给责任链的下一个处理者
  2. 灵活扩展:责任链模式的一个最大优点是它能轻松扩展。在责任链中添加新的处理者非常简单,无需修改现有的请求处理流程,只需要在链中插入一个新的处理者即可。这样一来,系统具有很好的扩展性和灵活性。
  3. 动态决定责任:每个处理者都可以决定是否继续将请求传递给下一个处理者,或是直接处理该请求。这样做的好处是能够根据具体条件动态决定请求的处理逻辑,而无需硬编码每个请求的处理流程。
  4. 责任的分担:在传统的程序设计中,处理请求的职责通常由一个单一的处理者负责,而责任链模式将这个责任分担到多个处理者中。每个处理者可以独立处理特定的任务,提升了系统的可维护性。

缺点

  • 调试困难:责任链模式可能导致请求的处理过程变得不可见或较难追踪,特别是当责任链非常长时,调试和日志记录会变得更加复杂。
  • 性能问题:如果责任链过长或层级过深,可能会带来一定的性能开销。每次请求都需要经过多个处理者的判断和传递,可能会影响效率。
  • 过度设计:对于一些简单的请求处理,责任链模式可能过于复杂,反而增加了不必要的设计和代码量。
责任链模式的应用场景

责任链模式适用于以下几种场景:

  • 多个对象处理同一请求:当一个请求需要多个对象按照顺序来处理时,可以使用责任链模式。例如,日志处理系统,日志可能要经过多个过滤器(如级别过滤、内容过滤等),每个过滤器决定是否处理该日志信息。
  • 动态处理流程:当请求的处理流程并不确定,且每个处理节点是否处理请求都不确定时,责任链模式提供了一种灵活的机制。在这种情况下,链中的每个处理者可以根据自己的需求决定是否处理请求,或者将请求传递给下一个处理者。
  • 请求的处理顺序不固定:当多个处理者需要按不同的顺序处理请求时,责任链模式非常有效。可以灵活地改变链中处理者的顺序或添加新的处理者。
  • 责任传递需要解耦:如果某个请求的处理需要不同的模块共同完成,且模块之间的耦合度较低时,责任链模式能很好地解耦这些模块,使得每个模块专注于自己的任务。

常见的实际应用场景如下:

  1. 事件处理系统:多个处理器顺序处理用户的事件请求,如 GUI 中的鼠标事件处理。
  2. 日志处理:日志依次经过过滤、格式化、输出等多个处理阶段。
  3. 权限控制:请求权限验证按层次进行,例如普通用户、管理员等角色的权限校验。
  4. 审批流程:多级审批流程中的每个环节负责自己的审批逻辑,直至最终结果。
  5. 错误处理:错误在多个处理器中逐步处理,可能包括日志记录、异常通知和回滚等。
  6. 过滤器链:Web 请求经过多个过滤器链,如认证、权限检查等。
  7. 工作流引擎:任务处理在多个节点之间传递,每个节点执行特定的处理任务。
  8. 请求处理流程:请求在多个处理器中逐步处理,例如 Web 服务中的数据验证和格式转换。
  9. 策略模式与责任链结合:结合策略模式动态选择处理逻辑,使用责任链模式组织处理顺序。
  10. 数据处理管道:数据经过多个处理阶段,如 ETL 任务中的数据清洗和转换。

系统开发中的问题背景

在实际应用中,系统经常需要在多线程环境下执行异步任务。以 @Async 注解为例,Spring 提供了异步方法执行的功能,它能够自动将方法执行移到独立的线程中。然而,MDC(Mapped Diagnostic Context)是 SLF4J 提供的工具,它只在当前线程有效,而异步线程中无法继承父线程的 MDC 上下文。这就导致了日志记录和请求追踪的丢失,尤其是在跨线程的场景中,如何保证子线程能够继承父线程的日志上下文,成为了一个棘手的问题。

比如说下面的日志:

2024/11/30 14:34:34.101  INFO 10 --- [0880-thread-196] c.m.m.c.i.d.f.UserInfoProviderFilter     : UserInfoFilterProvider:{"traceId":"abcd1234","bossType":null,"loginId":"ocs_admin",...}
2024/11/30 14:34:34.101  INFO 10 --- [0880-thread-196] m.m.c.i.d.f.MessageSourcesProviderFilter : MessageSourcesProviderFilter获取国际化语言为空
2024/11/30 14:34:34.117  INFO 10 --- [   importPool_0] c.m.m.t.i.m.S.queryListByUser            : ==>  Preparing: select S.STAR_RATING_USER_ID, S.USER_ID, ...
2024/11/30 14:34:34.130 DEBUG 10 --- [   importPool_0] c.m.m.t.i.m.S.queryListByUser            : <==      Total: 6
2024/11/30 14:34:34.139 ERROR 10 --- [   importPool_0] c.m.m.t.i.service.StarRatingUserService  : 客户信息导入失败,导入数据:{"traceId":"abcd1234", "errMsg":"该客户已存在该级"}

由于线程不一样,当再多线程日志混合时更加难以区分,如果能应用到一个上下文关联上两个线程的责任链,然后打印出来就更加明显了,例如我们加上了线程间传递的traceLogId,然后打印如[ 1656],我们在日志中就能知道这几个是子线程出来的,便于排查问题所在

2024/11/30 14:34:34.101  INFO 10 --- [   1656] [0880-thread-196] c.m.m.c.i.d.f.UserInfoProviderFilter     : UserInfoFilterProvider:{"traceId":"abcd1234","bossType":null,"loginId":"ocs_admin",...}
2024/11/30 14:34:34.101  INFO 10 --- [   1656] [0880-thread-196] m.m.c.i.d.f.MessageSourcesProviderFilter : MessageSourcesProviderFilter获取国际化语言为空
2024/11/30 14:34:34.117  INFO 10 --- [   1656] [   importPool_0] c.m.m.t.i.m.S.queryListByUser            : ==>  Preparing: select S.STAR_RATING_USER_ID, S.USER_ID, ...
2024/11/30 14:34:34.130 DEBUG 10 --- [   1656] [   importPool_0] c.m.m.t.i.m.S.queryListByUser            : <==      Total: 6
2024/11/30 14:34:34.139 ERROR 10 --- [   1656] [   importPool_0] c.m.m.t.i.service.StarRatingUserService  : 客户信息导入失败,导入数据:{"traceId":"abcd1234", "errMsg":"该客户已存在该级"}

责任链模式应用

为了优雅地解决这个问题,我们通过 责任链模式 来传递日志上下文。这里的 “责任链” 体现在异步任务执行的过程:当父线程触发异步任务时,它将上下文信息(如 traceLogId)传递给子线程,而子线程只需要根据上下文决定是否继续执行。

具体实现上,考虑到异步任务和日志上下文传递的实际需求,我们可以实现一个自定义的 ThreadPoolTaskExecutor,使得线程池能够在执行任务时将 MDC 上下文传递到子线程中。这种实现方式正是责任链模式的一个应用。

  1. 自定义线程池:我们通过扩展 ThreadPoolTaskExecutor 来实现一个自定义的线程池,重写 submit 方法,在提交任务时获取父线程的 MDC 上下文,并将其传递到子线程中。
  2. 传递上下文:当异步任务执行时,我们确保子线程能够继承父线程的 traceLogId 等上下文信息。如果父线程没有传递这些信息,子线程可以自动生成新的 traceLogId
  3. 清理上下文:在任务执行完成后,我们清理 MDC,确保线程池中的线程不会保留不必要的上下文信息,从而避免影响其他任务的执行。

责任链模式实现

  1. MDC上下文传递

    在多线程环境下,MDC 是用来存储和传递日志上下文的工具,但它只在当前线程有效。因此,我们必须手动将父线程的 MDC 上下文传递给子线程。为此,我们通过扩展 ThreadPoolTaskExecutor 来创建一个自定义线程池 MdcThreadPoolTaskExecutor,并重写其 submit 方法:

    @Slf4j
    public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            // 获取父线程的MDC上下文
            Map<String, String> context = MDC.getCopyOfContextMap();
            log.info("----MDC content:{}", JSONObject.toJSONString(context));
            return super.submit(() -> {
                // 将父线程的traceLogId传递给子线程
                if (!CollectionUtils.isEmpty(context) && !StringUtilsExt.isEmpty(context.get(Constants.MDC_KEY_TRACE_LOG_ID))) {
                    MDC.put(Constants.MDC_KEY_TRACE_LOG_ID, context.get(Constants.MDC_KEY_TRACE_LOG_ID));
                } else {
                    MDC.put(Constants.MDC_KEY_TRACE_LOG_ID, UUID.randomUUID().toString()); // 生成新的traceLogId
                }
                T result = null;
                try {
                    result = task.call(); // 执行子线程任务
                } finally {
                    MDC.clear(); // 清理MDC,防止线程池中的线程重用
                }
                return result;
            });
        }
    }
    
  2. 日志上下文的继承与清理

    在异步任务中,我们首先获取父线程的 MDC 上下文,然后将其复制到子线程中。具体来说,如果父线程中有 traceLogId,则将其传递给子线程;否则,自动生成一个新的 traceLogId。任务执行完毕后,必须清理 MDC,以防止线程池中的线程在重用时带入不必要的上下文。

    这种处理方式正是责任链模式的一种体现,父线程的请求(即日志上下文)被传递给子线程,子线程只需要执行处理逻辑,而不需要关心上下文的传递。

  3. 线程池配置

    另外,异步任务的执行还需要配置一个合适的线程池。在 Spring 中,我们可以通过 ThreadPoolTaskExecutor 来配置线程池,并为其设置核心线程数、最大线程数和队列容量等参数。以下是配置线程池的代码:

    @Configuration
    @Slf4j
    public class TestImportExecutePool {
    
        @Bean("testImportPool")
        public Executor importPool() {
            log.info("start importPool ");
            MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
            executor.setCorePoolSize(10); // 配置核心线程数
            executor.setMaxPoolSize(20);  // 配置最大线程数
            executor.setQueueCapacity(1000); // 配置队列容量
            executor.setKeepAliveSeconds(60); // 设置线程活跃时间
            executor.setThreadFactory(
                    new ThreadFactoryBuilder()
                            .setNameFormat("importPool_%d")
                            .setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable))
                            .build());
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置拒绝策略
            executor.initialize();
            return executor;
        }
    }
    
  4. 异步任务调用

    在异步任务的执行中,我们通过 CompletableFuture 来提交任务,并确保任务能够在独立线程中执行,且能够正确地传递上下文:

    @Resource(name = "testImportPool")
    MdcThreadPoolTaskExecutor importPool;
    
    @Override
    public CompletableFuture<Map<Integer, String>> importQuestion(List<QuestionReq> toImportList) {
        return CompletableFuture.supplyAsync(() -> questionService.importQuestion(toImportList), importPool);
    }
    

    这样,importPool 作为执行线程池,能够确保任务执行的过程中 traceLogId 被正确地传递,从而实现日志追踪的完整性。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

我心向阳iu

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值