大家心心念念的RocketMQ5.x入门手册来喽

1、前言

为了更好的拥抱云原生,RocketMQ5.x架构进行了大的重构,提出了存储与计算分离的设计架构,架构设计图如下所示:

00

RocketMQ5.x提供了一套非常建议的消息发送、消费API,并统一放在Apache顶级开源项目rocketmq-clients下,链接:https://github.com/apache/rocketmq-clients,提供了cpp、go、java、php、rust的实现,多语言生态初现,如下图所示:

01

2、源码级调试 RocketMQ 5.x

当RocketMQ为了顺应云原生大潮,提出存储与计算分离后,想必我相信很多粉丝朋友和我一样,都希望尽快一睹RocketMQ5.x的”芳颜“,如果还没有在IDE中调试通过的小伙伴,那就跟着我的步骤来,带你一起体验RocketMQ 5.x。

Step1:从github(https://github.com/apache/rocketmq)下载源码,并导入到IDEA中,如下图所示:

02

相比RocketMQ4.x,5.x主要是增加了一个代理模块(rocketmq-proxy),将路由、计算等功能从Broker中剥离出来。

Step2:创建一个RocketMQ主目录,并在主目录中创建conf文件夹,并把源码中distribution模块中conf下的文件拷贝到当前目录,如下图所示:

03

Step3:从namesrv模块中找到类NamesrvStartup类,配置后运行,如下图所示:

04

这里的关键点在于需要配置环境变量ROCKETMQ_HOME,其路径设置为【Step2】中创建的目录,然后启动该类,输出如下所示表示NameServer启动成功。

The Name Server boot success. serializeType=JSON

Step4:从broker模块中找到类BrokerStartup,配置后运行,效果如下图所示:

05

这里有两个要点:

  • 通过 -c 参数指定broker配置文件的位置
  • 设置ROCKETMQ_HOME环境变量,其路径就是上文中conf目录所在的父目录

Step5:启动proxy模块,如下图所示:

06

设置好环境变量RMQ_PROXY_HOME环境变量,直接启动,会抛出如下错误:

07

原因是RocketMQ Proxy在启动时会RMQ_PROXY_HOME加载日志文件,我们从源码模块中distribution中logback_proxy.xml拷贝到proxy主目录的conf文件夹下。

再次尝试启动,抛出如下错误:

08

需要再从源码模块中distribution中rmq-proxy.json拷贝到proxy主目录的conf文件夹下,启动成功如下所示:

09

那问题来了,rmq-proxy.json文件中的内容是多少呢?

{
  "rocketMQClusterName": "DefaultCluster"
}

那这个文件中又可以陪着哪些参数呢?这个目前无法从官方网站中获取,大家可以去查看org.apache.rocketmq.proxy.config.ProxyConfig,里面所有的属性都可以在这个文件中配置。

Nameserver、broker、Proxy都已经启动成功了,那我们如何发送消息呢?

由于RocketMQ 5.x引入了Proxy,原先的RocketMQ Client API 不能直接使用,RocketMQ官方提供了一套极简API,API的完整定义在Apache顶级开源项目rocketmq-apis(https://github.com/apache/rocketmq-apis),具体的定义如下图所示:

10

具体的实现在https://github.com/apache/rocketmq-clients,实现了cpp、golang、java、php、rust的实现。

接下来,我们使用一下java版本的客户端尝试发送一条消息,代码如下所示:

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-apis</artifactId>
      <version>5.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-java</artifactId>
      <version>5.0.0</version>
    </dependency>
  
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class RocketMQProxyTest {

    public static void main(String[] args) throws Exception {


        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "127.0.0.1:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .setCredentialProvider(sessionCredentialsProvider)
                .setRequestTimeout(Duration.ofSeconds(30))
                .build();
        String topic = "TopicTest";
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before
                // message publishing.
                .setTopics(topic)
                // May throw {@link ClientException} if the producer is not initialized.
                .build();
        // Define your message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";


        final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("yourMessageKey-0e094a5f9d85")
                .setBody(body)
                .build();
        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
        future.whenComplete((sendReceipt, throwable) -> {
            if (null == throwable) {
                System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
            } else {
                System.out.println("Failed to send message");
            }
        });
        // Block to avoid exist of background threads.
        Thread.sleep(Long.MAX_VALUE);
        // Close the producer when you don't need it anymore.
        producer.close();
    }
}

运行结果:

Send message successfully, messageId=01C6A0F34F62CB328C03EFF3EF00000000

运行成功,在这里给大家留一个作业,那消息消费如何写呢?

原文首发:https://www.codingw.net/Article?id=783

一键三连(关注、点赞、留言)是对我最大的鼓励。

各位技术朋友们,我是《RocketMQ技术内幕》一书作者,CSDN2020博客之星TOP2,热衷于中间件领域的技术分享,维护「中间件兴趣圈」公众号,旨在成体系剖析Java主流中间件,构建完备的分布式架构体系,欢迎大家大家关注我,回复「专栏」可获取15个专栏;回复「PDF」可获取海量学习资料,回复「加群」可以拉你入技术交流群,零距离与BAT大厂的大神交流。

### RocketMQ 5.x 版本中分布式事务的实现与使用 #### 概述 RocketMQ作为一种成熟的企业级消息中间件,在大型分布式系统中扮演着关键角色,为解耦系统组件、提高系统响应速度和稳定性以及实现最终一致性等方面发挥了重要作用[^1]。 #### 添加依赖项 为了在项目中集成RocketMQ并支持其功能特性,需引入相应的Maven依赖: ```xml <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.0.0</version> </dependency> ``` 请注意版本号应根据最新稳定版调整。这里假设使用的是5.x系列的一个具体版本[^2]。 #### 创建事务消息生产者 下面展示了一个基本的例子,说明如何创建一个事务消息生产者,并设置必要的配置选项以便于处理事务逻辑: ```java public class TransactionProducer { private static final Logger logger = LoggerFactory.getLogger(TransactionProducer.class); public void init() throws MQClientException { // 实例化事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); // 设置NameServer地址 producer.setNamesrvAddr("localhost:9876"); // 注册本地事务执行器 producer.setTransactionListener(new LocalTransactionChecker()); try { // 启动生产者实例 producer.start(); String[] tags = {"TagA", "TagB", "TagC"}; for (int i = 0; i < 10; ++i) { Message msg = new Message( "TopicTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } } catch (Exception e) { throw new RuntimeException(e.getMessage(), e.getCause()); } finally { // 关闭资源释放连接池等操作 producer.shutdown(); } } } ``` 这段代码实现了启动一个名为`transaction_producer_group`的消息队列客户端,并指定了名称服务器的位置;同时定义了三个标签用于区分不同类型的消息体内容。通过循环发送十条测试性质的信息至指定主题下,每条信息都携带不同的标记以供后续消费端识别分类[^4]。 #### 定义本地事务监听器 要使事务生效,则还需要自定义一个类继承自`LocalTransactionExecuter`接口来完成具体的业务流程控制工作。此部分可以根据实际情况灵活设计,比如数据库更新、文件写入或其他任何需要保证原子性的动作组合而成的操作序列。 ```java class LocalTransactionChecker implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { // 执行本地事务的具体逻辑... return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt message) { // 对未提交成功的半成品状态做补偿措施... return LocalTransactionState.UNKNOW; } } ``` 在这个例子中,默认情况下所有的消息都会被立即确认提交给Broker节点保存下来。而在真实环境中可能涉及到更加复杂的状态判断机制,例如当遇到异常情况时可以选择回滚或者延迟重试等方式来进行错误恢复尝试。
评论 3
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

中间件兴趣圈

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值