【RocketMQ生产者发送消息的三种方式:发送同步消息、异步消息、单向消息&案例实战&详细学习流程】

本文介绍RocketMQ消息中间件的安装配置及三种消息发送方式:同步、异步与单向发送的实现过程与应用场景。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

一.知识回顾:

【0.RocketMQ专栏的内容在这里哟,帮你整理好了,更多内容持续更新中】
【1.Docker安装部署RocketMQ消息中间件详细教程】


知识补充:
在使用RocketMQ发送消息之前需要我们先完成之前RocketMQ的安装,然后在Maven导入依赖并熟悉生产者发送消息的API。

xml文件中导入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>

生产者发送消息步骤

  1. 创建消息生产者producer,并指定生产者组名
  2. 指定Nameserver地址
  3. 启动producer
  4. 创建消息对象,指定Topic、Tag和消息体
  5. 发送消息
  6. 关闭生产者producer

二.RocketMQ发送同步消息

2.1 同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
在这里插入图片描述

2.2 生产者同步发送消息实现代码

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 设置NameServer的地址
        producer.setNamesrvAddr("IP地址:9876");
        //producer.setSendLatencyFaultEnable(true);

        // 启动Producer实例
        producer.start();


        for (int i = 0; i < 10; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

2.3 生产者同步发送消息运行结果

在这里插入图片描述
运行结果相关参数分析

  1. msgId
    消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。
  2. sendStatus
    发送的标识:成功,失败等
  3. queueId
    queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。
  4. queueOffset
    Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

2.4 生产者同步发送消息控制台展示

在这里插入图片描述

在这里插入图片描述

三.RocketMQ发送异步消息

3.1 异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
在这里插入图片描述

3.2 生产者异步发送消息实现代码

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 设置NameServer的地址
        producer.setNamesrvAddr("IP地址:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest", "TagA", "OrderID888",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%s%n", sendResult);
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(10000);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

3.3 生产者异步发送消息运行结果

在这里插入图片描述
运行结果相关参数分析

  1. msgId
    消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。
  2. sendStatus
    发送的标识:成功,失败等
  3. queueId
    queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。
  4. queueOffset
    Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

3.4 生产者异步发送消息控制台展示

在这里插入图片描述

四.RocketMQ发送单向消息

4.1 单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
在这里插入图片描述

4.2 生产者单向发送消息代码实现

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer对象
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 设置NameServer的地址
        producer.setNamesrvAddr("IP地址:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);

        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

4.3 生产者单向发送消息结果展示

在这里插入图片描述

4.4 生产者单向发送消息控制台展示

在这里插入图片描述

五.同步消息 vs 异步消息 vs 单向消息

在这里插入图片描述

好了,到这里【RocketMQ生产者发送消息的三种方式:发送同步消息、异步消息、单向消息&案例实战&详细学习流程】就先学习到这里,更多内容持续创作学习中,敬请期待呦^ - ^.

评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

硕风和炜

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值