如何在 Spring Boot 中实现 RabbitMQ 的手动 ACK 确认机制?
时间: 2025-12-03 19:49:09 AIGC 浏览: 14
在 Spring Boot 中使用 RabbitMQ 时,手动 ACK(确认)机制可以确保消息被消费者成功处理后才从队列中移除,避免因消费者异常导致消息丢失。默认情况下,Spring AMQP 使用自动 ACK 模式,但为了实现更高的可靠性,通常需要开启**手动 ACK**。
下面是如何在 Spring Boot 中配置和使用 RabbitMQ 手动 ACK 的完整示例。
---
### ✅ 实现步骤
1. 添加依赖
2. 配置 `application.yml` 或 `application.properties`
3. 配置 RabbitMQ 相关 Bean(队列、交换机、绑定等)
4. 配置监听容器工厂以启用手动 ACK
5. 在消费者中使用 `Channel` 和 `@RabbitListener` 进行手动确认或拒绝
---
### 1. Maven 依赖(`pom.xml`)
```xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
```
---
### 2. 配置文件 `application.yml`
```yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 关键:设置为手动ACK
concurrency: 1 # 初始消费者数量
max-concurrency: 5 # 最大并发消费者数
```
> 注意:`acknowledge-mode: manual` 是开启手动确认的关键。
---
### 3. RabbitMQ 配置类
```java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "manual_ack_queue";
public static final String EXCHANGE_NAME = "manual_ack_exchange";
public static final String ROUTING_KEY = "manual.routing.key";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true); // durable: true
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME, true, false);
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
```
---
### 4. 配置监听容器工厂(支持手动 ACK)
```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitListenerConfig {
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RabbitConfig.QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动ACK
return container;
}
}
```
> 注意:虽然 `@RabbitListener` 注解方式不需要显式创建容器,但如果要自定义行为(如重试、并发),建议配置 `SimpleRabbitListenerContainerFactory`。
#### 推荐方式:使用 `SimpleRabbitListenerContainerFactory`
```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;
@Configuration
public class RabbitListenerConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 核心:手动确认
factory.setErrorHandler(new MyErrorHandler()); // 自定义错误处理器
return factory;
}
// 自定义错误处理器,防止因异常导致消息无限重入队列
public static class MyErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
System.err.println("消息处理失败:" + t.getMessage());
// 可记录日志、发送告警,不要抛出异常以免无限重试
}
}
}
}
```
---
### 5. 消费者代码(手动 ACK 示例)
```java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ManualAckConsumer {
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void listen(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 模拟业务逻辑
String msg = new String(message.getBody());
System.out.println("收到消息: " + msg);
// 模拟处理成功
if (!msg.contains("fail")) {
channel.basicAck(deliveryTag, false); // 确认消息
System.out.println("消息已确认");
} else {
// 处理失败,拒绝消息,并不重新入队(防止死循环)
channel.basicNack(deliveryTag, false, false); // multiple=false, requeue=false
System.out.println("消息已拒绝,不重新入队");
}
} catch (Exception e) {
// 发生异常时拒绝消息
System.err.println("处理异常: " + e.getMessage());
channel.basicNack(deliveryTag, false, false); // 不重入队列
}
}
}
```
---
### 6. 生产者测试代码(可选)
```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public String sendMessage(@PathVariable String msg) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);
return "消息已发送: " + msg;
}
}
```
---
### 📌 关键点解释
- `AcknowledgeMode.MANUAL`:关闭自动确认。
- `channel.basicAck()`:成功处理后调用,告诉 RabbitMQ 删除该消息。
- `channel.basicNack(deliveryTag, false, false)`:
- 第三个参数 `requeue=false` 表示不要重新放回队列(防止重复消费造成雪崩)。
- 若设为 `true`,消息会重新入队,可能被再次消费。
- 异常处理要小心,避免无限重试。
---
### 🔁 延伸建议
- 结合 **死信队列(DLX)**:当 `basicNack(requeue=false)` 时,消息可路由到 DLX 做后续分析。
- 使用 `@RabbitListener` + `@SendTo` 实现 RPC 模式响应。
- 加入重试机制(如 Spring Retry)+ 指数退避。
---
###
阅读全文
