前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息通讯——MQTT的入门和使用

消息通讯——MQTT的入门和使用

作者头像
不愿意做鱼的小鲸鱼
发布2022-09-26 18:36:41
3K0
发布2022-09-26 18:36:41
举报
文章被收录于专栏:web全栈

Emqx简介

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由: 1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。 2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。 3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。 4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

官方文档:https://docs.emqx.cn/broker/v4.3/getting-started/install.html

MQTT是什么?

MQTT全称消息队列遥测传输 (Message Queuing Telemetry Transport)。其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。

MQTT实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分: (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload); (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

Emqx安装

官方网站:https://www.emqx.cn/

安装步骤

  1. 下载地址:https://www.emqx.cn/downloads#broker
  2. 解压程序包
  3. 启动 EMQ X Broker 进入到emqx解压后目录,进入bin目录,执行其下的命令脚本 #启动emqx emqx start #查看emqx状态 emqx status #停止 EMQ X Broker emqx stop
  4. 卸载 EMQ X Broker 直接删除 EMQ X 目录即可

Emqx Dashboard插件

Emqx自带dashboard插件:通过Dashboard,你可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。 除此之外,EMQ X Dashboard 还提供了规则引擎的可视化操作界面,同时集成了一个简易的 MQTT 客户端工具供用户测试使用。 当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是admin,密码是 public

消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客

MQTT 设计了的3 QoS 等级

QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。 QoS 1:消息传递至少 1 次。 QoS 2:消息仅传送一次。

需要开放的端口

消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客

Emqx使用

java使用mqtt

使用步骤如下
  1. 导入依赖
代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
  1. 订阅者(App.java)
代码语言:javascript
复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 13:57
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class App {
    public static void main(String[] args) {
        try {
            //apollo地址
            String HOST = "tcp://127.0.0.1:1883";
            //要订阅的主题
            String TOPIC1 = "ceshi";
            //指你Apollo中的用户名密码
            String userName = "admin";
            String pwd = "123456";
            String clientid = UUID.randomUUID().toString().replace("-", "");
            MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接对象
            MqttConnectOptions options = new MqttConnectOptions();
            //设置连接参数
            //清除session回话
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(pwd.toCharArray());
            //超时设置
            options.setConnectionTimeout(10);
            //心跳保持时间
            options.setKeepAliveInterval(20);
            //遗嘱:当该客户端端口连接时,会向whb主题发布一条信息
            options.setWill("nick", "我挂了,你加油".getBytes(), 1, true);
            //监听对象:自己创建
            client.setCallback(new PushCallback());
            //打开连接
            client.connect(options);
            //设置消息级别
            int[] Qos = {1};
            //订阅主题
            String[] topics = {TOPIC1};
            client.subscribe(topics, Qos);

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  1. 发布者(sendOut.java)
代码语言:javascript
复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 14:09
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class SendOut {
    //tcp://MQTT安装的服务器地址:MQTT定义的端口号
    String HOST = "tcp://127.0.0.1:1883";
    //定义一个主题
    public static final String TOPIC = "ceshi";
    //    public static final String TOPIC = "abc";
    //定义MQTT的ID,可以在MQTT服务配置中指定
    private static final String clientid = "server1";
    private MqttMessage message;
    public static final String TOPIC1 = "topic1";
    public static final String userName = "admin";
    public static final String pwd = "123456";
    public MqttClient client;
    private MqttTopic topic;
    public SendOut() {
        try {
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    //发布消息
    public void publish(MqttTopic topic, MqttMessage message) throws MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        //打印发送状态
        System.out.println("message is published completely!" + token.isComplete());
    }

    //建立连接:参数与订阅端相似
    private void connect() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(pwd.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        client.setCallback(new PushCallback());
        client.connect(options);
    }

    public static void main(String[] args) throws MqttException, UnsupportedEncodingException {
        SendOut service = new SendOut();
        Scanner sc = new Scanner(System.in);
        service.topic = service.client.getTopic(TOPIC);
        service.message = new MqttMessage();
        //确保被收到一次
        service.message.setQos(1);
        service.message.setPayload("干嘛这么想不开,要在脸上贴个输字".getBytes("UTF-8"));
        service.publish(service.topic, service.message);
    }
}
  1. 订阅消息回调(OnMessageCallback.java)
代码语言:javascript
复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 13:58
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}
  1. 发布消息回调(PushCallback.java)
代码语言:javascript
复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 14:01
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {

    //连接丢失:一般用与重连
    public void connectionLost(Throwable throwable) {
        System.out.println("丢失连接");
    }
    //消息到达:指收到消息
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        //(发布)publish后会执行到这里,发送状态
        System.out.println("deliveryComplete---------"
                + token.isComplete());
    }
}
测试效果
  1. 发布者
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
  1. 订阅者
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客

js使用mqtt

引入mqttws31.js

可以下载: 链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw 提取码:siwg

也可以用对应的cdn 地址

代码语言:javascript
复制
<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
代码如下
代码语言:javascript
复制
<!DOCTYPE html >
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http - equiv="X-UA-Compatible" content="ie=edge">
    <title> Document </title>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="nofollow noopener"  rel="stylesheet">
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
    <script src="./js/mqttws31.js" type="text/javascript"></script>
    <style>
        #contentList li {
            word-break: break-all;
            word-wrap: break-word;
        }
    </style>
</head>

<body>
    <div style="width: 900px;margin: 50px auto;">
        <div class="form-group">
            <label>评论人:</label>
            <input type="text" class="form-control" id="user">
        </div>

        <div class="form-group">
            <label>评论内容:</label>
            <textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
        </div>

        <div class="form-group">
            <input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
        </div>

        <div>
            <ul id="contentList" class="list-group">
                <!-- <li class="list-group-item">
                    <span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
                </li> -->
            </ul>
        </div>
    </div>

    <script>
        // http://192.168.3.181/
        var hostname = '192.168.3.181',
            port = 8083,
            clientId = 'client-' + generateUUID(),
            timeout = 1000,
            keepAlive = 2000,
            cleanSession = false,
            ssl = false,
            userName = 'Nick',
            password = '12356',
            topic = 'ceshi';
        client = new Paho.MQTT.Client(hostname, port, clientId);
        //建立客户端实例
        var options = {
            invocationContext: {
                host: hostname,
                port: port,
                path: client.path,
                clientId: clientId
            },
            timeout: timeout,
            keepAliveInterval: keepAlive,
            cleanSession: cleanSession,
            useSSL: ssl,
            userName: userName,
            password: password,
            onSuccess: onConnect,
            onFailure: function(e) {
                console.log(e);
            }
        };
        client.connect(options);

        //连接服务器并注册连接成功处理事件
        function onConnect() {
            console.log("onConnected");
            client.subscribe(topic);
        }

        client.onConnectionLost = onConnectionLost;

        //注册连接断开处理事件
        client.onMessageArrived = onMessageArrived;

        //注册消息接收处理事件
        function onConnectionLost(responseObject) {
            console.log(responseObject);
            if (responseObject.errorCode !== 0) {
                console.log("onConnectionLost:" + responseObject.errorMessage);
                console.log("连接已断开");
            }
        }

        //收到消息时处理事件
        function onMessageArrived(message) {
            var msg = message.payloadString;
            var obj = JSON.parse(msg);
            console.log("收到消息:" + obj);
            /*
            <li class="list-group-item">
                    <span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
                </li>
            */
            $('#contentList').append($(`<li class="list-group-item" > <span class="badge">评论人:` + obj.name + `,时间:` + obj.time + `</span>` + obj.content + `</li>`));
        }

        //点击发送按钮事件
        function send() {
            var name = document.getElementById("user").value;
            var content = document.getElementById("content").value;
            console.log('name :>> ', name);
            console.log('content :>> ', content);
            var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
            var getConment = {
                name: name,
                content: content,
                time: time,
            }
            if (name) {
                var str = getConment;
                message = new Paho.MQTT.Message(JSON.stringify(str));
                message.destinationName = topic;
                client.send(message);
                document.getElementById("content").value = "";
                document.getElementById("user").value = "";
            }
        }

        //生成UUID
        function generateUUID() {
            var d = new Date().getTime();
            if (window.performance && typeof window.performance.now === "function") {
                d += performance.now(); //use high-precision timer if available
            }
            var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
                var r = (d + Math.random() * 16) % 16 | 0;
                d = Math.floor(d / 16);
                return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
            });
            return uuid;
        }
        //date时间格式化
        Date.prototype.Format = function(fmt) {
            var o = {
                "M+": this.getMonth() + 1, //月份
                "d+": this.getDate(), //日
                "h+": this.getHours(), //小时
                "m+": this.getMinutes(), //分
                "s+": this.getSeconds(), //秒
                "q+": Math.floor((this.getMonth() + 3) / 3), //季度
                "S": this.getMilliseconds() //毫秒
            };
            if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
            for (var k in o)
                if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
            return fmt;
        }
    </script>
</body>
</html>
测试效果

页面效果

消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客

java 连接mqtt订阅者收到消息

消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
消息通讯——MQTT的入门和使用-左眼会陪右眼哭の博客
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Emqx简介
    • MQTT是什么?
      • MQTT实现方式
      • Emqx安装
        • 安装步骤
          • Emqx Dashboard插件
            • MQTT 设计了的3 QoS 等级
              • 需要开放的端口
              • Emqx使用
                • java使用mqtt
                  • 使用步骤如下
                  • 测试效果
                • js使用mqtt
                  • 引入mqttws31.js
                  • 代码如下
                  • 测试效果
              相关产品与服务
              物联网
              腾讯连连是腾讯云物联网全新商业品牌,它涵盖一站式物联网平台 IoT Explorer,连连官方微信小程序和配套的小程序 SDK、插件和开源 App,并整合腾讯云内优势产品能力,如大数据、音视频、AI等。同时,它打通腾讯系 C 端内容资源,如QQ音乐、微信支付、微保、微众银行、医疗健康等生态应用入口。提供覆盖“云-管-边-端”的物联网基础设施,面向“消费物联”和 “产业物联”两大赛道提供全方位的物联网产品和解决方案,助力企业高效实现数字化转型。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档