背景
MQTT全称:Message Queuing Telemetry Transport(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量、简单、开放和易于实现的"通讯协议。
MQTT协议
官方定义:mqtt — MQ Telemetry Transport。官网:https://mosquitto.org/。
它构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居中已广泛使用。
不得不提:M2M和IoT,在这个领域里面MQTT应用范围很广。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)。
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
Qos消息服务质量
Broker针对message提供的服务模式,不同模式对message的保证机制不同。
QoS=0:最多一次送达。
broker发送后就不管了,client不需要响应broker也不会重发。消息可能会丢失。
QoS=1:至少一次送达。
broker会维护msg生命周期。broker发出去之后必须等待反馈,没有反馈就找时机重发,client反馈后broker结束该msg生命周期。可能会导致msg重复发情况。
QoS=2:确送达到且只有一次送达。最高等级消息服务质量模式,相应来说开销会增大。
broker会维护msg生命周期。broker发送msg后等待client反馈,收到反馈后再发送给client确认收到反馈,client收到反馈确认后,再发送broker确认完成,broker结束msg生命周期。也就是两次握手,第一次msg确认,第二次反馈确认。
Mosquitto中间件及其他Mqtt Broker
实现MQTT协议的中间件,市面上有很多,如RabbitMq就不同程度的实现了该协议。
这里我们主要介绍Mosquitto,它是eclipse产品下面。
Mosquitto is an open source implementation of a server for version 3.1 and 3.1.1 of the MQTT protocol. It also includes a C and C++ client library, and the mosquitto_pub and mosquitto_sub utilities for publishing and subscribing.
它包含C&C++的库,主要是mosquitto.h这个库,封装了很多方法。可以参考文档:https://mosquitto.org/api/files/mosquitto-h.html。
对于JAVA下的用户推荐 org.eclipse.paho\rg.eclipse.paho.client.mqttv3 工具包,目前大多数公司都使用这款。因为MQTT基于TCP/IP且原理简单,也可以自研connect-client或者站在开源的基础上封装。
安装Mosquitto(window|linux)
下载地址:https://mosquitto.org/download/ 。
1、window安装方式
直接下载exe文件,进行安装即可。
如果window盗版,大致会出现dll包丢失问题(反正我是出现了),按要求去网上下载复制到c:\windows\system32目录下。
安装后目录如下:
需要掌握以下这几个文件:
- mosquitto.conf: 这是配置文件,可以修改端口、消息内容大小等等。
- mosquitto.exe:这是中间件服务(broker),首先就系统启动它才能提供MQTT协议的消息服务。
- mosquitto_sub.exe:这是订阅消息命令工具。
- mosquitto_pub.exe:这是发布消息命令工具。
- mosquitto_passwd.exe:用来管理密码。
具体命令详情,参考官网文档:
mosquitto_conf
mosquitto
mosquitto_sub
mosquitto_passwd
mosquitto_pub
2、linux安装方式
官网直接下载 mosquitto-1.5.4.tar.gz
我的linux环境:2.6.32-696.el6.x86_64
tar -xvf mosquitto-1.5.4.tar.gz,会生成目录:mosquitto-1.5.4
进入该目录执行: make install
不同环境会遇到不同的坑:我遇到的是少了文件:uuid/uuid.h,如下图:
百度查了下,需要安装 libuuid-devel ,使用 yum install libuuid-devel,之后在 make install 顺利成功。
成功后会生成命令:mosquitto mosquitto_passwd mosquitto_pub mosquitto_sub
配置文件生成在:/etc/mosquitto 目录下
启动mosquitto命令:mosquitto -c /etc/mosquitto/mosquitto.conf -d
又遇到个坑:libmosquitto.so.1 路径不正确。
编译完mosquitto之后,进入到lib目录下,将编译之后的libmosquitto.so.1 拷贝到目录/usr/local/lib下,执行如下命令:
cp libmosquitto.so.1 /usr/local/lib
然后再执行: ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1 和 ldconfig 即可。
测试是否成功三个命令:
- mosquitto -c /etc/mosquitto/mosquitto.conf -d
- mosquitto_sub -h 127.0.0.1 -p 1885 -t test001
- mosquitto_pub -h 127.0.0.1 -p 1885 -t test001 -m "hello"
使用mosquitto,及mosquitto、mosquitto_sub、mosquitto_pub命令
mosquitto 就是中间件服务器,类似我们使用的rocketmq、redis,安装后需要启动这个服务。 mosquitto_sub\mosquitto_pub 这两个是订阅\发布命令。
官方提供详细文档介绍命令如何使用,如下:
- mosquitto -c ./mosquitto.conf -p 1883 -v
- mosquitto_sub -h 127.0.0.1 -p 1883 -t Test/test/1
- mosquitto_pub -h 192.168.1.1 -p 1885 -t sensors/temperature -m "1266193804 32"
window安装的话会有mosquitto_sub.exe\mosquitto_pub.exe可执行文件,你可以直接CMD使用。如下图:
项目中如何集成
JAVA项目推荐mqttv3包,eclipse旗下官方包。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
mqttv3设计原理详解
eclipse-paho mqttv3模型结构:
mqttv3定义了两个接口实现分别是:MqttClient和MqttAsyncClient。
消息持久化两种模式:MemoryPersistence和MqttDefaultFilePersistence。
mqttv3并没有实现订阅的保持策略,需要自己去维护。
样例代码展示
1、发布消息
@Test
public void mqttPub(){
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(20);
options.setConnectionTimeout(10);
options.setCleanSession(false);
mqttClient = new MqttClient(serverUri, clientIp, new MemoryPersistence());
mqttClient.setCallback(new MqttCallBackImpl(mqttClient));
IMqttToken mqttToken = mqttClient.connectWithResult(options);
boolean isComplete = mqttToken.isComplete();
System.out.println("connect is " + isComplete);
int count = 50;
while (count-->0) {
String msg = "hello world : " + System.currentTimeMillis();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes("UTF-8"));
mqttClient.publish(testTopic, mqttMessage);
Thread.sleep(500);
}
System.out.println("mqttPub is closed.");
closeMqtt();
} catch (UnsupportedEncodingException e){
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2、订阅消息callback方式
@Test
public void mqttSubByCallBack(){
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(20);
options.setConnectionTimeout(10);
options.setCleanSession(false);
mqttClient = new MqttClient(serverUri, clientIp, new MemoryPersistence());
mqttClient.setCallback(new MqttCallBackImpl());
IMqttToken mqttToken = mqttClient.connectWithResult(options);
boolean isComplete = mqttToken.isComplete();
System.out.println("connect is " + isComplete);
int count = 50;
while (count-- > 0) {
mqttClient.subscribe(testTopic);
Thread.sleep(300);
}
System.out.println("mqttSubByCallBack is closed.");
closeMqtt();
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3、订阅消息messagelistener方式
@Test
public void mqttSubByListener(){
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(20);
options.setConnectionTimeout(10);
options.setCleanSession(false);
mqttClient = new MqttClient(serverUri, clientIp, new MemoryPersistence());
IMqttToken mqttToken = mqttClient.connectWithResult(options);
boolean isComplete = mqttToken.isComplete();
System.out.println("connect is " + isComplete);
int count = 50;
while(count-- > 0){
mqttClient.subscribeWithResponse(testTopic, new MqttMessageListenerImpl());
Thread.sleep(200);
}
System.out.println("mqttSubByListener is closed.");
closeMqtt();
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
作者:Owen Jia , 欢迎关注他的博客Owen Blog。
扫描下方二维码,收藏本篇博客哦。