前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ESA2GJK1DH1K基础篇: 来吧! 彻底了解一下MQTT

ESA2GJK1DH1K基础篇: 来吧! 彻底了解一下MQTT

作者头像
杨奉武
发布2019-10-30 10:39:49
4190
发布2019-10-30 10:39:49
举报
文章被收录于专栏:知识分享知识分享

首先你需要知道MQTT并不是什么高大上的事物,它只是一个软件,对就是一个软件.其实就是个TCP服务器

一,既然是TCP服务器,这个TCP服务器和咱平时做的有什么不一样呢.

  首先,平时的时候咱做的TCP服务器都是,一个或者多个客户端连接咱做的TCP服务器,然后TCP服务器处理客户端的数据.

  现在呢!需求变了!

  假设我有5个网络设备,3个手机.我现在想让网络设备把数据远程传给手机.而且我还需要记录网络设备上传的数据.

  假设通信是这样的(而且后期还会不停的增加设备和手机)

二,咋办???

  1. 需要记录所有设备的数据

  2. 设备和手机之间存在多对一和一对多

  所以,必须需要个公共的服务器进行数据的中转.

  假设就把这个服务器做成TCP服务器,有人问,你咋不做成UDP呢?UDP他妹的发送数据不好判断是不是发送成功.难道我还每次都让服务器

  给我回数据不成,我还是少找些麻烦!

  还有就是要实现远程,有个公网IP就可以,可以自己买个服务器,上网络公司拉一根专网

  或者用自己电脑,用花生壳映射

  还是用云服务器吧!就是运行在别人的服务器上的一台电脑(就是一台电脑),IP地址直接是公网.方便.

三,怎么设计这个TCP服务器???

  1.为了应对这种通信,首先设备发送的数据决不能是单单的数据,必须加点东西

  2.如果把发送的数据带上标识呢? 假设设备1发送的数据是   (aaaaa数据 )  aaaaa是数据标识,后面是真实数据

  3.然后呢!假设手机1就接收数据标识是aaaaa的数据,怎么让服务器转发给它呢???

  4.如果手机1在连接上TCP服务器的时候 告诉TCP服务器我接收数据标识是 aaaaa的数据

  5.通过上面的方式是不是有点眉头了????

  咱呢姑且把 "告诉TCP服务器我接收数据标识是 aaaaa的数据"  这个事情呢,起个名字     订阅的主题是 aaaaa

  把 "假设设备1发送的数据是   (aaaaa数据 )"    消息前面的 aaaaa 叫做 发布的主题是aaaaa

四,总结上面的就是

  手机1先连接TCP服务器,然后呢,规定个协议,告诉TCP服务器我订阅的主题是aaaaa

  这样呢服务器就记住了,当出现消息前面的主题是aaaaa的消息的时候,他就把这个消息发给手机1

  当然咱假设,设备1连接上TCP服务器,然后,告诉TCP服务器我订阅的主题是wwww

  这样呢服务器就记住了,当出现消息前面的主题是wwww的消息的时候,他就把这个消息发给设备1

  然后设备1连接上TCP服务器以后呢,这样发送信息(假设发送的消息是123456):   aaaaa1123456

  服务器一接收到客户端的消息,就取出来这个消息的标识是什么,取出来的是 aaaaa

  然后呢,轮训下记录的谁需要消息标识是aaaaa的消息,然后找到了手机1

  最后把这个消息发送给手机1这个客户端,然后手机1就接收到了1123456这个消息

  同理:手机1发送  wwww998877  然后这个消息就会发给设备1 ,设备1就会收到 998877

  不知道听没听懂!!!没有做过TCP服务器的你先做个TCP服务器和一个客户端试一试通信.....

五,总结

  这个服务器道理上是这样,服务器记录各个设备的信息,各个设备订阅的主题,然后呢,判断这个消息然后进行转发

  但是...做个简单的完全可以做出来,但是要想做的完善,而且要支持庞大消息数量的设备(来个百万级).....不是一朝一夕就可以的.

  其实很长时间以前,人们就有这种需求了.多对一和一对多通信

  所以呢,一些组织和单位就开始解决这种问题,开始做这种软件,所以MQTT就诞生了.

  之所以叫MQTT是因为是外国人做的这种TCP服务器,外国人呢,为实现这种功能的TCP取了个名字叫

  Message Queuing Telemetry Transport

  然后取每个首字母  就叫 MQTT了

  其实有很多家做MQTT软件,但是呢,我比较喜欢用emqtt

看看怎么连接上他们做的MQTT软件

一,首先咱知道就是个TCP服务器,所以呢,需要先用TCP连接上他们的服务器.

二,然后需要发送第一条消息(注:并不是上来就可以订阅主题的)

  MQTT软件规定呢,你发送的第一条信息是连接信息(相当于咱要先登录)

  他规定呢!

  ClientID: 各个客户端必须设定一个ID,各个客户端必须都不一样               假设是 123456

  用户名: 咱安装MQTT软件的时候可以设置MQTT软件的登录的用户名     假设是yang

  密码: 咱安装MQTT软件的时候可以设置MQTT软件的登录的密码             假设是 11223344

  下面是我当初研究MQTT的协议,写的,然后把上面三个参数填进去

  注意这节我粘贴的代码是当时为了移植到51单片机而自己写的(51内存太少了),咱用32哈,直接用的官方提供的库.

代码语言:javascript
复制
/**
* @brief  连接服务器的打包函数
* @param  
* @retval 
* @example 
**/
int ConnectMqtt(char *ClientID,char *Username,char *Password)
{
    int ClientIDLen = strlen(ClientID);
    int UsernameLen    = strlen(Username);
    int PasswordLen = strlen(Password);
    int DataLen = 0;
    int Index = 2;
    int i = 0;
    DataLen = 12 + 2+2+ClientIDLen+UsernameLen+PasswordLen;
    MqttSendData[0] = 0x10;                //MQTT Message Type CONNECT
    MqttSendData[1] = DataLen;    //剩余长度(不包括固定头部)
    MqttSendData[Index++] = 0;        // Protocol Name Length MSB    
    MqttSendData[Index++] = 4;        // Protocol Name Length LSB    
    MqttSendData[Index++] = 'M';        // ASCII Code for M    
    MqttSendData[Index++] = 'Q';        // ASCII Code for Q    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 4;        // MQTT Protocol version = 4    
    MqttSendData[Index++] = 0xc2;        // conn flags 
    MqttSendData[Index++] = 0;        // Keep-alive Time Length MSB    
    MqttSendData[Index++] = 60;        // Keep-alive Time Length LSB  60S心跳包  
    MqttSendData[Index++] = (0xff00&ClientIDLen)>>8;// Client ID length MSB    
    MqttSendData[Index++] = 0xff&ClientIDLen;    // Client ID length LSB  

    for(i = 0; i < ClientIDLen; i++)
    {
        MqttSendData[Index + i] = ClientID[i];          
    }
    Index = Index + ClientIDLen;
    
    if(UsernameLen > 0)
    {   
        MqttSendData[Index++] = (0xff00&UsernameLen)>>8;//username length MSB    
        MqttSendData[Index++] = 0xff&UsernameLen;    //username length LSB    
        for(i = 0; i < UsernameLen ; i++)
        {
            MqttSendData[Index + i] = Username[i];    
        }
        Index = Index + UsernameLen;
    }
    
    if(PasswordLen > 0)
    {    
        MqttSendData[Index++] = (0xff00&PasswordLen)>>8;//password length MSB    
        MqttSendData[Index++] = 0xff&PasswordLen;    //password length LSB    
        for(i = 0; i < PasswordLen ; i++)
        {
            MqttSendData[Index + i] = Password[i];    
        }
        Index = Index + PasswordLen; 
    }    
    return Index;
}

得到以下数据,然后把这个数据发给TCP 服务器,如果没有错误,服务器就会回 90 02 00 00

10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34

先说一件事情  所有的MQTT数据哈  第一个数据是说明整个数据是干什么的数据   第二个是说它后面的数据的总个数

10 : 固定,MQTT规定的连接用0x10

22: 是说0x22后面有0x22个数据  34个

00 04: 后面记录MQTT版本号的字节个数  

4D 51 54 54: M Q T T  版本号字符         这个是4版本,不同版本不一样 3版本的是MQIsdp 额,了解就可以

04: 版本号是 0x04 

C2:这个呢想了解具体呢,需要看协议  http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028

我说一下哈,不要去纠结这个协议,你再怎么研究也没大用,官方早就给咱准备好了各种平台的包.咱要做的是熟练运用.

上面就是说有用户名和密码,每次连接的时候清除连接信息,没有设置遗嘱(后面会说)

00 03: 心跳包是3S一次(这个自己连接的时候自己设置),

MQTT规定必须客户端必须发心跳包,客户端发送的心跳包数据是 0xC0 0x00,这是MQTT规定的

如果心跳包间隔了你设定心跳包的1.5倍时间,你没有发给服务器,服务器就认为你掉线了,然后还有个遗嘱问题,,后面会说

你发给服务器 0xC0 0x00  服务器会回你 0xD0  0x00  这个知道就行了

00 06:客户端的ClientId有6位

后面的  31 32 33 34 35 36    就是ClientId ,这是MQTT服务器规定的,每隔客户端必须有各自的ClientId

00 04: MQTT的用户名  

79 61 6E 67  我安装MQTT的时候设置的MQTT的用户名是yang

00 08: MQTT的密码

31 31 32 32 33 33 34 34  我安装MQTT的时候设置的MQTT密码

好了,连接上TCP服务器 然后发送

10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34

服务器呢就会回你  90 02 00 00

90: 固定

02: 后面有两个数据

后面的两个数据呢,有几个返回值,0就说明成功,其它就是有各种问题

比如说回的是 90 02 00 04  就说明用户名或者密码有问题.

现在服务器认咱了,该告诉服务器我订阅的主题了

假设告诉服务器我订阅的是2222

假设订阅的时候订阅的主题的消息标识是1,消息等级是0

那么打包以后就是 82 09 00 01 00 04 32 32 32 32 00  然后把这个数据发给TCP服务器

代码语言:javascript
复制
/**
* @brief  MQTT订阅/取消订阅数据打包函数
* @param  SendData 
* @param  topic                主题 
* @param  qos         消息等级 
* @param  whether     订阅/取消订阅请求包
* @retval 
* @example 
**/
int MqttSubscribeTopic(char *topic,u8 qos,u8 whether)
{    
    int topiclen = strlen(topic);
    int i=0,index = 0;
  
    if(whether)
        MqttSendData[index++] = 0x82;                        //0x82 //消息类型和标志 SUBSCRIBE 订阅
    else
        MqttSendData[index++] = 0xA2;                        //0xA2 取消订阅
    MqttSendData[index++] = topiclen + 5;                //剩余长度(不包括固定头部)
    MqttSendData[index++] = 0;                          //消息标识符,高位
    MqttSendData[index++] = 0x01;                    //消息标识符,低位
    MqttSendData[index++] = (0xff00&topiclen)>>8;    //主题长度(高位在前,低位在后)
    MqttSendData[index++] = 0xff&topiclen;              //主题长度 
    
    for (i = 0;i < topiclen; i++)
    {
        MqttSendData[index + i] = topic[i];
    }
    index = index + topiclen;
    
    if(whether)
    {
        MqttSendData[index] = qos;//QoS级别
        index++;
    }
    return index;
}

0x82: 告诉MQTT服务器,我要订阅主题

0x09: 后面的数据个数

0x00  0x01  注意哈,订阅主题的时候可以设置了标识  标识呢  1-65535

之所以有这个家伙:咱订阅的时候怎么判断订阅成功了呢???

订阅成功以后呢!服务器会返回咱订阅成功的回复,回复里面就包含着咱写的这个标识

咱呢可以对比下这个标识,然后呢就知道到底是不是订阅成功了.

0x00  0x04  后面订阅主题的长度

32 32 32 32 订阅的主题是 2222

最后一个 00 是说消息等级,一般呢,订阅设置为0 就可以

那就说一下这个消息等级有什么用吧!

咱发送数据的时候也会携带一个消息等级

假设是0  那么这条消息是不是真的发给MQTT服务器(Broker)了,就不知道了,

如果设备多个,还真不敢保证真的发给服务器了

假设是1 那么一个客户端发送消息以后呢,服务器一看消息等级是1,那么就会回给那个发送消息的客户端一个应答消息

客户端发送完消息以后其实内部会启动一个超时操作,如果多少时间内没有回复,那么他会再发一次

假设是2 这个呢就是消息一定要到达MQTT服务器.这个很苛刻,也比较占用内存

如果按照上面发呢,服务器会回

90 03 00 01 00 

90:固定

03:后面的数据长度

00 01:这条主题的标识

00:消息等级

然后看发布

长话短说

发布的时候呢,信息里面都有这些内容

发布的主题,消息,回传标志,消息等级,是不是需要服务器保留消息,消息的标识

代码语言:javascript
复制
/**
* @brief  MQTT发布数据打包函数
* @param  mqtt_message 
* @param  topic                主题 
* @param  qos         消息等级 
* @retval 
* @example 
**/
int MqttPublishData(char * topic, char * message, u8 qos)
{  
    int topic_length = strlen(topic);    
    int message_length = strlen(message);  
    int i,index=0;    
    static u16 id=0;
    
    MqttSendData[index++] = 0x30;    // MQTT Message Type PUBLISH  

  
    if(qos)
        MqttSendData[index++] = 2 + topic_length + 2 + message_length;//数据长度
    else
        MqttSendData[index++] = 2 + topic_length + message_length;   // Remaining length  

  
    MqttSendData[index++] = (0xff00&topic_length)>>8;//主题长度
    MqttSendData[index++] = 0xff&topic_length;
         
    for(i = 0; i < topic_length; i++)
    {
        MqttSendData[index + i] = topic[i];//拷贝主题
    }
    index += topic_length;
        
    if(qos)
    {
        MqttSendData[index++] = (0xff00&id)>>8;
        MqttSendData[index++] = 0xff&id;
        id++;
    }
  
    for(i = 0; i < message_length; i++)
    {
        MqttSendData[index + i] = message[i];//拷贝数据
    }
    index += message_length;
        
    return index;
}

发布的主题: 谁订阅了这个主题,消息就传给谁

回传标志: 我没用过,默认0

消息等级:上面说了

是不是需要服务器保留消息:一会和遗嘱一块说

消息的标识:上面有提及,一般用不到,默认1就可以

最后看遗嘱

还记得上面

我直接说遗嘱是啥意思哈!

假设我手机和一个设备订阅主题和发布主题对应,我就能和这个设备通信了

但是,我怎么知道这个设备掉线了呢?

当然完全可以自己发信息给那个设备,如果不回复,就说明掉线了

但是呢!MQTT服务器提供了一种方式

假设我设置好设备的遗嘱消息是  offline    遗嘱发布的主题是 aaaaa

如果设备掉线,服务器就会给订阅了aaaaa的客户端发送  offline

还记得上面说的不   服务器如果在你设置的心跳包时间的1.5倍收不到心跳包就认为你掉线了.

结语

了解就可以,关键是学会使用,就是个安装软件的事情,封包解包也不用你写

官方早就把各种平台的包给你准备好了,我之所以研究,是因为当时要用51单片机做,官方包太大.....没办法只能自己写....

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-10-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 首先你需要知道MQTT并不是什么高大上的事物,它只是一个软件,对就是一个软件.其实就是个TCP服务器
  • 看看怎么连接上他们做的MQTT软件
  • 现在服务器认咱了,该告诉服务器我订阅的主题了
  • 然后看发布
  • 最后看遗嘱
  • 结语
相关产品与服务
弹性公网 IP
弹性公网 IP(Elastic IP,EIP)是可以独立购买和持有,且在某个地域下固定不变的公网 IP 地址,可以与 CVM、NAT 网关、弹性网卡和高可用虚拟 IP 等云资源绑定,提供访问公网和被公网访问能力;还可与云资源的生命周期解耦合,单独进行操作;同时提供多种计费模式,您可以根据业务特点灵活选择,以降低公网成本。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档