MQTT源码解析

本文详细解析了MQTT在Android端的实现,从创建MqttAndroidClient开始,深入connect流程,包括服务创建、连接建立,直至线程池管理和报文交互。此外,还介绍了MqttCallbackExtended在自动断线重连中的作用,以及订阅subscribe的过程,涉及MqttService、MqttConnection和MqttAsyncClient的相关操作。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

 

一、MQTT的创建和connect流程

1.Android端实现mqtt,首先会new一个MqttAndroidClient,并传入需要的参数。

首先,MqttAndroidClient是如何创建的呢?

public MqttAndroidClient(Context context, String serverURI,
			String clientId, MqttClientPersistence persistence, Ack ackType) {
		myContext = context;
		this.serverURI = serverURI;
		this.clientId = clientId;
		this.persistence = persistence;
		messageAck = ackType;
	}

MqttAndroidClient本身是继承Broadcast的子类,他的构造器需要传入上面的参数,必须传入的为

public MqttAndroidClient(Context context, String serverURI,
			String clientId) {
		this(context, serverURI, clientId, null, Ack.AUTO_ACK);
	}

2.connect的核心方法:

@Override
	public IMqttToken connect(MqttConnectOptions options, Object userContext,
			IMqttActionListener callback) throws MqttException {

		IMqttToken token = new MqttTokenAndroid(this, userContext,
				callback);

		connectOptions = options;
		connectToken = token;

		/*
		 * The actual connection depends on the service, which we start and bind
		 * to here, but which we can't actually use until the serviceConnection
		 * onServiceConnected() method has run (asynchronously), so the
		 * connection itself takes place in the onServiceConnected() method
		 */
		if (mqttService == null) { // First time - must bind to the service
			Intent serviceStartIntent = new Intent();
			serviceStartIntent.setClassName(myContext, SERVICE_NAME);
			Object service = myContext.startService(serviceStartIntent);
			if (service == null) {
				IMqttActionListener listener = token.getActionCallback();
				if (listener != null) {
					listener.onFailure(token, new RuntimeException(
							"cannot start service " + SERVICE_NAME));
				}
			}

			// We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle
			// until the last time it is stopped by a call to stopService()
			myContext.bindService(serviceStartIntent, serviceConnection,
					Context.BIND_AUTO_CREATE);

			if (!receiverRegistered) registerReceiver(this);
		}
		else {
			pool.execute(new Runnable() {

				@Override
				public void run() {
					doConnect();
					
					//Register receiver to show shoulder tap.
					if (!receiverRegistered) registerReceiver(MqttAndroidClient.this);
				}

			});
		}

		return token;
	}

返回一个IMqttToken对象,这里首先new一个MqttTokenAndroid对象,然后去判断是否存在MqttService实例。

没有的话会创建一个,如果创建失败直接调用IMqttActionListener的onFailure回调。

成功的话绑定一个bindService,传入ServiceConnection参数对象。

而ServiceConnection的成功回调中有个doConnect()方法,里面会调用MqttService的connect方法从而建立Mqtt的连接。

而在MqttService的connect方法里面,会创建一个MqttConnection对象,调用他的connect方法。

而MqttConnection的connect方法里面会判断是否有MqttAsyncClient实例,如果没有会创建他的实例并调用他的connect方法

// if myClient is null, then create a new connection
			else {
				alarmPingSender = new AlarmPingSender(service);
				myClient = new MqttAsyncClien
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值