前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MQTT这么好玩不来自己搭建一个吗

MQTT这么好玩不来自己搭建一个吗

作者头像
Coder昊白
发布2023-11-22 09:57:53
4800
发布2023-11-22 09:57:53
举报

前言

之前写了一篇为什么智能硬件首选MQTT - 掘金,这次就来搭建一个自己的MQTT交互平台,实际体验一下,没有实战怎么能行。

一、服务端准备

1. 选择平台

我这里用的平台是EMQX Cloud,可以通过github账号免费申请一个MQTT服务器,对于个人使用来说特别方便,同时使用使用 MQTT 客户端快速测试 MQTT 服务去监听或者模拟下发,这里我们选择免费开启,点击立即部署然后一直同意就建立好了。

2. 启动服务

建立好以后我们点击项目管理,里面就会出现一个我们刚申请的服务器,进去后点击启动,这样我们就把服务启动起来了。

3. 创建用户

点击认证鉴权后选择认证,然后点击右边的添加,即可创建我们的连接用户,这个用户的名称和密码就是我们客户端一会建立连接的时候需要的username和password。至此我们就可以去客户端去写连接代码了。

二、客户端搭建

1. 引入

代码语言:javascript
复制
dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' 
}

2. AndroidManifest.xml 配置

代码语言:javascript
复制
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application
   ...
   <service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

3. 创建MQTT客户端

代码语言:javascript
复制
private static MqttAndroidClient mqttAndroidClient;
private static String mqttUsername = ""; //服务端创建的用户名
private static String mqttPassword = ""; //服务端吧创建的用户名密码
private static String clientId = ""; //唯一标识不可重复
 //接受消息的队列
public static final LinkedBlockingQueue<MyMessage> SERVER_QUEUE = new LinkedBlockingQueue<>(
            200);

//消息订阅的topic,可以自定义
private static final String topic = "/" + mqttUsername + "/" + clientId + "/user/get"; 


public static void initIot() {

        String serverUrl = "服务器地址:端口";

        try {
            mqttAndroidClient = new MqttAndroidClient(context, serverUrl, "clientId");

            mqttAndroidClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.i(TAG, "连接断开");
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.i(TAG, "收到消息:" + message.toString());

                    //建议使用队列接收
                    MyMessage myMessage = new MyMessage();
                    myMessage.setData(message.getPayload());
                    boolean offer = SERVER_QUEUE.offer(aMessage);
                    if (!offer) {
                        Log.e(TAG, "队列已满,无法接受消息!");
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    Log.i(TAG, "deliveryComplete: " + token.toString());

                }
            });

            //建立连接规则
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttUsername);
            options.setPassword(mqttPassword.toCharArray());
            options.setCleanSession(true);
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //MQTT版本
            options.setConnectionTimeout(10); //连接超时时间
            options.setKeepAliveInterval(180); //心跳间隔时间
            options.setMaxInflight(100); //最大请求数,默认10,高流量场景可以增大该值
            options.setAutomaticReconnect(true); //设置自动重新连接

            mqttAndroidClient.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "连接成功");
                    //这里订阅消息
                    subscribe();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "连接失败" + exception);
                }
            });
        } catch (Exception e) {
            Log.e(TAG, "INIT IOT ERROR!");
        }
    }

public class MyMessage {
    
    public Object data;
    
    public MyMessage() {
    }

    public MyMessage(Object data) {
        this.data = data;
    }
    
    public Object getData() {
        return this.data;
    }
    
    public void setData(Object data) {
        this.data = data;
    }
}

4. 订阅消息

代码语言:javascript
复制
private static void subscribe() {
        try {
            mqttAndroidClient.subscribe(topic, 1, null,
                    new IMqttActionListener() {
                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {
                            Log.i(TAG,
                                    "订阅成功 topic: "
                                            + topic);
                        }

                        @Override
                        public void onFailure(IMqttToken asyncActionToken,
                                              Throwable exception) {
                            Log.e(TAG, "订阅失败!" + exception.getMessage());
                        }
                    });

        } catch (Exception e) {
            Log.e(TAG, "订阅失败!" + e.getMessage());
        }
    }

5. 发布消息

代码语言:javascript
复制
//消息发送队列
public static final LinkedBlockingQueue<String> CLIENT_QUEUE = new LinkedBlockingQueue<>(1000);

//发布消息调用这个方法
public static void putQueue(String msg) {
        boolean offer = CLIENT_QUEUE.offer(msg);
        if (!offer) {
            Log.w(TAG, "操作队列已满!");
        }
    }

//使用线程去读取队列,这样可以防止同一时间多处调用,同时也不会让发送事件丢失
static class IotPublishRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    String msg = CLIENT_QUEUE.take();
                    if (TextUtils.isEmpty(msg)) {
                        continue;
                    }
                    publish(msg);
                    Thread.sleep(300);
                } catch (Exception e) {
                    Log.e(TAG, "处理iot消息失败");
                }

            }
        }
    }

private static void publishNew(String payload) {
        String topic = "/" + mqttUsername + "/" + clientId + "/user/update";
        Integer qos = 1;

        try {
            if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
                Log.w(TAG, "IOT还未初始化!无法发送消息");
                return;
            }
            mqttAndroidClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, false,
                    null, new IMqttActionListener() {
                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {

                        }

                        @Override
                        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                            String[] topics = asyncActionToken.getTopics();
                            Log.e(TAG, "publish message error! topics: " + Arrays.toString(topics));
                        }
                    });
        } catch (MqttException e) {
            Log.e(TAG, "发送消息失败!");
        } catch (IllegalArgumentException e) {
            Log.e(TAG, "MQTT CLIENT ERROR");
        }
    }

6. 断开连接

代码语言:javascript
复制
public static void disconnect() {
        if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
            Log.w(TAG, "IOT还未初始化!");
            return;
        }

        try {
            mqttAndroidClient.disconnect().setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "断开连接成功!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "断开连接失败!");
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.getMessage());
        }
    }

结尾

以上就是客户端的MQTT代码,我是用Java写的,Kotlin版的建议参考Android 使用 Kotlin 连接 MQTT,代码基本就在这里了,项目啥的就不放了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-11-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、服务端准备
    • 1. 选择平台
      • 2. 启动服务
        • 3. 创建用户
        • 二、客户端搭建
          • 1. 引入
            • 2. AndroidManifest.xml 配置
              • 3. 创建MQTT客户端
                • 4. 订阅消息
                  • 5. 发布消息
                    • 6. 断开连接
                    • 结尾
                    相关产品与服务
                    项目管理
                    CODING 项目管理(CODING Project Management,CODING-PM)工具包含迭代管理、需求管理、任务管理、缺陷管理、文件/wiki 等功能,适用于研发团队进行项目管理或敏捷开发实践。结合敏捷研发理念,帮助您对产品进行迭代规划,让每个迭代中的需求、任务、缺陷无障碍沟通流转, 让项目开发过程风险可控,达到可持续性快速迭代。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档