目录
什么是MQTT?
MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。可以被解释为一种低开销,低带宽占用的即时通讯协议,可以用极少的代码和带宽的为连接远程设备提供实时可靠的消息服务,它适用于硬件性能低下的远程设备以及网络状况糟糕的环境下,因此 MQTT 协议在 IoT(Internet of things,物联网),小型设备应用,移动应用等方面有较广泛的应用。
IoT (Internet of things,物联网)设备要运作,就必须连接到互联网,设备才能相互协作,以及与后端服务协同工作。而互联网的基础网络协议是 TCP/IP,MQTT 协议是基于 TCP/IP 协议栈而构建的,因此它已经慢慢的已经成为了 IoT 通讯的标准。
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
三种消息发布的服务质量:
1.至多一次,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
2.至少一次,确保消息到达,但消息重复可能会发生。
3.只有一次,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
MQTT 协议提供一对多的消息发布,可以解除应用程序耦合,信息冗余小。该协议需要客户端和服务端,而协议中主要有三种身份:发布者(Publisher)、代理(Broker,服务器)、订阅者(Subscriber)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,而消息发布者可以同时是订阅者,实现了生产者与消费者的脱耦。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
参考文档:
https://baike.baidu.com/item/MQTT/3618851?fr=aladdin
https://www.emqx.com/zh/resources/beginners-guide-to-the-mqtt-protocol
MQTT的应用场景
MQTT协议主要适用于物联网(IoT)领域,用于设备与服务器间交互,各种设备之间的通信和控制。如以下场景:
1.传感器数据的采集和传输:当传感器需要将采集的数据传输到服务器或其他设备进行处理时。
2.智能家居:智能家居设备将其状态更改发布到MQTT主题,用户通过中央控制单元订阅这些主题,从而能够从单个界面控制和监控各种设备。
3.智能制造:可以实现设备、零件、模块的温度等信息上报至管理平台,实时监控生产环境的健康状况。
4.物流和供应链:物流信息的实时更新和共享,提高物流管理的效率和透明度。
5.能源管理:能源使用数据的实时采集和分析,帮助优化能源使用和管理。如智能电网、智能能源监测等。
6.智慧城市:城市基础设施的智能化管理和监控,提高城市管理的效率和便利性。用于智慧城市的建设和管理,例如智能路灯、智能公交、智能城市管理等。
搭建MQTT服务器:EMQ X 或 RabbitMQ MQTT插件
EMQ X:开源社区中最流行的 MQTT 消息服务器
安装包下载地址:https://mqttx.app/zh/downloads
支持部署在 Docker、Debian、Ubuntu、CentOS/RHEL、macOS、Kubernetes、Windows以及源码编译安装
安装的方式有很多种,可供自由选择:Shell脚本安装、包管理器安装、二进制包安装、ZIP压缩包安装、Homebrew安装、Docker运行安装、Helm安装、源码编译安装
docker安装:(1)拉取镜像:docker pull emqx/emqx:v4.1.0 (2)运行镜像:docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
压缩包安装:解压文件,打开解压后的 bin 目录,使用 CMD 命令行启动 EMQX:emqx start
Dashboard 控制面板:http://localhost:18083
RabbitMQ MQTT 插件配置方法:
1.启用 Web MQTT 插件
rabbitmq-plugins enable rabbitmq_web_mqtt
2.修改 RabbitMQ 配置文件(rabbitmq.conf),配置 Web MQTT 插件参数
web_mqtt.tcp.port = 15675
MQTT客户端的订阅与发布:SpringBoot集成MQTT
通过集成spring-integration-mqtt来支持MQTT,org.eclipse.paho来发布和订阅
EMQX官方接入文档:https://docs.emqx.com/zh/cloud/latest/connect_to_deployments/java_sdk.html
eclipse.paho底层原理讲解:https://blog.51cto.com/u_15067242/2574292
1.添加依赖
<!-- Spring Integration MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- MQTT Client Library (Paho) -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<!-- Spring Integration 流-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
2.增加yml配置
mqtt.broker.url=tcp://localhost:1883
mqtt.username=your-username
mqtt.password=your-password
mqtt.consumer.client.id=spring-boot-mqtt-consumer-client
mqtt.producer.client.id=spring-boot-mqtt-producer-client
mqtt.default.topic=your/topic
3. 创建MQTT配置类MqttConfig.Class,配置MQTT连接
//mqttClient工厂类,目的:配置连接信息
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(username);
// 设置连接的密码
options.setPassword(password.toCharArray());
//设置服务器地址
options.setServerURIs(new String[]{broker.url});
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
// options.setWill("willTopic", WILL_DATA, 2, false);
factory.setConnectionOptions(options);
return factory;
}
4.监听消息 - 订阅
//MQTT消息通道(消费者)
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//订阅绑定topic(消费者):将topic与mqttInputChannel 建立绑定关系
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(consumer.client.id,
mqttClientFactory(), default.topic);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//MQTT消息监听器(消费者):@ServiceActivator表示 将接受来自 mqttInputChannel的消息,并将这些消息传递给 MessageHandler实例中执行其定义好的逻辑
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
//返回自定义消息处理器,自定义消费数据处理逻辑。
return message -> {
String payload = (String) message.getPayload();
System.out.println("Received message: " + payload);
};
}
5.发布消息 - 发布
//MQTT信息通道(生产者)
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
//MQTT消息处理器(生产者):@ServiceActivator表示 将接受来自 inputChannel 的消息,并将这些消息传递给 MessageHandler实例中执行其定义好的逻辑
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
//返回生产者消息处理器,消息处理逻辑内置publish方法
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(producer.client.id, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(default.topic);
return messageHandler;
}
//IntegrationFlow 用于定义和配置消息传递系统中的流程
//IntegrationFlows.from()方法用于定义流程的起点,handle()方法用于定义如何处理接收到的消息,.transform()来转换消息有效负载,filter()来过滤消息
//CharacterStreamReadingMessageSource.stdin() 用于从标准输入流(stdin)读取字符流消息
//内部注释:Create a source that reads from System. in. EOF will not be detected.
//e是EndpointSpec 用于配置和定义消息通道的输入和输出端点,以及与消息通道进行交互的方式
//e.poller指定轮训超过多长时间结束输入。
@Bean
public IntegrationFlow mqttOutFlow() {
return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
e -> e.poller(Pollers.fixedDelay(2000)))
.transform(p -> p + "")
.handle(mqttOutbound())
.get();
}
//调用生产消息接口
@Autowired
private MessageChannel mqttOutboundChannel;
public void sendMessage(String topic, String payload) {
mqttOutboundChannel.send(MessageBuilder.withPayload(payload)
.setHeader("mqtt_topic", topic)
.build());
}
或者使用@MessagingGateway (对接口方法的调用产生的任何消息都应该发送到给定的消息通道。)
@MessagingGateway(defaultRequestChannel = mqttOutboundChannel)修饰的接口类中的接口信息都发送给mqttOutboundChannel
例如:
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}