RabbitMQ——队列消息数

背景


在实际使用过程,会遇到这么些情况:

生产者发送的消息数量与消费者接收的消息数量不一致。例如生产者向rabbitmq投递了100条消息,消费者只从队列中接收到了80条消息,并且当前队列中已经没有任何消息。

要定位这个问题,通常是分段来定位,一方面统计生产者到底发送了多少消息,一方面统计有多少消息是正确路由到指定队列的,两者进行比较判断生产者发送是否有问题,如果数量一致,也就是生产者发送的消息都正确到队列后,基本可以断定是消费端的问题。比如消费者本身的逻辑处理不对,或者有其他消费者到该队列消费了一部分数据等。

那么这里有个问题:怎样正确统计到底有多少消息发送到了指定队列?尤其是生产、消费同时进行时,怎样进行正确统计?或者该问题变相的变成一条运维需求,即统计一个时间段内发布到指定队列的消息数。

可能的解决办法


1、抓包或者开启trace来追踪消息进出rabbitmq的情况

然而这种方式仅适用于开发调试阶段,当消息量达到一定程度时,trace会严重影响性能,而抓包也不一定能在正确时刻找到有问题的包。

2、针对有问题的队列,再建一个队列,并以同样的binding-key绑定到同样的exchange上。这样一来,生产者发送过来的消息,会同时进入到两个队列,其中一个队列中的消息被消费者消费掉,新建的队列因没有消费者可保留全部的消息,我们只需要看这个队列中的消息数就可以完成统计工作。

同样,这种办法也是存在一些问题的。首先,消息在队列中堆积,会占用rabbitmq的内存或磁盘空间,从而影响rabbitmq的整体性能。有人可能会问,那再开发个程序,消费这个队列中的消息但不做任何处理,仅仅是进行计数统计,是不是就解决问题了。然而并不是这样,因为一个消息同时路由进入到多个队列,对于生产者的性能也是有损耗的(尤其是一个消息路由到6个以上的队列)。

另一种可行方法


在rabbitmq中,每个消息在队列中会有一个对应的序号,这个序号是每个队列独立维护的。该序号的意义主要是保证消息按照先进先出的方式有序被消费者消费。

其内部实现,每个队列的状态信息中,维护了一个字段:next_seq_id。该字段表示下一条进入队列的消息的序号。每当有消息发送到队列时,该值会加1,同时每个消息的序号也作为消息索引的一部分持久化到文件中了,这样rabbitmq重启后,队列中的消息依然是可以按照有序的方式被消费者消费。

我们可以定期获取该字段对应的值,前后两次的差值就是这个时间段内进入队列的消息总数。

获取方式


可以通过http接口来获取指定队列该字段的值。

如下图所示,两次查看spurs这个队列信息之间,一共发送了3条消息。

当然,我们也可以不指定队列,即查看全部队列的信息,并从中获取next_seq_id字段对应的值。

除此之外,rabbitmq的插件rabbitmq_management中提供了管理控制台的命令行工具rabbitmqadmin,该工具本质上也是通过http的方式调用对应接口获取相关信息,可以理解为是封装成了一个可执行程序(脚本)。

例如:

最后再补充说明一点:

前面说了,每个消息在队列中都有一个对应的序号,并且该序号随着消息一起持久化到文件中了,但字段next_seq_id本身并没有进行持久化,因此rabbitmq重启后,每个队列会重新计算该值。具体为:找下一个即将写的索引文件,乘以16384得到的值即为新的next_seq_id的值。(为什么是乘以16384,可以参考这篇文章

总结


统计一个时间段内进入队列的消息数,可以通过队列的内部状态字段next_seq_id来实现。

### RabbitMQ 延迟队列消息迁移实现 在RabbitMQ中,为了实现延迟队列中的消息迁移,通常采用死信交换机(Dead Letter Exchange, DLX)机制来完成这一过程。当一条消息达到设定的TTL(Time To Live),即生存时间之后,它会被自动转移到指定的死信队列,在那里可以由其他消费者继续处理。 #### 死信队列配置 创建两个队列:一个是正常的工作队列用于接收新来的任务;另一个则是作为死信队列专门存储过期未被及时消费的任务。对于工作队列而言,需要设置`x-dead-letter-exchange`和`x-dead-letter-routing-key`属性指向对应的DLX及其绑定键[^2]。 ```bash # 创建普通队列并关联到DLX上 rabbitmqadmin declare queue name=task_queue arguments='{"x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"expired_task"}' ``` #### 设置消息存活时间(TTL) 可以通过两种方式给每条进入该队列消息附加一个有效期限: - **队列级别的TTL**:所有发往此队列的信息都将继承相同的超时期限; ```json {"arguments":{"x-message-ttl":5000}} ``` - **单个消息级别的TTL**:允许单独控制某次投递的具体时限。 发布者可以在发送时通过AMQP协议中的headers字段指定具体的毫秒。 一旦超过所设的时间范围而未能成功派送出去,则这些据包就会成为“孤儿”,进而触发向预先定义好的DLX转发动作[^3]。 #### 处理已到期的据项 针对那些最终落入死信队列里的实例,可以根据业务需求采取不同的策略加以处置——比如重试执行、记录日志亦或是永久删除等操作。值得注意的是,由于这类特殊类型的缓冲区同样遵循FIFO原则排队等候服务端调度安排,因此建议合理规划其容量大小以免造成不必要的资源浪费或阻塞现象发生。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值