RocketMq顺序消息

应⽤场景:

每⼀个订单有从下单、锁库存、⽀付、下物流等⼏个业务步骤。每个业务步骤都由⼀个消息⽣产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?

示例代码:

⽣产者核⼼代码:

    for (int i = 0; i < 10; i++) {
        int orderId = i;
        for(int j = 0 ; j <= 5 ; j ++){
            Message msg =
                    new Message("OrderTopicTest", "order_"+orderId, "KEY" +
                            orderId,
                            ("order_"+orderId+" step " +
                                    j).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new
                    MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                                                   Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
            System.out.printf("%s%n", sendResult);
        }
    }

通过MessageSelector,将orderId相同的消息,都转发到同⼀个MessageQueue中。

消费者核⼼代码:

    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeOrderlyContext context) {
            context.setAutoCommit(true);
            for(MessageExt msg:msgs){
                System.out.println("收到消息内容 "+new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

注⼊⼀个MessageListenerOrderly实现。

实现思路:

基础思路:由于rocketMQ中多个队列中的消息的推送,是无法保证有序的,所以只有放到同一个队列的⼀批消息,才有可能保持消息的顺序。

1、⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,Broker才有可能保持这⼀批消息的顺序。

2、消费者只有⼀次锁定⼀个MessageQueue,拿到MessageQueue上所有的消息,

注意点:

1、理解局部有序与全局有序。⼤部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留⼀个MessageQueue。性能显然⾮常低。 比如QQ消息,不同人发的消息在消息框可以不用保证顺序,但是同一个人发的消息进入消息框里必须按照先后顺序排列。

2、⽣产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于⼏种导致数据热点竞争。如上所示,我们只需要保证局部有序就行,也就是保证需要有序的消息放入同一队列中,其他不需要保证有序的消息零散放入多个队列。

3、消费者端只能⽤同步的⽅式处理消息,不要使⽤异步处理。更不能⾃⾏使⽤批量处理。

4、消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。

5、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值