RabbitMQ 消息不重复消费和顺序性

一、引言

在使用 RabbitMQ 进行消息传递时,确保消息不重复消费和保证消息的顺序性是两个重要的问题。不重复消费可以避免业务逻辑的重复执行,保证数据的一致性;而消息的顺序性则在某些业务场景下(如订单处理、状态更新等)至关重要。本技术文档将详细介绍在 Java 中如何解决这两个问题。

二、保证消息不重复消费

2.1 问题分析

消息重复消费通常是由于网络波动、消费者故障等原因导致消息确认机制出现问题,RabbitMQ 会重新投递消息,从而造成重复消费。

2.2 解决方案

2.2.1 唯一消息 ID

为每条消息生成一个唯一的 ID,在消费者端对消息 ID 进行记录和判断。如果该消息 ID 已经被处理过,则直接忽略该消息。

2.2.2 代码示例

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class Consumer {
    private static final String QUEUE_NAME = "test_queue";
    private static final Set<String> processedMessageIds = new HashSet<>();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String messageId = delivery.getProperties().getMessageId();
                if (processedMessageIds.contains(messageId)) {
                    System.out.println(" [x] Message already processed: " + messageId);
                    return;
                }
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 处理消息的业务逻辑
                    doWork(message);
                } finally {
                    processedMessageIds.add(messageId);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        }
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

public class Producer {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello, RabbitMQ!";
            String messageId = UUID.randomUUID().toString();
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                   .messageId(messageId)
                   .build();
            channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "' with ID: " + messageId);
        }
    }
}

2.2.3 代码解释

  • 生产者:为每条消息生成一个唯一的 UUID 作为消息 ID,并将其添加到消息的属性中。
  • 消费者:在消费消息时,首先检查消息 ID 是否已经在 processedMessageIds 集合中。如果已经存在,则忽略该消息;否则,处理消息并将消息 ID 添加到集合中。

三、保证消息的顺序性

3.1 问题分析

在 RabbitMQ 中,默认情况下消息是异步处理的,多个消费者可以同时从队列中获取消息,这可能导致消息的顺序被打乱。

3.2 解决方案

3.2.1 单消费者模式

为每个需要保证顺序的队列只配置一个消费者,这样可以确保消息按照发送的顺序依次被处理。

3.2.2 代码示例

import com.rabbitmq.client.*;

import java.io.IOException;

public class OrderedConsumer {
    private static final String QUEUE_NAME = "ordered_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for ordered messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 处理消息的业务逻辑
                    doWork(message);
                } finally {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        }
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

public class OrderedProducer {
    private static final String QUEUE_NAME = "ordered_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

3.2.3 代码解释

  • 生产者:按顺序发送一系列消息到队列中。
  • 消费者:单个消费者从队列中依次获取消息并处理,确保消息的顺序性。

四、总结

通过为消息生成唯一 ID 并在消费者端进行记录,可以有效避免消息的重复消费;而采用单消费者模式可以保证消息的顺序性。在实际应用中,需要根据具体的业务场景和需求选择合适的解决方案。

以上代码示例基于 RabbitMQ Java 客户端,确保你已经添加了相应的依赖。在 Maven 项目中,可以在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

这样,你就可以在 Java 中保证 RabbitMQ 消息的不重复消费和顺序性。

消息顺序是指消费消费消息发送者发布的消息顺序是一致的。在RabbitMQ中,如果只有一个消费一个生产者,并且考虑消息重复的情况下,消息顺序是可以得到保证的。也就是说,消费者会按照生产者发布消息顺序消费。然而,当有多个生产者同时发送消息时,无法确定消息到达Broker的前后顺序,因此无法验证消息顺序,因为每次消息的发送都是在各自的线程中进行的。 如果需要保证消息消费顺序,可以考虑以下几个方案: 1. 单生产者单消费者:如果只有一个生产者一个消费者,那么消息顺序是可以得到保证的。 2. 生产端启动事务机制:通过在生产端启用事务机制,可以确保消息按照发送的顺序到达RabbitMQ,并由消费者按照相同的顺序进行消费。 需要注意的是,在RabbitMQ中,没有内置的机制来保证消息顺序。因此,要保证消息顺序需要特定的配置处理机制,根据具体情况选取适合的解决方案。<span class="em">1</span><span class="em">2</span><span class="em">3</span> #### 引用[.reference_title] - *1* [java面试题_消息中间件--RabbitMQ(22题).pdf](https://download.csdn.net/download/qq_44273429/20923905)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 50%"] - *2* *3* [RabbitMQ如何保证顺序](https://blog.csdn.net/qq_37653556/article/details/130795217)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 50%"] [ .reference_list ]
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值