7.7 消费端限流
在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?
当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了
7.7.1 资源限制限流
在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。
在/etc/rabbitmq/rabbitmq.conf
中配置磁盘可用空间大小:
# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000
# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB
# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0
# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4
# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824
# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiB
k, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)
KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)
可以通过两种来设置生效
-
临时生效
此配制仅当前生效在重启后将失效。
# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4
样例:
[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
- 长期生效
在rabbitmq.conf的配制文件中加入
# 硬盘限制
disk_free_limit.absolute=68455178240
# 内存限制
vm_memory_high_watermark.relative = 0.4
样例:
[root@nullnull-os rabbitmq]# vi /etc/rabbitmq/rabbitmq.conf
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240
[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf
disk_free_limit.absolute=68455178240
[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]#
注意,此需要重启rabbitMQ才能生效。
磁盘限制配制参考
Configuring Disk Free Space Limit
The disk free space limit is configured with the disk_free_limit setting. By default 50MB is required to be free on the database partition (see the description of file locations for the default database location). This configuration file sets the disk free space limit to 1GB:
disk_free_limit.absolute = 1000000000
Or you can use memory units (KB, MB GB etc.) like this:
disk_free_limit.absolute = 1GB
It is also possible to set a free space limit relative to the RAM in the machine. This configuration file sets the disk free space limit to the same as the amount of RAM on the machine:
disk_free_limit.relative = 1.0
The limit can be changed while the broker is running using the rabbitmqctl set_disk_free_limit command or rabbitmqctl set_disk_free_limit mem_relative command. This command will take effect until next node restart.
The corresponding configuration setting should also be changed when the effects should survive a node restart.
来自:https://www.rabbitmq.com/disk-alarms.html
内存配制限制参考
https://www.rabbitmq.com/memory.html
Configuring the Memory Threshold
The memory threshold at which the flow control is triggered can be adjusted by editing the configuration file.
The example below sets the threshold to the default value of 0.4:
\# new style config format, recommended vm_memory_high_watermark.relative = 0.4
The default value of 0.4 stands for 40% of available (detected) RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit platform with 4 GiB of RAM installed, 40% of 4 GiB is 1.6 GiB, but 32-bit Windows normally limits processes to 2 GiB, so the threshold is actually to 40% of 2 GiB (which is 820 MiB).
Alternatively, the memory threshold can be adjusted by setting an absolute limit of RAM used by the node. The example below sets the threshold to 1073741824 bytes (1024 MiB):
vm_memory_high_watermark.absolute = 1073741824
Same example, but using memory units:
vm_memory_high_watermark.absolute = 1024MiB
If the absolute limit is larger than the installed RAM or available virtual address space, the threshold is set to whichever limit is smaller.
The memory limit is appended to the log file when the RabbitMQ node starts:
2019-06-10 23:17:05.976 [info] <0.308.0> Memory high watermark set to 1024 MiB (1073741824 bytes) of 8192 MiB (8589934592 bytes) total
The memory limit may also be queried using the rabbitmq-diagnostics memory_breakdown and rabbitmq-diagnostics status commands.
The threshold can be changed while the broker is running using the
rabbitmqctl set_vm_memory_high_watermark <fraction>
command or
rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>
For example:
rabbitmqctl set_vm_memory_high_watermark 0.6
and
rabbitmqctl set_vm_memory_high_watermark absolute "4G"
When using the absolute mode, it is possible to use one of the following memory units:
- M, MiB for mebibytes (2^20 bytes)
- MB for megabytes (10^6 bytes)
- G, GiB for gibibytes (2^30 bytes)
- GB for gigabytes (10^9 bytes)
中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html
更多配制可参见:https://www.rabbitmq.com/configure.html#config-file
样例程序:
maven导入
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
生产程序:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
public class ResourceLimitProduct {
public static void main(String[] args) throws Exception {
// 资源限制
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); ) {
// 定义交换器、队列和绑定</