应⽤场景:
每⼀个订单有从下单、锁库存、⽀付、下物流等⼏个业务步骤。每个业务步骤都由⼀个消息⽣产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?
示例代码:
⽣产者核⼼代码:
for (int i = 0; i < 10; i++) {
int orderId = i;
for(int j = 0 ; j <= 5 ; j ++){
Message msg =
new Message("OrderTopicTest", "order_"+orderId, "KEY" +
orderId,
("order_"+orderId+" step " +
j).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new
MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
通过MessageSelector,将orderId相同的消息,都转发到同⼀个MessageQueue中。
消费者核⼼代码:
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for(MessageExt msg:msgs){
System.out.println("收到消息内容 "+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
注⼊⼀个MessageListenerOrderly实现。
实现思路:
基础思路:由于rocketMQ中多个队列中的消息的推送,是无法保证有序的,所以只有放到同一个队列的⼀批消息,才有可能保持消息的顺序。
1、⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,Broker才有可能保持这⼀批消息的顺序。
2、消费者只有⼀次锁定⼀个MessageQueue,拿到MessageQueue上所有的消息,
注意点:
1、理解局部有序与全局有序。⼤部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留⼀个MessageQueue。性能显然⾮常低。 比如QQ消息,不同人发的消息在消息框可以不用保证顺序,但是同一个人发的消息进入消息框里必须按照先后顺序排列。
2、⽣产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于⼏种导致数据热点竞争。如上所示,我们只需要保证局部有序就行,也就是保证需要有序的消息放入同一队列中,其他不需要保证有序的消息零散放入多个队列。
3、消费者端只能⽤同步的⽅式处理消息,不要使⽤异步处理。更不能⾃⾏使⽤批量处理。
4、消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。
5、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。