RabbitMQ消息可靠性保证机制4--消费端限流

7.7 消费端限流

在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?

当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了

7.7.1 资源限制限流

在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。

/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000

# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB

# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0

# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4

# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824

# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiB



k, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)

KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)

可以通过两种来设置生效

  1. 临时生效

    此配制仅当前生效在重启后将失效。

# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4

样例:

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
  1. 长期生效

在rabbitmq.conf的配制文件中加入

# 硬盘限制 
disk_free_limit.absolute=68455178240

# 内存限制
vm_memory_high_watermark.relative = 0.4

样例:

[root@nullnull-os rabbitmq]# vi  /etc/rabbitmq/rabbitmq.conf 
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240

[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf 
disk_free_limit.absolute=68455178240

[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]# 

注意,此需要重启rabbitMQ才能生效。

磁盘限制配制参考

Configuring Disk Free Space Limit

The disk free space limit is configured with the disk_free_limit setting. By default 50MB is required to be free on the database partition (see the description of file locations for the default database location). This configuration file sets the disk free space limit to 1GB:

disk_free_limit.absolute = 1000000000

Or you can use memory units (KB, MB GB etc.) like this:

disk_free_limit.absolute = 1GB

It is also possible to set a free space limit relative to the RAM in the machine. This configuration file sets the disk free space limit to the same as the amount of RAM on the machine:

disk_free_limit.relative = 1.0

The limit can be changed while the broker is running using the rabbitmqctl set_disk_free_limit command or rabbitmqctl set_disk_free_limit mem_relative command. This command will take effect until next node restart.

The corresponding configuration setting should also be changed when the effects should survive a node restart.

来自:https://www.rabbitmq.com/disk-alarms.html

内存配制限制参考

https://www.rabbitmq.com/memory.html

Configuring the Memory Threshold

The memory threshold at which the flow control is triggered can be adjusted by editing the configuration file.

The example below sets the threshold to the default value of 0.4:

\# new style config format, recommended
vm_memory_high_watermark.relative = 0.4

The default value of 0.4 stands for 40% of available (detected) RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit platform with 4 GiB of RAM installed, 40% of 4 GiB is 1.6 GiB, but 32-bit Windows normally limits processes to 2 GiB, so the threshold is actually to 40% of 2 GiB (which is 820 MiB).

Alternatively, the memory threshold can be adjusted by setting an absolute limit of RAM used by the node. The example below sets the threshold to 1073741824 bytes (1024 MiB):

vm_memory_high_watermark.absolute = 1073741824

Same example, but using memory units:

vm_memory_high_watermark.absolute = 1024MiB

If the absolute limit is larger than the installed RAM or available virtual address space, the threshold is set to whichever limit is smaller.

The memory limit is appended to the log file when the RabbitMQ node starts:

2019-06-10 23:17:05.976 [info] <0.308.0> Memory high watermark set to 1024 MiB (1073741824 bytes) of 8192 MiB (8589934592 bytes) total

The memory limit may also be queried using the rabbitmq-diagnostics memory_breakdown and rabbitmq-diagnostics status commands.

The threshold can be changed while the broker is running using the

rabbitmqctl set_vm_memory_high_watermark <fraction>

command or

rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>

For example:

rabbitmqctl set_vm_memory_high_watermark 0.6

and

rabbitmqctl set_vm_memory_high_watermark absolute "4G"

When using the absolute mode, it is possible to use one of the following memory units:

  • M, MiB for mebibytes (2^20 bytes)
  • MB for megabytes (10^6 bytes)
  • G, GiB for gibibytes (2^30 bytes)
  • GB for gigabytes (10^9 bytes)

中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html

更多配制可参见:https://www.rabbitmq.com/configure.html#config-file

样例程序:

maven导入

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>

生产程序:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;

public class ResourceLimitProduct {
   
  public static void main(String[] args) throws Exception {
   
    // 资源限制
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {
   

      // 定义交换器、队列和绑定</
### RabbitMQ 消息可靠性配置及保障机制 RabbitMQ消息可靠性保障是一个多层次的过程,涉及生产者、中间件本身以及消费者三个方面的协同工作。以下是具体的方法和配置: #### 生产者的可靠性保障 为了确保生产者能够可靠地将消息发送到 RabbitMQ 中的交换机,可以采用以下措施: - **AMQP 协议支持**:通过 AMQP 协议实现可靠的网络传输,该协议提供了事务模式和发布确认功能来增强数据传递的安全性[^1]。 - **生产者确认机制 (Publisher Confirms)**:启用此功能后,当一条消息被成功写入磁盘并路由至至少一个队列时,Broker 将向生产者返回 ACK 响应;如果发生错误,则会收到 NACK 或超时提示[^4]。 ```python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 开启 publisher confirms channel.confirm_delivery() try: channel.basic_publish(exchange='my_exchange', routing_key='routing.key', body='Message content') except Exception as e: print(f"Failed to deliver message: {e}") finally: connection.close() ``` #### RabbitMQ 自身的消息持久化与恢复能力 即使在服务崩溃的情况下,也可以依靠一些内置策略防止未处理的数据永久消失: - **消息持久化设置**:每条进入系统的记录都可以标记为“持久”,这意味着它们会被存储于硬盘而非仅仅驻留在内存里直到被消费为止[^3]。 - **镜像队列(Mirrored Queues)**:对于高可用需求场景下推荐使用这种技术,在多个节点间同步复制整个队列的内容从而提升容灾性能[^5]。 #### 消费消息安全接收 最后一步就是保证客户能稳定获取所需资料而不会因为异常中断造成遗漏或者重复读取等问题出现: - **手动应答(Manual Acknowledgements)** :只有当应用程序显式告知 broker 它已完成某项任务之后才会删除对应项目副本。 - **限流控制(QoS Tuning)** : 调整预取计数(pre-fetch count),限制每次分发给单个工作进程的任务数量以减少压力并提高稳定性. ```python def callback(ch, method, properties, body): process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=10) # 设置 QoS 参数 channel.basic_consume(queue='task_queue', on_message_callback=callback) print(' [*] Waiting for messages.') channel.start_consuming() ```
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值