RocketMQ应用
一、普通消息发送
①. 同步发送消息
- 同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。




②. 异步发送消息
- 异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。




③. 单向发送消息
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。




二、顺序消息
①. 什么是顺序消息
- 顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。
- 默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。
②. 为什么需要顺序消息
- 例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。
- 根据以上订单状态,生产者从时序上可以生成如下几个消息:订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败
- 消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

- 这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。
- 基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个
Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。
③. 有序性分类
- 根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序。

- 当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。 在创建Topic时指定Queue的数量。
- 有三种指定方式:
- 1)在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
- 2)在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
- 3)使用mqadmin命令手动创建Topic时指定Queue数量

- 如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。
- 如何实现Queue的选择?在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们 自己实现了MessageQueueSelector接口定义的。 在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论谁做选择key,都不能重复,都是唯一的。
- 一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果 即为选择出的Queue的QueueId。 取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的 消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问 题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需 要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被Consumer获取到。此时使用消息key作为选择key是比较好的做法。
- 以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费 该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的 不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不
- 同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。



三、延时消息
①. 什么是延时消息
- 当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。
- 采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。
②. 延时等级
- 延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在
RocketMQ服务端的MessageStoreConfig类中的如下变量中:

- 即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。
- 当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。
③. 延时消息实现原理
- Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程。



四、事务消息

②. 消息回查

- 消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款操作是否执行成功。注意,消息回查不是重新执行回调操作。回调操作是进行预扣款操作,而消息回查则是查看预 扣款操作执行的结果。 引发消息回查的原因最常见的有两个:
- 1)回调操作返回UNKNWON
- 2)TC没有接收到TM的最终全局事务确认指令
③. XA协议
- XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
- TC (Transaction Coordinator),事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。RocketMQ中Broker充当着TC。
- TM (Transaction Manager),事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。RocketMQ中事务消息的Producer充当着TM。
- RM (Resource Manager),资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。RocketMQ中事务消息的Producer及Broker均是RM。
>XA模式是一个典型的2PC,其执行原理如下:
- TM向TC发起指令,开启一个全局事务。
- 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
- 各个RM在接收到指令后会在进行本地事务预执行。
- RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
- TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指
令。- 若所有结果都是成功响应,则向TC发送Global Commit指令。
- 只要有结果是失败响应,则向TC发送Global Rollback指令。
- TC在接收到指令后再次向RM发送确认指令。
④. 事务执行状态




五、批量消息
①. 发送限制
- 生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:
- 批量发送的消息必须具有相同的Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
②. 批量发送大小
- 默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:
- 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
- 方案二:在Producer端与Broker端修改属性



六、消息过滤
①. Tag过滤
- 通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。



②. SQL过滤
- SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符。
- 支持的常量类型:
- 数值:比如:123,3.1415
- 字符:必须用单引号包裹起来,比如:‘abc’
- 布尔:TRUE 或 FALSE NULL:特殊的常量,表示空
支持的运算符有:
-
数值比较:>,>=,<,<=,BETWEEN,=
-
字符比较:=,<>,IN
-
逻辑运算 :AND,OR,NOT NULL判断:IS NULL 或者 IS NOT NULL
-
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:enablePropertyFilter = true

七、消息发送重试机制
- Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下几点:
- 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式
发送失败是没有重试机制的 - 只有普通消息具有发送重试机制,顺序消息是没有的
- 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在
RocketMQ中是无法避免的问题 - 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件
producer主动重发、consumer负载变化(发生Rebalance,不会导致消息重复,但可能出现重复
消费)也会导致重复消息
消息重复无法避免,但要避免消息的重复消费。 - 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息key),使消费者对消息进行消费判断来避免重复消费
- 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略。
①. 同步发送失败策略
- 对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败,默认重试2次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。

②. 异步发送失败策略
- 异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢。

③. 消息刷盘失败策略
- 消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是
SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。不过,对于重要消息可以通过在Broke的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。
八、消息消费重试机制
①. 顺序消息的消费重试
- 对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的情况。

②. 无序消息的消费重试
- 对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。
③. 消费重试次数与间隔
- 对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如下表。

④. 重试队列
- 对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。
- 当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称为%RETRY%consumerGroup@consumerGroup 的重试队列。
⑤. 消费重试配置方式

- 集群消费方式下,消息消费失败后若希望消费重试,则需要在消息监听器接口的实现中明确进行如下三种方式之一的配置:
- 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
- 方式2:返回Null
- 方式3:抛出异常
⑥. 消费不重试配置方式

- 集群消费方式下,消息消费失败后若不希望消费重试,则在捕获到异常后同样也返回与消费成功后的相同的结果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,则不进行消费重试。
本文详细介绍了RocketMQ的各种消息发送方式,包括同步、异步和单向发送,以及顺序消息的实现原理和应用场景。此外,探讨了延时消息的延迟等级和实现机制,事务消息的回查、XA协议和执行状态。还阐述了批量消息的限制与发送大小,消息过滤的Tag和SQL过滤策略,以及消息发送和消费的重试机制。最后,讨论了消费重试的配置方式,包括顺序消息和无序消息的处理策略。
892

被折叠的 条评论
为什么被折叠?



