RocketMQ 的详细使用教程

以下是 RocketMQ 的详细使用教程,涵盖安装配置、基础操作、高级功能及高可用部署等内容:


一、安装与配置

1. 单机模式安装
  1. 下载与解压

示例命令:

unzip rocketmq-all-5.1.4-bin-release.zip
cd rocketmq-5.1.4
  1. 启动服务

NameServer:RocketMQ 的注册中心。

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log  # 查看启动日志

Broker:消息代理服务。

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  1. 验证安装

使用内置工具测试消息收发:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer  # 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer  # 消费者
2. 集群部署(高可用)
  1. 双主双从架构

在两台服务器上分别部署 NameServer 和 Broker,配置文件 broker-master.propertiesbroker-slave.properties,关键参数:

namesrvAddr=192.168.100.43:9876;192.168.100.44:9876  # 多 NameServer 地址
brokerRole=SYNC_MASTER  # 主节点同步复制
flushDiskType=ASYNC_FLUSH  # 异步刷盘

启动命令:

nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
  1. 配置说明
    • brokerId=0 表示主节点,>0 为从节点5。
    • fileReservedTime=48 设置消息保留时间(小时)5。

二、基础使用

1. 创建 Topic

命令行创建

  • bash复制sh bin/mqadmin createTopic -n localhost:9876 -c DefaultCluster -t testTopic
  • 控制台创建:访问 http://IP:8080/admin,输入默认账号 admin/admin,在主题管理页面创建4。
2. 生产者发送消息
// 初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 发送消息
Message msg = new Message("testTopic", "tagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
producer.shutdown();
3. 消费者订阅消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("testTopic", "*");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("Received: " + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

三、高级功能

1. 事务消息

实现两阶段提交

TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务(如数据库操作)
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.UNKNOW;
    }
});
producer.sendMessageInTransaction(msg, null);
2. 延迟消息

支持 18 个延迟级别(1s~2h):

Message msg = new Message("testTopic", "tagA", "Delayed Message".getBytes());
msg.setDelayTimeLevel(3);  // 10秒延迟
producer.send(msg);
3. 消息过滤
  • Tag 过滤:消费者订阅时指定 Tag(如 consumer.subscribe("testTopic", "tagA || tagB"))。
  • SQL 过滤:通过消息属性筛选(需 Broker 配置 enablePropertyFilter=true)。

四、高可用与性能优化

1. 消息存储机制
  • CommitLog:顺序写入消息内容,提升吞吐量(顺序写速度可达 600MB/s)2。
  • ConsumeQueue:逻辑队列,存储消息索引,加速消费2。
2. 刷盘策略
  • 同步刷盘:消息写入磁盘后才返回成功,数据可靠性高,性能较低。
  • 异步刷盘:消息写入 PageCache 后立即返回,性能高,适合高吞吐场景25。
3. 负载均衡
  • 生产者负载均衡:默认轮询发送到不同 Broker 的队列2。
  • 消费者负载均衡:集群模式下自动分配队列,支持 AllocateMessageQueueAveragely(平均分配)和 AllocateMessageQueueByCircle(环状分配)2。

五、监控与管理

1. 控制台功能
  • 实时监控:查看消息堆积、TPS、消费者状态等4。
  • 主题管理:动态创建/删除 Topic,调整队列数量。
  • 日志查看:支持按时间范围检索 Broker 和消费者日志。
2. 日志配置
  • 日志路径:~/logs/rocketmqlogs/,可通过 logback.xml 调整日志级别和格式。

六、常见问题

  1. 消息重复消费
    • 解决方案:消费者实现幂等性(如通过唯一业务 ID 去重)2。
  1. Broker 启动失败
    • 检查端口冲突(默认 NameServer 端口 9876,Broker 端口 10911)5。

总结

RocketMQ 的核心使用流程包括安装部署、Topic 管理、消息生产与消费,高级功能涵盖事务消息、延迟消息和高可用集群配置。实际应用中需根据业务场景选择刷盘策略(同步/异步)和复制方式(同步/异步主从),并结合控制台监控优化性能。更多配置细节可参考 官方文档7。

引用\[1\]:要使用RocketMQ,首先需要下载RocketMQ的dashboard,并进行编译和安装。可以通过以下步骤完成: 1. 下载RocketMQ的dashboard压缩包,并解压缩。 2. 使用Maven编译dashboard。 3. 将编译后的jar文件拷贝到指定目录。 4. 启动RocketMQ的dashboard。 引用\[2\]:在使用RocketMQ时,可能会遇到一些错误。例如,当发送消息时出现"No route info of this topic"的错误。这可能是因为没有为该主题设置路由信息。可以参考RocketMQ的官方文档了解更多详情。 引用\[3\]:如果你想使用RocketMQ,你需要下载并解压RocketMQ的安装包。然后,你可以启动RocketMQ。 以上是使用RocketMQ的一些基本步骤和常见问题的解决方法。希望对你有帮助! #### 引用[.reference_title] - *1* *3* [rocketMQ简明教程](https://blog.csdn.net/weixin_43952174/article/details/124627105)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^insertT0,239^v3^insert_chatgpt"}} ] [.reference_item] - *2* [RocketMQ从零到学会使用](https://blog.csdn.net/m0_59849460/article/details/124115627)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^insertT0,239^v3^insert_chatgpt"}} ] [.reference_item] [ .reference_list ]
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值