一、引言
RabbitMQ 是一个广泛应用于软件开发、数据传输、微服务等领域的高效、可靠的开源消息队列系统1。在分布式系统中,保证消息的可靠传递和幂等性是至关重要的,它能够确保系统在各种复杂情况下的稳定性和数据的准确性。
二、消息可靠性保障
(一)生产者端
- 发送方确认机制(Publisher Confirm)
- 原理:生产者发送消息后,MQ 会返回一个确认消息给生产者,告知消息是否被成功接收。通过在配置文件中开启
publisher - confirm - type
来启用该功能,一般设置为correlated
,表示成功发布消息到交换机后会触发回调方法。生产者可以为每个消息设置一个唯一的CorrelationData
作为消息的标识符,在回调方法中根据这个标识符来确定消息的发送结果。 - 示例配置:在 Spring Boot 项目的
application.yml
文件中配置如下:
- 原理:生产者发送消息后,MQ 会返回一个确认消息给生产者,告知消息是否被成功接收。通过在配置文件中开启
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual - host: /
# 开启消息确认
publisher - confirm - type: correlated
- 事务机制
- 原理:生产者将发送消息的操作放在一个事务中,如果消息发送过程出现异常,可以回滚事务,确保消息不会丢失或出现不一致的情况。然而,事务机制会阻塞生产者线程,严重影响性能,在生产环境中一般不建议使用4。
- 示例代码:在使用 RabbitMQ 的 Java 客户端时,可以通过以下方式开启事务:
channel.txSelect();
try {
// 发送消息的代码
channel.basicPublish(exchange, routingKey, body);
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
- 失败重试机制2
- 自带重试机制:如果发送方一开始就连不上 MQ,Spring Boot 中利用 Spring 的
retry
机制来实现重试。可以在配置文件中配置相关参数,如重试起始间隔时间、最大重试次数、最大重试间隔时间和间隔时间乘数等。 - 示例配置:
- 自带重试机制:如果发送方一开始就连不上 MQ,Spring Boot 中利用 Spring 的
spring:
rabbitmq:
template:
retry:
initial - interval: 1000ms
max - attempts: 10
max - interval: 10000ms
multiplier: 2
- 业务重试:针对消息没有到达交换机的情况,在消息发送失败回调中进行处理。首先创建一张表记录发送到中间件的消息,包括消息的状态、第一次重试时间和重试次数等字段。在消息发送时往表中插入记录,在确认回调方法中根据消息的
msgId
更新消息状态。另外开启定时任务,定时检查状态为发送中且超过重试时间的消息,根据重试次数决定是否重新发送消息。
(二)MQ 中间件端
- 消息持久化
- 队列持久化:创建队列时将其设置为持久化,这样可以保证 Broker 在重启后队列的元数据不会丢失。在定义队列时,将
durable
参数设置为true
即可实现队列持久化。 - 消息持久化:将消息的
deliveryMode
设置为2
,可以将消息持久化到磁盘。这样只有消息成功持久化到磁盘之后,Broker 才会发送通知给生产者进行确认。
- 队列持久化:创建队列时将其设置为持久化,这样可以保证 Broker 在重启后队列的元数据不会丢失。在定义队列时,将
- 交换机持久化:创建交换机时设置为持久化,保证交换机在 Broker 重启后不会丢失。在定义交换机时,将
durable
参数设置为true
。 - 镜像队列:为了防止 MQ 服务器宕机或磁盘损坏导致消息丢失,可以引入镜像队列。镜像队列会将消息复制到多个节点上,即使某个节点出现故障,其他节点仍然可以提供服务,从而提高系统的可靠性。
(三)消费者端
- 消费者确认机制(Consumer Acknowledgement)
- 原理:当消费者处理消息结束后,需要向 RabbitMQ 发送一个回执,以告知消息的处理状态。ACK 表示消费者成功处理了消息,RabbitMQ 会从队列中删除该消息;NACK 表示消息处理失败,RabbitMQ 需要再次投递该消息;REJECT 表示消息处理失败并且被拒绝,RabbitMQ 会从队列中删除该消息,但一般很少使用 REJECT,通常只在消息格式存在问题时使用。
- 确认模式:RabbitMQ 支持三种不同的确认模式,通过
acknowledge - mode
属性进行配置。manual
模式下,消费者接收到消息后需要手动发送确认给发送者;auto
模式下,Spring AMQP 利用 AOP 对消息处理逻辑做环绕增强,业务正常执行时自动返回 ACK,出现异常时根据异常类型返回 NACK 或 REJECT;none
模式下,消费者接收到消息后不需要发送任何确认给发送者,这种模式无法保证消息的可靠性,一般不使用。
- 失败重试机制
- 本地重试:Spring 框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制地将消息重新入队到 MQ 队列。可以通过配置相关参数来控制重试的次数、间隔时间等。在达到最大重试次数后,Spring AMQP 会抛出
AmqpRejectAndDontRequeueException
异常,并将消息从队列中删除。 - 重试策略自定义:Spring AMQP 允许开发人员自定义重试次数耗尽后的消息处理策略,通过实现
MessageRecovery
接口来定义不同的策略,如RejectAndDontRequeueRecoverer
(重试次数耗尽后直接拒绝消息并丢弃)、ImmediateRequeueMessageRecoverer
(重试次数耗尽后返回 NACK 给生产者使消息重新入队)、RepublishMessageRecoverer
(重试次数耗尽后将失败消息投递到指定的交换机和队列中)。
- 本地重试:Spring 框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制地将消息重新入队到 MQ 队列。可以通过配置相关参数来控制重试的次数、间隔时间等。在达到最大重试次数后,Spring AMQP 会抛出
三、幂等性保障
- 通过唯一标识符保证操作的幂等性
- 原理:为每个操作生成唯一的标识符(如 ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP 的
MessageConverter
自带了MessageID
的功能,只要开启这个功能,就可以为每个消息生成唯一的 ID,也可以在业务中基于 ID 判断是否是重复消息。 - 示例代码:
- 原理:为每个操作生成唯一的标识符(如 ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP 的
@Bean
public MessageConverter messageConverter() {
// 定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
- 通过业务判断保证操作的幂等性
- 基于数据库唯一约束:在数据库表中为相关业务字段设置唯一约束,例如在订单表中,以订单编号作为唯一键。当消费者接收到消息并处理业务时,将相关数据插入或更新到数据库中,如果出现唯一约束冲突,则说明该消息是重复的,直接返回成功即可,避免重复执行相同的业务逻辑。
- 利用 Redis 缓存:在消费者消费消息之前,先将消息的 ID 放到 Redis 中。可以使用
SETNX
命令(SET if Not eXists
)来设置键值对,如果键已经存在,说明之前有人消费过该消息。可以根据键对应的值来判断消息的处理状态,如0
表示正在处理或处理失败,1
表示处理成功。当消息成功消费之后,将 ID 对应的值设置为1
。为了防止出现死锁等情况,可以给键设置一个生存时间。
四、总结
通过在生产者端、MQ 中间件端和消费者端采取一系列的措施,RabbitMQ 可以有效地保证消息的可靠性和幂等性。在实际应用中,需要根据具体的业务场景和需求,合理地配置和使用这些功能,以确保系统的稳定性和数据的一致性。同时,还需要注意一些性能方面的问题,例如事务机制对性能的影响、重试机制可能导致的资源消耗等,在保证系统可靠性的前提下,尽可能地提高系统的性能和效率。