本文共 9892 字,大约阅读时间需要 32 分钟。
随着物联网(Internet of Things,IoT)的兴起,机器之间(Machine-to-Machine,M2M)的大规模信息沟通成为重要的课堂,之前HTTP的请求/回答(Request/Response)模式不再合适,取而代之的是发布/订阅(Publish/Subscribe)模式。这就是轻量级、可扩展的MQTT(Message Queuing Telemetry Transport)可以施展拳脚的舞台。
MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景。其主要特点包括:
运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景中。同时MQTT也是一种比较不错的Android消息推送方案,FacebookMessenger就是采用了MQTT。可以说MQTT是物联网中最有潜力的网络协议之一。
若初次接触MQTT协议,可先理解以下概念:
与请求/回答这种同步模式不同,发布/定义模式解耦了发布消息的客户(发布者)与订阅消息的客户(订阅者)之间的关系,这意味着发布者和订阅者之间并不需要直接建立联系。
打个比方,你打电话给朋友,一直要等到朋友接电话了才能够开始交流,是一个典型的同步请求/回答的场景;而给一个好友邮件列表发电子邮件就不一样,你发好电子邮件该干嘛干嘛,好友们到有空了去查看邮件就是了,是一个典型的异步发布/订阅的场景。
换一种类比,请求/回答模式是一种同步模式,请求方会一直等待应答方的回复;而发布/订阅模式是一种异步的模式,
这种设计模式的好处为:MQTT是通过主题(Topics)对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用即可。
主题还可以通过通配符进行过滤,关于Topic通配符:
注意,MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播。
为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:
用户可以根据消息的重要性选择不同的质量级别。
MQTT的固定头部,使用两个字节,共16位。
其中4-7Bit为消息类型,使用4位二进制表示,可代表16种消息类型:
除去0和15位置属于保留待用,共14种消息事件类型。
CONNECT:
CONNACK:Server发出CONNECT消息的Response:
PUBLISH : 发布消息
PUBACK: QoS=1时,用于发布消息后的确认
PUBREC / PUBREL / PUBCOMP
QoS=2时:SUBSCRIBE/SUBACK
UNSUBSCRIBE /UNSUBACK
PINGREQ / PINGRES :心跳
DUP flag(打开标志)
保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:QoS(Quality of Service,服务质量)
使用两个二进制表示PUBLISH类型消息:
QoS value | bit 2 & bit 1 | Description |
---|---|---|
0 | 00 | 至多一次 发完即丢弃 |
1 | 01 | 至少一次 需要确认回复 |
2 | 10 | 只有一次 需要确认回复 |
3 | 11 | 待用,保留位置 |
RETAIN(保持)
仅针对PUBLISH消息。不同值,不同含义:
假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。
市面上有相当多的高质量MQTT代理,其中Mosquitto是一个开源的轻量级的C实现,其官网地址为: 。
在Ubuntu系统中可以直接通过以下命令安装:
apt-get install mosquittoapt-get install mosquitto-clients
关于如何配置和使用Mosquitto请详见官网和参考文献10, 这里不再详细展开。
Moqtuitto性能突出,发送消息快,稳定性高,cpu占用很少,并发比较高。i5-4核CPU,4G内存的服务器,就在能在20s以内发送10w条 QoS-0信息,且CPU使用率不超过20%。具体性能分析请见:
可能有的读者很心急,不像自己搭建服务器就像体现MQTT的工作流程。Eclipse提供可一个测试的服务器:iot.eclipse.org:1883。读者朋友们可以使用MQTT协议的官方客户端来直接连接使用。
示例代码如下:
package srx.awesome.code.mqtt.client;import org.eclipse.paho.client.mqttv3.*;public class PahoTest { //关注的主题 private static String topic = "MQTT Examples";// //发送的内容 private static String content = "Hello MQTT!!!!!"; //质量等级 private static int qos = 2; //MQTT服务地址 private static String broker = "tcp://iot.eclipse.org:1883"; //客户端ID private static String clientId = "JavaSample"; //用户名 private static String userName = "admin"; //密码 private static String passWord = "password"; @SuppressWarnings("finally") public static void main(String[] args) { try { //创建客户端 MqttClient sampleClient = new MqttClient(broker, clientId, null); //配置回调函数 sampleClient.setCallback(new MyMqttCallback()); //创建连接选择 MqttConnectOptions connOpts = getMqttConnectOptions(userName, passWord); System.out.println("Connecting to broker: "+broker); //创建服务连接 sampleClient.connect(connOpts); System.out.println("Connected"); //关注主题,质量等级为2 sampleClient.subscribe(topic, qos); //在另一个线程中发送消息 Thread thread = new Thread(() -> { try { publishMsg(topic, content, qos, sampleClient); } catch (MqttException e) { e.printStackTrace(); } }); thread.start(); thread.join(); //断开服务连接 sampleClient.disconnect(); System.out.println("Disconnected"); } catch(MqttException me) { System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("excep "+me); me.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.exit(0); } } private static void publishMsg(String topic, String content, int qos, MqttClient sampleClient) throws MqttException { //循环发送10次消息 for (int times =0 ;times<10; times++) { System.out.println(String.format("%d time Publishing message: %s", times, content)); //创建消息内容 MqttMessage message = new MqttMessage(content.getBytes()); //设置质量级别 message.setQos(qos); //发送消息 sampleClient.publish(topic, message); //System.out.println("Message published"); } } private static MqttConnectOptions getMqttConnectOptions(String userName, String passWord) { MqttConnectOptions connOpts = new MqttConnectOptions(); //是否清除Session,如果否,重新连接之后会自动关注之前关注的主题 connOpts.setCleanSession(true); connOpts.setUserName(userName); connOpts.setPassword(passWord.toCharArray()); connOpts.setAutomaticReconnect(true); // 设置连接超时时间, 单位为秒,默认30 connOpts.setConnectionTimeout(30); // 设置会话心跳时间,单位为秒,默认20 connOpts.setKeepAliveInterval(20); return connOpts; }}
代码实现的功能很简单:该客户订阅主题"MQTT Examples"
,然后向这个主题连续10次发送消息,服务代理会把发布在该主题的消息在发给定语该主题的用户,也就是客户端自己。
需要重点说明的事为客户端代理设置回调器(MqttCallback ),下面是作者自定义的回调器。
package srx.awesome.code.mqtt.client;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;class MyMqttCallback implements MqttCallback { //端看连接之后被调用 @Override public void connectionLost(Throwable arg0) { System.out.println("Connection Lost:"+arg0.getMessage()); } //收到消息后被发送 @Override public void messageArrived(String s, MqttMessage mqttMessage) throws MqttException { System.out.println(String.format("get Msg: %s from Topic: %s", mqttMessage, s)); } //消息被送到之后被调用 @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { if(iMqttDeliveryToken.isComplete()){ System.out.println(String.format("Delivery a Msg to Topic: %s",iMqttDeliveryToken.getTopics()[0])); } }}
通过自定义回调器,就可以设置消息事件到来和发出后的业务逻辑,将通信和业务处理分离开,是一种解耦和的设计。
运行程序,输入如下:
Connecting to broker: tcp://iot.eclipse.org:1883Connected0 time Publishing message: Hello MQTT!!!!!get Msg: 881.0267578289576 from Topic: MQTT Examples1 time Publishing message: Hello MQTT!!!!!Delivery a Msg to Topic: MQTT Examplesget Msg: Hello MQTT!!!!! from Topic: MQTT Examples2 time Publishing message: Hello MQTT!!!!!Delivery a Msg to Topic: MQTT Examplesget Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT Examples3 time Publishing message: Hello MQTT!!!!!get Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT Examples4 time Publishing message: Hello MQTT!!!!!get Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT Examples5 time Publishing message: Hello MQTT!!!!!get Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT Examples6 time Publishing message: Hello MQTT!!!!!get Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT Examples7 time Publishing message: Hello MQTT!!!!!get Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT Examples8 time Publishing message: Hello MQTT!!!!!get Msg: Hello MQTT!!!!! from Topic: MQTT Examples9 time Publishing message: Hello MQTT!!!!!Delivery a Msg to Topic: MQTT Examplesget Msg: Hello MQTT!!!!! from Topic: MQTT ExamplesDelivery a Msg to Topic: MQTT ExamplesDisconnected
10次消息发送全部成功,客户端也成功收到自己发送的消息。主要注意的是,由于我们设置QoS=2
,需要服务器和客户端之间多次通信,耗费了时间,往往是消息已经被发到了,客户端才确定消息真的被发出了。
示例代码:
以上就是MQTT协议的简单介绍,更为复杂的功能期待各位读者探索。
感谢参考文献中列出的文章对于作者的帮助。
转载地址:http://hmmwa.baihongyu.com/