第9章 RabbitMQ高阶
默认情况下,在内存到达内存阈值的50%时会进行换页动作。也就是说,在默认的内存阈值为0.4的情况下,当内存超过0.4*0.5=0.2时会进行换页动作。可以通过在配置文件中配置vm_memory_high_watermark_paging_ratio项来修改此值。
9.2.2 磁盘告警
当剩余磁盘空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩溃。默认情况下,磁盘阈值为50MB,这意味着当磁盘剩余空间低于50MB时会阻塞生产者并停止内存中消息的换页动作。这个阈值的设置可以减小但不能完全消除因磁盘耗尽而导致崩溃的可能性,比如在两次磁盘空间检测期间内,磁盘空间从大于50MB被耗尽到0MB。一个相对谨慎的做法是将磁盘阈值设置为与操作系统所显示的内存大小一致。
RabbitMQ会定期检测磁盘剩余空间,检测的频率与上一次执行检测到的磁盘剩余空间大小有关。正常情况下,每10秒执行一次检测,随着磁盘剩余空间与磁盘阈值的接近,检测频率会有所增加。当要到达磁盘阈值时,检测频率为每秒10次,这样有可能会增加系统的负载。
可以通过在配置文件中配置disk_free_limit项来设置磁盘阈值。
mem_relative可以参考机器内存的大小为磁盘阈值设置一个相对的比值。
9.3 流控
内存和磁盘告警相当于全局的流控(Global Flow Control),一旦触发会阻塞集群中所有的Connection,而流控时针对单个Connection的,可以称之为Per-Connection Flow Control 或者 Internal Flow Control。
9.3.1 流控的原理
Erlang进程之间并不共享内存(binary类型的除外),而是通过消息传递通信,每个进程都有自己的进程邮箱(mailbox)。默认情况下,Erlang并没有对进程邮箱的大小进行限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱的大小达到内存阈值。
RabbitMQ使用了一种基于信用证算法(credit-based algorithm)的流控机制来限制发送消息的速率以解决前面所提出的问题。
处于flow状态的Connection和处于running状态的Connection并没有什么不同,这个状态只是告诉系统管理员相应的发送速率受限了。
流控机制不只是作用于Connection,同样作用于信道(Channel)和队列。从Connection到Channel,再到队列,最后时消息持久化存储形成一个完整的流控链,对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定全部被阻塞。也就是说,如果某个进程达到性能瓶颈,必然会导致上游所有的进程被阻塞。所以我们可以利用流控机制的这个特点找出瓶颈之所在。处理消息的几个关键进程及其对应的顺序关系如图9-6所示。

其中的各个进程如下所述。
rabbit_reader:
rabbit_channel:
rabbit_amqqueue_process:
rabbit_msg_store:
9.3.2 案例:打破队列的瓶颈
图9-6中描绘了一条消息从接收到存储的一个必需流控链。一般情况下,向一个队列里推送消息时,往往会在rabbit_amqqueue_process中(即队列进程中)产生性能瓶颈。在向一个队列中快速发送消息的时候,Connection和Channel都会处于flow状态,而队列处于running状态,可以分析得出在队列进程中产生性能瓶颈的结论。
这就引入了本节所要解决的问题:如何提升队列的性能?一般可以有两种解决方案:第一种是开启Erlang语言的HiPE功能,这样保守估计可以提高30%~40%的性能,不过在较旧版本的Erlang中,这个功能不太稳定,建议使用较新版本的Erlang,版本至少是18.x。第二种是寻求打破rabbit_amqqueue_process的性能瓶颈。这里的打破是指以多个rabbit_amqqueue_process替换单个rabbit_amqqueue_process,这样可以充分利用上rabbit_reader或者rabbit_channel进程中被流控的性能。

这里所要做的一件事就是封装。将交换器、队列、绑定关系、生产和消费的方法全部进行封装,这样对于应用来说好比在操作一个(逻辑)队列。
9.4 镜像队列
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列(以下简称镜像队列)都包含一个主节点(master)和若干个从节点(slave),相应的结构可以参考图9-11。

slave会准确地按照master执行命令的顺序进行动作,故slave与master上维护的状态应该是相同的。如果master由于某种原因失效,那么“资历最老”的slave会被提升为新的master。根据slave加入的时间排序,时间最长的slave即为“资历最老”。发送到镜像队列的所有消息会被同时发往master和所有的slave上,如果此时master挂掉了,消息还会在slave上,这样slave提升为master的时候消息也不会丢失。除发送消息(Basic.Publish)外的所有动作都只会向master发送,然后再由master将命令执行的结果广播给各个slave。
如果消费者与slave建立连接并进行订阅消费,其实质上都是从master上获取消息,只不过看似从slave上消费而已。比如消费者与slave建立了TCP连接之后执行一个Basic.Get的操作,那么首先是由slave将Basic.Get请求发送master,再由master准备好数据返回给slave,最后由slave投递给消费者。读者可能会有疑问,大多的读写压力都落到了master上,那么这样是否负载会做不到有效的均衡?或者说是否可以向MySQL一样能够实现master写而slave读呢?注意这里的master和slave是针对队列而言的,而队列可以均匀地散落在集群的各个Broker节点中以达到负载均衡的目的,因为真正的负载还是针对实际的物理机器而言的,而不是内存中驻留的队列进程。
在图9-12中,集群中的每个Broker节点都包含1个队列的master和2个队列的slave,Q1的负载大多都集中在broker1上,Q2的负载大多都是集中在broker2上,Q3的负载大多都集中在broker3上,只要确保队列的master节点均匀散落在集群中的各个Broker节点即可确保很大程度上的负载均衡(每个队列的流量会有不同,因此均匀散落各个队列的master也无法确保绝对的负载均衡)。

注意要点:
RabbitMQ的镜像队列同时支持publisher confirm和事务两种机制。在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到Tx.Commit-Ok的消息。同样的,在publisher confirm 机制中,生产者进行当前消息确认的前提是该消息被全部镜像所接收了。
不同于普通的非镜像(参考图9-2),镜像队列的backing_queue比较特殊,其实现并非是rabbit_variable_queue,它内部包裹了普通backing_queue进行本地消息消息持久化处理,在此基础上增加了将消息和ack复制到所有镜像的功能。镜像队列的结构可以参考图9-13,master的backing_queue采用的是rabbit_mirror_queue_master,而slave的backing_queue实现是rabbit_mirror_queue_slave。

所有对rabbit_mirror_queue_master的操作都会通过组播GM(Guaranteed Multicast)的方式同步到各个slave中。GM负责消息的广播,rabbit_mirror_queue_slave负责回调处理,而master上的回调处理是由coordinator负责完成的。如前所述,除了Basic.Publish,所有的操作都是通过master来完成的,master对消息进行处理的同时将消息的处理通过GM广播给所有的slave,slave的GM收到消息后,通过回调交由rabbit_mirror_queue_slave进行实际的处理。
GM模块实现的是一种可靠的组播通信协议,该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到,它的实现大致为:将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管以保证本次广播的消息会复制到所有的节点。在master和slave上的这些GM形成一个组(gm_group),这个组的信息会记录在Mnesia中。不同的镜像队列形成不同的组。操作命令从master对应的GM发出后,顺着链表传送到所有的节点。由于所有节点组成了一个循环链表,master对应的GM最终会收到自己发送的操作命令,这个时候master就知道该操作命令都同步到了所有的slave上。
新节点的加入过程可以参考图9-14,整个过程就像在链表中间插入一个节点。注意每当一个节点加入或者重新加入到这个镜像链路中时,之前队列保存的内容会被全部清空。

当slave挂掉之后,除了与slave相连的客户端连接全部断开,没有其他影响。当master挂掉之后,会有以下连锁反应:
(1)与master连接的客户端连接全部断开。
(2)选举最老的slave作为新的master,因为最老的slave与旧的master之间的同步状态应该是最好的。如果此时所有slave处于未同步状态,则未同步的消息会丢失。
(3)新的master重新入队所有unack的消息,因为新的master无法区分这些unack的消息是否已经到达客户端,或者是ack信息丢失在老的master链路上,再或者是丢失在老的master组播ack消息到所有slave的链路上,所以出于消息可靠性的考虑,重新入队所有unack的消息,不过此时客户端可能会有重复消息。
(4)如果客户端连接着slave,并且Basic.Consume消费时制定了x-cancel-on-ha-failover参数,那么断开之时客户端会收到一个Consumer Cancellation Notification的通知,消费者客户端中会回调Consumer接口的handleCancel方法。如果未指定x-cancel-on-ha-failover参数,那么消费者将无法感知master宕机。
x-cancel-on-ha-failover参数的使用示例如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-cancel-on-ha-failover", true);
channel.basicConsume("queue.name", false, args, consumer);
镜像队列的配置主要是通过添加相应的Policy来完成的,对于添加Policy的细节可以参考6.3节。 这里需要详细介绍的是 rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition} 命令中的definition部分,对于镜像队列的配置来说,definition中需要包含3个部分:ha-mode、ha-params和ha-sync-mode。
ha-mode:指明镜像队列的模式,有效值为all、exactly、nodes,默认为all。all表示在集群中所有的节点上进行镜像;exactly表示在指定个数的节点上进行镜像,节点个数由ha-params指定;nodes表示在指定节点上进行镜像,节点名称通过ha-params指定,节点的名称通常类似于rabbit@hostname,可以通过rabbitmqctl cluster_status命令查看到。
ha-params:不同的ha-mode配置中需要用到的参数。
ha-sync-mode:队列中消息的同步方式,有效值为automatic和manual。
举个例子,对队列名称以“queue_”开头的所有队列进行镜像,并在集群的两个节点上完成镜像,Policy的设置命令为:
rabbimqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
ha-mode参数对排他(exclusive)队列并不生效,因为排他队列是连接独占的,当连接断开时队列会自动删除,所以实际上这个参数对排他队列没有任何意义。
将新节点加入已存在的镜像队列时,默认情况下ha-sync-mode取值为manual,镜像队列中的消息不会主动同步到新的slave中,除非显示调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行其他操作,直到同步完成。
当ha-sync-mode设置为automatic时,新加入的slave会默认同步已知的镜像队列。由于同步过程的限制,所有不建议对生产环境中正在使用的队列进行操作。使用rabbitmqctl list_queues {name} slave_pids synchronised_slave_pids命令可以查看哪些slaves已经完成同步。通过手动方式同步一个队列的命令为rabbitmqctl sync_queue {name},同样也可以取消某个队列的同步操作:rabbitmqctl cancel_sync_queue {name}。
当所有slave都出现未同步状态,并且ha-promote-on-shutdown设置为when-synced(默认)时,如果master因为主动原因停掉,比如通过rabbitmqctl stop命令或者优雅关闭操作系统,那么slave不会接管master,也就是此时镜像队列不可用;但是如果master因为被动原因停掉,比如Erlang虚拟机或者操作系统崩溃,那么slave会接管master。这个配置项隐含的价值取向时保证消息可靠不丢失,同时放弃了可用性。如果ha-promote-on-shutdown设置为always,那么不论master因为何种原因停止,slave都会接管master,优先保证可用性,不过消息可能会丢失。
镜像队列中最后一个停止的节点会是master,启动顺序必须时master先启动。如果slave先启动,它会有30秒的等待时间,等待master的启动,然后加入到集群中。如果30秒内master没有启动,slave会自动停止。当所有节点因故(断电等)同时离线时,每个节点都认为自己不是最后一个停止的节点,要恢复镜像队列,可以尝试在30秒内启动所有节点。
9.4.1 配置镜像队列
- 服务器CentOS 7.4
- 配置登录账号
-
添加新用户
rabbitmqctl add_user root root
-
设置权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
-
设置角色
rabbitmqctl set_user_tags root administrator
-
- 配置RabbitMQ-node1环境(Erlang RabbitMQ)
- 下载Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
- 安装Erlang
[www@RabbitMQ-Node1 ~]$ sudo yum -y install erlang
- 下载RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
- 安装RabbitMQ
[www@RabbitMQ-Node1 ~]$ sudo yum -y install rabbitmq-server
- 后台启动RabbitMQ
[www@RabbitMQ-Node1 ~]$ sudo rabbitmq-server -detached
- 下载Erlang
- 配置RabbitMQ-node2环境(Erlang RabbitMQ)
- 下载Erlang
- 安装Erlang
- 下载RabbitMQ
- 安装RabbitMQ
- 配置节点名称
- 后台启动RabbitMQ
- 配置集群
- 各个节点互相识别(/etc/hosts???)
- 确保各个节点的cookie一致(/var/lib/rabbitmq/.erlang.cookie)
- 加入集群(join_cluster)
- 配置镜像队列
rabbitmqctl set_policy queue.test.mirror "^queue.test.mirror" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' rabbitmqctl set_policy queue.test.mirror "^queue.test.mirror" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
- Federation
- Shovel
- 啦啦啦