物联网协议MQTT学习分享-介绍&安装&java样例

Owen Jia 2018年11月12日 781次浏览

背景

MQTT全称:Message Queuing Telemetry Transport(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量、简单、开放和易于实现的"通讯协议。

MQTT协议

官方定义:mqtt — MQ Telemetry Transport。官网:https://mosquitto.org/
它构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居中已广泛使用。 不得不提:M2M和IoT,在这个领域里面MQTT应用范围很广。

基础通信架构(图片来自网络)

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)。
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

Qos消息服务质量

Broker针对message提供的服务模式,不同模式对message的保证机制不同。

QoS=0:最多一次送达。
broker发送后就不管了,client不需要响应broker也不会重发。消息可能会丢失。

QoS=1:至少一次送达。
broker会维护msg生命周期。broker发出去之后必须等待反馈,没有反馈就找时机重发,client反馈后broker结束该msg生命周期。可能会导致msg重复发情况。

QoS=2:确送达到且只有一次送达。最高等级消息服务质量模式,相应来说开销会增大。
broker会维护msg生命周期。broker发送msg后等待client反馈,收到反馈后再发送给client确认收到反馈,client收到反馈确认后,再发送broker确认完成,broker结束msg生命周期。也就是两次握手,第一次msg确认,第二次反馈确认。

Mosquitto中间件及其他Mqtt Broker

实现MQTT协议的中间件,市面上有很多,如RabbitMq就不同程度的实现了该协议。
图片来自网络

这里我们主要介绍Mosquitto,它是eclipse产品下面。
Mosquitto is an open source implementation of a server for version 3.1 and 3.1.1 of the MQTT protocol. It also includes a C and C++ client library, and the mosquitto_pub and mosquitto_sub utilities for publishing and subscribing.
它包含C&C++的库,主要是mosquitto.h这个库,封装了很多方法。可以参考文档:https://mosquitto.org/api/files/mosquitto-h.html

对于JAVA下的用户推荐 org.eclipse.paho\rg.eclipse.paho.client.mqttv3 工具包,目前大多数公司都使用这款。因为MQTT基于TCP/IP且原理简单,也可以自研connect-client或者站在开源的基础上封装。

安装Mosquitto(window|linux)

下载地址:https://mosquitto.org/download/

1、window安装方式
直接下载exe文件,进行安装即可。
如果window盗版,大致会出现dll包丢失问题(反正我是出现了),按要求去网上下载复制到c:\windows\system32目录下。

安装后目录如下:
目录

需要掌握以下这几个文件:

  • mosquitto.conf: 这是配置文件,可以修改端口、消息内容大小等等。
  • mosquitto.exe:这是中间件服务(broker),首先就系统启动它才能提供MQTT协议的消息服务。
  • mosquitto_sub.exe:这是订阅消息命令工具。
  • mosquitto_pub.exe:这是发布消息命令工具。
  • mosquitto_passwd.exe:用来管理密码。

具体命令详情,参考官网文档:
mosquitto_conf
mosquitto
mosquitto_sub
mosquitto_passwd
mosquitto_pub

2、linux安装方式
官网直接下载 mosquitto-1.5.4.tar.gz
我的linux环境:2.6.32-696.el6.x86_64
tar -xvf mosquitto-1.5.4.tar.gz,会生成目录:mosquitto-1.5.4
进入该目录执行: make install
不同环境会遇到不同的坑:我遇到的是少了文件:uuid/uuid.h,如下图:

百度查了下,需要安装 libuuid-devel ,使用 yum install libuuid-devel,之后在 make install 顺利成功。
成功后会生成命令:mosquitto mosquitto_passwd mosquitto_pub mosquitto_sub
配置文件生成在:/etc/mosquitto 目录下
启动mosquitto命令:mosquitto -c /etc/mosquitto/mosquitto.conf -d
又遇到个坑:libmosquitto.so.1 路径不正确。
编译完mosquitto之后,进入到lib目录下,将编译之后的libmosquitto.so.1 拷贝到目录/usr/local/lib下,执行如下命令:
cp libmosquitto.so.1 /usr/local/lib
然后再执行: ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1 和 ldconfig 即可。

测试是否成功三个命令:

  • mosquitto -c /etc/mosquitto/mosquitto.conf -d
  • mosquitto_sub -h 127.0.0.1 -p 1885 -t test001
  • mosquitto_pub -h 127.0.0.1 -p 1885 -t test001 -m "hello"

使用mosquitto,及mosquitto、mosquitto_sub、mosquitto_pub命令

mosquitto 就是中间件服务器,类似我们使用的rocketmq、redis,安装后需要启动这个服务。 mosquitto_sub\mosquitto_pub 这两个是订阅\发布命令。

官方提供详细文档介绍命令如何使用,如下:

  • mosquitto -c ./mosquitto.conf -p 1883 -v
  • mosquitto_sub -h 127.0.0.1 -p 1883 -t Test/test/1
  • mosquitto_pub -h 192.168.1.1 -p 1885 -t sensors/temperature -m "1266193804 32"

window安装的话会有mosquitto_sub.exe\mosquitto_pub.exe可执行文件,你可以直接CMD使用。如下图: 发布topic
订阅topic

项目中如何集成

JAVA项目推荐mqttv3包,eclipse旗下官方包。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

mqttv3设计原理详解

eclipse-paho mqttv3模型结构:

mqttv3定义了两个接口实现分别是:MqttClient和MqttAsyncClient。

消息持久化两种模式:MemoryPersistence和MqttDefaultFilePersistence。

mqttv3并没有实现订阅的保持策略,需要自己去维护。

样例代码展示

1、发布消息

@Test
public void mqttPub(){
    try {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(20);
        options.setConnectionTimeout(10);
        options.setCleanSession(false);

        mqttClient = new MqttClient(serverUri, clientIp, new MemoryPersistence());
        mqttClient.setCallback(new MqttCallBackImpl(mqttClient));
        IMqttToken mqttToken = mqttClient.connectWithResult(options);
        boolean isComplete = mqttToken.isComplete();
        System.out.println("connect is " + isComplete);

        int count = 50;
        while (count-->0) {
            String msg = "hello world : " + System.currentTimeMillis();
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(2);
            mqttMessage.setPayload(msg.getBytes("UTF-8"));
            mqttClient.publish(testTopic, mqttMessage);
            Thread.sleep(500);
        }
        System.out.println("mqttPub is closed.");
		closeMqtt();
    } catch (UnsupportedEncodingException e){
        e.printStackTrace();
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

2、订阅消息callback方式

@Test
public void mqttSubByCallBack(){
    try {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(20);
        options.setConnectionTimeout(10);
        options.setCleanSession(false);

        mqttClient = new MqttClient(serverUri, clientIp, new MemoryPersistence());
        mqttClient.setCallback(new MqttCallBackImpl());
        IMqttToken mqttToken = mqttClient.connectWithResult(options);
        boolean isComplete = mqttToken.isComplete();
        System.out.println("connect is " + isComplete);

        int count = 50;
        while (count-- > 0) {
            mqttClient.subscribe(testTopic);
            Thread.sleep(300);
        }
        System.out.println("mqttSubByCallBack is closed.");
		closeMqtt();
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

3、订阅消息messagelistener方式

@Test
public void mqttSubByListener(){
    try {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(20);
        options.setConnectionTimeout(10);
        options.setCleanSession(false);

        mqttClient = new MqttClient(serverUri, clientIp, new MemoryPersistence());
        IMqttToken mqttToken = mqttClient.connectWithResult(options);
        boolean isComplete = mqttToken.isComplete();
        System.out.println("connect is " + isComplete);

        int count = 50;
        while(count-- > 0){
            mqttClient.subscribeWithResponse(testTopic, new MqttMessageListenerImpl());
            Thread.sleep(200);
        }
        System.out.println("mqttSubByListener is closed.");
		closeMqtt();
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

作者:Owen Jia , 欢迎关注他的博客Owen Blog

扫描下方二维码,收藏本篇博客哦。