活动介绍

如何在 Spring Boot 中实现 RabbitMQ 的手动 ACK 确认机制?

时间: 2025-12-03 19:49:09 AIGC 浏览: 14
在 Spring Boot 中使用 RabbitMQ 时,手动 ACK(确认)机制可以确保消息被消费者成功处理后才从队列中移除,避免因消费者异常导致消息丢失。默认情况下,Spring AMQP 使用自动 ACK 模式,但为了实现更高的可靠性,通常需要开启**手动 ACK**。 下面是如何在 Spring Boot 中配置和使用 RabbitMQ 手动 ACK 的完整示例。 --- ### ✅ 实现步骤 1. 添加依赖 2. 配置 `application.yml` 或 `application.properties` 3. 配置 RabbitMQ 相关 Bean(队列、交换机、绑定等) 4. 配置监听容器工厂以启用手动 ACK 5. 在消费者中使用 `Channel` 和 `@RabbitListener` 进行手动确认或拒绝 --- ### 1. Maven 依赖(`pom.xml`) ```xml <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> ``` --- ### 2. 配置文件 `application.yml` ```yaml spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: acknowledge-mode: manual # 关键:设置为手动ACK concurrency: 1 # 初始消费者数量 max-concurrency: 5 # 最大并发消费者数 ``` > 注意:`acknowledge-mode: manual` 是开启手动确认的关键。 --- ### 3. RabbitMQ 配置类 ```java import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String QUEUE_NAME = "manual_ack_queue"; public static final String EXCHANGE_NAME = "manual_ack_exchange"; public static final String ROUTING_KEY = "manual.routing.key"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); // durable: true } @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME, true, false); } @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } } ``` --- ### 4. 配置监听容器工厂(支持手动 ACK) ```java import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitListenerConfig { @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(RabbitConfig.QUEUE_NAME); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动ACK return container; } } ``` > 注意:虽然 `@RabbitListener` 注解方式不需要显式创建容器,但如果要自定义行为(如重试、并发),建议配置 `SimpleRabbitListenerContainerFactory`。 #### 推荐方式:使用 `SimpleRabbitListenerContainerFactory` ```java import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.ErrorHandler; @Configuration public class RabbitListenerConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 核心:手动确认 factory.setErrorHandler(new MyErrorHandler()); // 自定义错误处理器 return factory; } // 自定义错误处理器,防止因异常导致消息无限重入队列 public static class MyErrorHandler implements ErrorHandler { @Override public void handleError(Throwable t) { if (t instanceof ListenerExecutionFailedException) { System.err.println("消息处理失败:" + t.getMessage()); // 可记录日志、发送告警,不要抛出异常以免无限重试 } } } } ``` --- ### 5. 消费者代码(手动 ACK 示例) ```java import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class ManualAckConsumer { @RabbitListener(queues = RabbitConfig.QUEUE_NAME) public void listen(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 模拟业务逻辑 String msg = new String(message.getBody()); System.out.println("收到消息: " + msg); // 模拟处理成功 if (!msg.contains("fail")) { channel.basicAck(deliveryTag, false); // 确认消息 System.out.println("消息已确认"); } else { // 处理失败,拒绝消息,并不重新入队(防止死循环) channel.basicNack(deliveryTag, false, false); // multiple=false, requeue=false System.out.println("消息已拒绝,不重新入队"); } } catch (Exception e) { // 发生异常时拒绝消息 System.err.println("处理异常: " + e.getMessage()); channel.basicNack(deliveryTag, false, false); // 不重入队列 } } } ``` --- ### 6. 生产者测试代码(可选) ```java import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/{msg}") public String sendMessage(@PathVariable String msg) { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg); return "消息已发送: " + msg; } } ``` --- ### 📌 关键点解释 - `AcknowledgeMode.MANUAL`:关闭自动确认。 - `channel.basicAck()`:成功处理后调用,告诉 RabbitMQ 删除该消息。 - `channel.basicNack(deliveryTag, false, false)`: - 第三个参数 `requeue=false` 表示不要重新放回队列(防止重复消费造成雪崩)。 - 若设为 `true`,消息会重新入队,可能被再次消费。 - 异常处理要小心,避免无限重试。 --- ### 🔁 延伸建议 - 结合 **死信队列(DLX)**:当 `basicNack(requeue=false)` 时,消息可路由到 DLX 做后续分析。 - 使用 `@RabbitListener` + `@SendTo` 实现 RPC 模式响应。 - 加入重试机制(如 Spring Retry)+ 指数退避。 --- ###
阅读全文

相关推荐

最新推荐

recommend-type

springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

SpringBoot 和 RabbitMQ 是当前流行的微服务架构中常用的技术栈,然而在实际开发中,消息确认机制的实现却是一个坑爹的点。今天,我将与大家分享小编在实际开发中的一点踩坑经验,内容简单易懂,需要的朋友可以参考...
recommend-type

基于C11标准实现的多项式运算管理系统_支持多项式创建存储求和乘法及删除操作提供命令行交互界面并兼容Linux与Windows平台包含完整错误处理与数据持久化功能_旨.zip

基于C11标准实现的多项式运算管理系统_支持多项式创建存储求和乘法及删除操作提供命令行交互界面并兼容Linux与Windows平台包含完整错误处理与数据持久化功能_旨.zip
recommend-type

霸王茶姬运营分析:数据驱动的销售与用户策略

资源摘要信息:"《霸王茶姬店铺运营分析》报告分析框架介绍" 报告的标题《霸王茶姬店铺运营分析》以及描述指出了报告的核心内容是针对新中式茶饮品牌“霸王茶姬”的运营状况进行深入分析,其目的在于通过数据分析提升销售业绩、优化产品组合、增强用户粘性,并为运营策略提供数据支持。以下为报告的详细知识点: 1. 市场分析: - 新中式茶饮品牌霸王茶姬在市场上拥有良好的口碑,原因在于其高品质原料和独特口感。 - 面临激烈的市场竞争和消费者需求多样化,霸王茶姬需要明确其市场定位,以及如何在竞争中脱颖而出。 2. 销售与用户研究: - 分析销售数据、用户画像、产品表现和市场营销效果,旨在精细化管理运营策略,促进持续发展。 - 用户画像分析包括会员用户占比、用户年龄和性别分布、复购率与用户忠诚度、购买渠道占比等。 3. 数据分析方法: - 使用Python作为主要分析工具,实现数据的描述性统计和可视化分析。 - 数据处理涵盖数据清洗、缺失值处理和异常值检测,以确保分析结果的准确性。 4. 销售数据可视化: - 通过日/周/月销售额趋势图、各门店销售额对比柱状图、订单量与客单价分析饼图等图表形式,直观展示销售数据。 5. 销售数据分析结果: - 日销售额趋势显示周末销售额显著高于工作日,尤其以周六为最高峰。 - 月度销售额在夏季(6-8月)达到高峰,冬季(12-2月)相对较低。 - A门店销售额最高,占比30%,B门店和C门店销售额相近,分别占25%和20%。 - 平均客单价为35元,订单量高峰出现在下午2-5点。 6. 产品销售分析: - 分析各产品销量排名、爆款产品与滞销产品,并探讨组合购买情况及季节性产品销量趋势。 7. 结论与建议: - 根据分析得出的核心发现,提出针对性的运营优化策略和市场营销建议。 - 针对如何增长销售额、提升用户粘性、优化产品组合、提高运营效率及市场策略优化等方面,给出明确的结论和建议。 报告的内容与结构突显了数据驱动决策的重要性,并展示了如何利用数据分析方法来解决实际业务问题,从而为企业决策层提供科学的决策依据。通过对霸王茶姬店铺运营的深入分析,报告意在帮助企业识别市场机会,规避风险,优化运营流程,并最终实现业绩的增长。
recommend-type

TwinCAT PLC任务周期设置指南:单任务与多任务调度的6大实战策略

# TwinCAT任务周期的深度解析与实战调优 在现代自动化系统中,一个看似简单的“1ms”背后,藏着多少工程师的汗水和深夜调试?💡 当你按下启动按钮时,TwinCAT控制器正以微秒级精度调度着成百上千条指令——这不仅是代码的执行,更是时间的艺术。而这一切的核心,就是**任务周期(Task Cycle Time)**。 它不像AI模型那样炫酷,也不像视觉识别那样吸睛,但它却是整个控制系统稳定运行的“心跳”。跳得太快,CPU不堪重负;跳得太慢,设备失控飞车。🎯 所以说,在Beckhoff的世界里,**周期即生命线**。 --- ## 实时性从何而来?TwinCAT任务调度的本质揭秘
recommend-type

硬盘阵列柜写入数据速度

### 硬盘阵列柜数据写入速度及性能影响因素 硬盘阵列柜的数据写入速度受到多种因素的影响,这些因素可以分为硬件层面、软件层面以及配置层面。以下是详细分析: #### 1. **RAID级别对写入速度的影响** 不同的RAID级别对写入速度有显著影响。例如,在RAID 0中,数据被均匀分布到所有磁盘上,因此写入速度接近单个磁盘的总和[^3]。然而,在RAID 1中,由于需要将数据同时写入主盘和镜像盘,写入速度通常与单个磁盘的速度相当[^5]。而在RAID 5中,写入操作需要计算校验信息并将其写入磁盘,这会增加额外的开销,从而降低写入速度[^4]。 #### 2. **缓存的作用** 硬件R
recommend-type

C#编程语言的全面教程:基础语法与面向对象编程

资源摘要信息:"C#语言教程介绍" C#(读作“C Sharp”)是由微软公司于2000年推出的一种现代化面向对象编程语言,其设计目的是为了能够开发出具有复杂功能的软件组件,并且能够在微软的.NET平台上运行。C#语言以其简洁、面向对象、类型安全等特点,迅速成为开发Windows应用程序、Web服务、游戏以及跨平台解决方案的热门选择。 一、环境搭建 在正式开始学习C#编程之前,必须首先搭建好开发环境。通常情况下,开发者会优先考虑使用微软官方提供的Visual Studio集成开发环境(IDE),它适合从简单的学习项目到复杂的应用开发。Visual Studio提供了代码编辑、调试以及多种工具集,极大地提高了开发效率。 除了IDE,还需要安装.NET软件开发工具包(SDK),它是运行和构建C#程序所必需的。.NET SDK不仅包括.NET运行时,还包含用于编译和管理C#项目的一系列命令行工具和库。 二、C#基础语法 1. 命名空间与类 C#使用`using`关键字来引入命名空间,这对于使用类库和模块化代码至关重要。例如,使用`using System;`可以让程序访问`System`命名空间下的所有类,比如`Console`类。 类是C#中定义对象蓝图的核心,使用`class`关键字来声明。类可以包含字段、属性、方法和其他类成员,这些成员共同定义了类的行为和数据。 2. 变量与数据类型 在C#中,变量是用于存储数据值的基本单元。在使用变量之前,必须声明它并指定数据类型。C#支持多种基本数据类型,如整数(`int`)、浮点数(`double`)、字符(`char`)和布尔值(`bool`)。此外,C#还支持更复杂的数据类型,比如字符串(`string`)和数组。 3. 控制流语句 控制流语句用于控制程序的执行路径。它们能够根据条件判断来决定执行哪部分代码,或者通过循环重复执行某段代码。常用的控制流语句有: - `if`语句,用于基于条件表达式的结果执行代码块。 - `for`循环,用于按照一定次数重复执行代码块。 - `while`循环,根据条件表达式的结果循环执行代码块。 - `switch`语句,用于根据不同的条件执行不同的代码块。 三、面向对象编程(OOP) C#是一种纯粹的面向对象编程语言,它提供了类和对象的概念来支持面向对象的编程范式。 1. 类与对象 类在C#中是对象的蓝图或模板。一个类定义了一个对象的结构(数据成员)和行为(方法成员)。对象是类的实际实例,通过调用类的构造函数来创建。 2. 构造函数 构造函数是一种特殊的方法,它的名称与类名相同,并且在创建类的新对象时自动调用。构造函数负责初始化对象的状态。 3. 封装、继承与多态 封装是指将对象的实现细节隐藏起来,并向外界提供访问对象状态和行为的接口。 继承允许一个类(称为子类)继承另一个类(称为父类)的属性和方法,以此来重用代码和实现层级结构。 多态允许不同类的对象以统一的接口进行交互,并且可以在运行时确定要调用的方法的具体实现。 四、高级特性 C#提供了丰富的高级特性,这些特性使得C#更加灵活和强大。 1. 泛型与集合 泛型允许开发者编写与特定数据类型无关的代码,这使得同一个算法或方法能够应用于不同的数据类型,同时还能保持类型安全。 C#提供了丰富的集合类型,比如数组、列表(`List<T>`)、队列(`Queue<T>`)、栈(`Stack<T>`)和字典(`Dictionary<TKey,TValue>`)等,这些集合类型帮助开发者更高效地管理数据集合。 2. 异常处理 C#通过异常处理机制为开发者提供了处理程序运行时错误的方法。异常可以在检测到错误时抛出,并且在程序的其他部分捕获和处理。 3. Lambda表达式与LINQ Lambda表达式提供了一种简洁的定义匿名方法的方式,它们在C#的许多高级特性中都有应用。 LINQ(语言集成查询)是C#的一个强大特性,它提供了一种一致的方法来查询和处理数据,无论数据是存储在数据库中、XML文件中还是内存中的集合。 五、并发编程 在多核处理器时代,并发编程变得异常重要。C#通过多种方式支持并发编程,例如提供线程的基础操作、线程池和任务并行库(TPL)等。 任务并行库简化了并行编程,它允许开发者轻松地执行并行任务和并行化循环操作。异步编程是C#的另一个重要特性,特别是async和await关键字的引入,它们使得异步代码的编写更加直观和简洁。 此外,C#还支持并发集合和原子操作,这些是实现线程安全集合和高效同步机制的重要工具。 总结而言,C#语言结合了面向对象的强大功能和现代编程语言的许多便捷特性,使其在各种类型的软件开发中成为了一个非常流行和实用的选择。通过不断学习和实践C#语言的基础和高级特性,开发者能够有效地创建各种高性能的应用程序。
recommend-type

深度解析TwinCAT ADS通信机制:从端口分配到路由建立的5个关键步骤

# TwinCAT ADS通信机制深度解析:从基础协议到安全优化 在现代工业自动化系统中,设备间的高效、可靠通信是实现智能制造的核心。而 Beckhoff 的 **TwinCAT ADS(Automation Device Specification)** 协议,正是这一领域的标杆技术之一。它不仅支撑着 PLC 与上位机之间的实时数据交互,更以其高度灵活的架构和强大的扩展能力,成为构建复杂控制系统的关键纽带。 你有没有遇到过这样的场景? 👉 上位机 HMI 显示的数据总是“慢半拍”; 👉 多轴运动控制时轨迹偏差大,调试无从下手; 👉 系统上线后偶发性断连,日志却查不出原因
recommend-type

kotlin怎么实现左滑删除列表中的一项数据

### 在 Kotlin 中实现 RecyclerView 左滑删除功能 要在 Kotlin 中实现 RecyclerView 的左滑删除功能,可以通过继承 `ItemTouchHelper.Callback` 类并重写相关方法来完成。以下是完整的实现步骤和代码示例: #### 1. 添加依赖 首先确保在项目的 `build.gradle` 文件中添加了 RecyclerView 的支持库: ```gradle dependencies { implementation 'androidx.recyclerview:recyclerview:1.2.1' } ``` #### 2
recommend-type

赵致琢教授探讨中国计算机科学教育的发展策略

资源摘要信息:《中国计算机科学专业教育发展道路的思考》 知识点一:计算机科学教育的发展与挑战 随着计算机科学的飞速发展,学科专业办学面临诸多挑战。例如,计算机科学从“前科学”时代向成熟学科的过渡使得学科知识体系日渐庞大且复杂。这要求高校在计算机科学教育过程中采用更加合理和科学的办学策略,适应社会多样化的需求。 知识点二:分层次分类办学的策略 报告提出了分层次分类办学作为应对当前教育挑战的关键策略。这一策略涉及在研究生教育和本科教育中设立不同的培养目标和课程体系,以培养不同类型的计算机科学人才,如创新人才、应用技术开发人才和职业技术人才。 知识点三:学科专业教育的重新定位 高等教育中存在办学定位模糊的问题,导致教育资源分配不合理。因此,赵教授建议高校需要明确自身定位,根据学科专业教学的要求,分类开展教学活动,避免盲目追求规模扩大而忽视教育质量。 知识点四:专业认证的重要性 赵教授强调专业认证的重要性。通过专业认证体系,可以保证教育质量,确保培养的人才满足社会的需求和标准,从而提升学科专业的社会认可度。 知识点五:教学改革实践经验 厦门大学在计算机科学本科教学改革方面提供了实践经验。例如,通过强化数学基础和增加实践课程的比重,厦门大学成功地提升了教育质量,并取得了显著成效。这些经验对其他高校具有借鉴意义。 知识点六:教育改革的本土化与国际合作 赵教授指出,中国高等教育改革应该立足本土文化,借鉴国外的先进经验和教育理念。通过校际协作,可以提升师资水平,推动教育质量的整体提升。这表明国际交流与合作对于学科建设与教学改革具有重要意义。 知识点七:构建学科人才培养的科学体系 为应对教育挑战,需要全面建立学科人才培养的科学体系,包括科学理论体系、示范教育基地和质量保障体系。这三个体系是确保教育质量和可持续发展的基础。 知识点八:问题根源的深入分析 报告进一步分析了当前计算机科学教育问题的根源,包括宏观决策上的缺失、微观运行中的混乱以及外部环境问题。这些问题导致了教育资源配置的不合理和教学效率的低下。 知识点九:师资队伍建设的重要性 赵教授提到,当前师资队伍存在不足,大部分高校需要提升师资的起点和质量。师资队伍的建设是提高教育质量的关键,需要从选拔、培养到评价等多方面进行系统的改革和创新。 知识点十:对未来的展望与选择 在总结前人经验和分析现状的基础上,赵教授呼吁对高等教育和科学技术未来的发展道路做出正确的选择,强调科学、理智和质量的重要性,并强调了中国高等教育改革需要立足本国传统文化根基,同时借鉴国外先进经验,进行系统的变革。 综合以上内容,赵致琢教授的报告不仅深入分析了当前中国计算机科学专业教育所面临的挑战,还提出了具有实践价值的应对策略,强调了教育改革的必要性和紧迫性。报告内容丰富,为当前和未来的计算机科学教育提供了宝贵的参考和指导。
recommend-type

定时器溢出与更新中断联动:构建完整呼吸周期状态机的6阶段控制模型

# 呼吸节律的嵌入式艺术:从心跳到生命律动 在ICU病房里,一台呼吸机正发出轻微而规律的“嘶—呼”声。这不仅是机械气流的进出,更是一场精密的生命维持仪式——每一次吸气与呼气之间,都藏着成千上万次定时器中断、状态跃迁和算法调控的协同舞蹈。而在这一切背后,是一个看似简单却极其关键的技术支点:**如何用一个微控制器的“心跳”,去模拟人类最自然的生命律动?** 这不是普通的延时控制问题,也不是简单的PWM输出任务。这是关于时间精度、生理贴合度与系统鲁棒性的综合挑战。当患者肺部因ARDS(急性呼吸窘迫综合征)变得脆弱时,哪怕10ms的时间偏差或5cmH₂O的压力波动,都可能造成不可逆损伤。因此,现代