1、生产者
生产者和主题之间存在多对多关系。一个生产者可以向多个主题发送消息,一个主题可以接收来自多个生产者的消息。这种多对多关系有助于提高性能扩展和灾难恢复能力。
2、消费者以及消费者组
一个队列可以被多个消费者(其中这多个消费者必须分别属于不同消费者组)消费,也就说,如果只有一个消费者组,一个队列只能被一个消费者消费。一个消费者可以消费多个队列。
举个例子就是,一个主题里面有10个队列,消费者组A中有5个消费者,一个消费者负责消费2个消息队列,实现负载均衡。消费者组B中有2消费者,其中一个消费者负责5个消息队列的消费。
在业务处理上,一个消费组中消费者们通常只针对一种Topic类型进行消费,于是有了消费者组订阅Topic这一说法。
消费者组同一时刻只能消费一个Topic的消息,不能同时消费多个Topic消息。
一个消费者组中的消费者必须订阅完全相同的Topic。
当然现在我们说得是,消费颗粒度为队列,一个消费组的消费者数量应该小于等于Topic中的队列数量;当消费颗粒度为消息时,则不存在这种限制。
3、生产者组
生产者组中的生产者在同一时刻,同时生产相同类型的Topic消息,需要注意得是,生产者组中的生产者也可能都会生产另外一种Topic消息,所以,在下一个时刻,可能会同时生产另外一种类型的Topic消息。
4、队列
也叫分区(Partition),一个主题可以有多个队列,每个队列可以存放多个消息。
5、主题与分片(Sharding)以及broker
6、消息标识
RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的key,以方便对消息的查询。
不过需要注意的是,MessageId有两个:
在生产者 send()消息时会自动生成一个MessageId(msgId);
当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都
称为消息标识。
msgId:由producer端生成,其生成规则为:producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器
offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的
偏移量)
key:由用户指定的业务相关的唯一标识
7、系统架构
7.1 深入理解NameServer以及Broker
是一个Broker服务与Topic路由的注册中心,支持Broker的动态注册。
Broker是服务,NameServer是服务注册中心。
明晰几个概念
1、服务(路由)注册:
NameServer通常也是以集群的方式部署,不过,NameServer是无状态、独立的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。
那各节点中的数据是如何进行数据同步的呢?在
Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。
NameServer 就是 RocketMQ 的“路由中介”——Broker 向它注册,Producer/Consumer (客户端)向它查询,整个系统才能协调运行。
2、心跳检测
NameServer 定期接收 Broker 心跳,感知 Broker 是否存活
3、路由管理
每个 NameServer 会在本地内存中维护一份独立的“Broker 路由表”,包括 每一个Broker 的地址、角色(Master/Slave)、所管理的 Topic 和队列等信息,Broker 会定时通过心跳机制向所有 NameServer 注册和更新这些信息。
4、路由发现
Producer 和 Consumer(客户端) 需要从 NameServer 获取 Topic 路由信息
5、路由信息
路由信息是指Topic → Queue → Broker 的映射关系,也就是客户端如何找到某个 Topic 的写入或读取地址。
举个例子:什么叫“路由信息”?
假设你有一个 Topic 叫 OrderTopic
,它有 6 个队列,分布在两个 Broker 上:
Broker 名 | IP 地址 | Topic | 队列编号 |
---|---|---|---|
BrokerA | 192.168.1.10 | OrderTopic | Queue 0, 1, 2 |
BrokerB | 192.168.1.11 | OrderTopic | Queue 3, 4, 5 |
那么路由信息长这样(伪 JSON):
{
"OrderTopic": {
"queues": [
{"queueId": 0, "broker": "BrokerA", "readable": true, "writable": true},
{"queueId": 1, "broker": "BrokerA", "readable": true, "writable": true},
{"queueId": 2, "broker": "BrokerA", "readable": true, "writable": true},
{"queueId": 3, "broker": "BrokerB", "readable": true, "writable": true},
{"queueId": 4, "broker": "BrokerB", "readable": true, "writable": true},
{"queueId": 5, "broker": "BrokerB", "readable": true, "writable": true}
],
"brokers": {
"BrokerA": "192.168.1.10:10911",
"BrokerB": "192.168.1.11:10911"
}
}
}
5、拉取(pull)与长轮询(Long Polling)以及推送(push)区别
5.1、 Pull 模式(短轮询)
消费者主动、定期向 Broker 拉取消息。如果没有消息,返回空,白跑一趟。
5.2、Push 模式(推送)
Broker 检测到有新消息时,主动“推送”给消费者。
5.3、 长轮询
是 Pull 的一种优化 —— 当 Broker 没有新消息时,不立即返回,而是阻塞一段时间(如15秒)等待消息到来再返回。RocketMQ 默认开启,消费者如果用 Pull 模式就是长轮询
6、生产者选择NameServer的默认策略
6.1 支持配置多个 NameServer 地址,例如:
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
6.2 客户端随机选择其中一个 NameServer 发起请求,如果失败,再尝试下一个。
7、Producer与Consumer的一个区别
Consumer 在运行过程中,会定期向它消费的 Broker 发送心跳包,用于维持连接、通报存活、参与消费队列的负载均衡。
8、Broker节点
每一个相同名字的Broker是一个主备集群,即集群中具有Master与Slave两种角色。Master负责处理读写操作请求,Slave负责对Master中的数据进行备份。当Master挂掉了,Slave则会自动切换为Master去工作。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
7.2 系统架构工作流程
一、组件划分(图中 4 大部分)
-
Producer Cluster(生产者集群)
- Producer1、2、3:用于发送消息
- 与 NameServer 和 Broker 打交道
-
NameServer Cluster(路由发现中心)
- NameServer1、2、3:存储路由信息,多个节点无主从、彼此独立
- 提供路由信息给 Producer 和 Consumer
-
Broker Cluster(消息存储集群)
- 包含 Master/Slave 结构
- Broker Master1 ←→ Broker Slave1(同步数据)
- Broker Master2 ←→ Broker Slave2
-
Consumer Cluster(消费者集群)
- Consumer1、2、3:用于接收消息
- 向 NameServer 发现 Broker,再与 Broker 通信获取消息
二、整体工作流程(详细步骤)
【阶段一】Broker 启动 → 注册到 NameServer(Broker Discovery)
-
Broker Master / Slave 启动;
-
每个 Broker 会定时向所有 NameServer 发送注册请求(心跳包);
-
注册内容包括:
- Broker 名称 / ID / IP / 端口
- 所管理的 Topic、队列数等元数据
-
每个 NameServer 会将 Broker 信息存入本地内存,供后续路由查询使用;
-
Slave Broker 不直接供 Producer 写入消息,只做数据同步备份。
【阶段二】Producer 启动 → 查询路由信息
发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消
息时自动创建Topic。
-
Producer 启动时,会连接 NameServer 集群(可配置多个);
-
通过调用
getTopicRouteInfoFromNameServer
请求某个 Topic 的路由信息; -
NameServer 返回:
- 包含该 Topic 的所有 Queue 列表
- 每个 Queue 所在的 Broker 地址
-
Producer 将路由结果缓存在本地内存中,后续根据策略选择某个队列发送消息(如轮询、哈希等)。
【阶段三】Producer → 向 Broker 发送消息
-
Producer 根据路由信息,选定某个 Broker 的某个队列;
-
将消息通过网络发送到该 Broker(通常是 Master);
-
Broker 接收到消息后,会:
- 写入磁盘(顺序写入 CommitLog)
- 异步或同步复制到 Slave(依据主从模式配置)
- 返回发送确认给 Producer
【阶段四】Consumer 启动 → 查询路由 + 拉取消息
- Consumer 启动时也会连接 NameServer,获取感兴趣的 Topic 路由信息;
- 然后通过负载均衡算法(如 AllocateMessageQueueAveragely)决定拉取哪些队列;
- Consumer 建立与 Broker 的长连接,周期性从对应队列中拉取消息;
- 消息拉取成功后,执行消费逻辑(可并发处理),并更新消费位点(offset);
- 若启用广播模式,则每个 Consumer 都会消费所有消息。
【阶段五】数据同步与容灾
-
Broker Master 和 Slave 之间保持数据同步:
- 同步模式(SYNC):发送消息后等待 Slave 同步完成再确认
- 异步模式(ASYNC):发送即成功,后台异步复制
-
如果 Master 宕机,Slave 可切换为读模式,或者通过主从切换手动恢复