前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >EMQX体验

EMQX体验

作者头像
尚浩宇
发布2023-03-23 13:53:01
6840
发布2023-03-23 13:53:01
举报
文章被收录于专栏:杂烩杂烩

一、概述

    物联网大多基于MQTT协议进行消息传输,其中EMQX是比较流行的开源实现,EMQX简单易用,社区资源丰富,可参加官网https://www.emqx.com/,本文是简单初探,通过安装EMQX、客户端测试,代码测试等三块进行一个体验。

二、安装

    EMQX有很多种部署方式,官网写的很详细,参加https://www.emqx.io/docs/zh/v5.0/deploy/install-docker.html,本文采用centos7+docker安装方式,执行:

代码语言:javascript
复制
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.20

    然后访问:http://host:18083/可看到如下页面(默认账号admin:public):

三、客户端测试

    EMQX适配了很系统的桌面客户端,笔者使用的MacOs,下载的客户端界面如下:

    链接到刚才部署的EMQX上,可以简单做个测试:

四、代码测试

    代码语言为JAVA,官网提供的测试项目地址:https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Java

    第一步,引入依赖

代码语言:javascript
复制
<dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

    第二步,编写异步回调(业务处理类)

代码语言:javascript
复制
package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;

import java.io.IOException;

public class MqttExample implements MqttCallback {

    public static void main(String[] args) {
        String broker = "tcp://broker.emqx.io:1883";
        int qos = 0;
        String action = "publish";
        String topic = "test/topic";
        String message = "Hello MQTT";
        String clientId = MqttClient.generateClientId();
        boolean cleanSession = true;
        String userName = "emqx";
        String password = "public";
        for (int i = 0; i < args.length; i++) {
            if (args[i].length() == 2 && args[i].startsWith("-")) {
                char arg = args[i].charAt(1);
                if (arg == 'h') {
                    help();
                    return;
                }

                if (i == args.length - 1 || args[i + 1].charAt(0) == '-') {
                    System.out.println("Missing value for argument: " + args[i]);
                    help();
                    return;
                }
                switch (arg) {
                    case 'b':
                        broker = args[++i];
                        break;
                    case 'a':
                        action = args[++i];
                        break;
                    case 't':
                        topic = args[++i];
                        break;
                    case 'q':
                        qos = Integer.parseInt(args[++i]);
                        break;
                    case 'c':
                        cleanSession = Boolean.parseBoolean(args[++i]);
                        break;
                    case 'u':
                        userName = args[++i];
                        break;
                    case 'z':
                        password = args[++i];
                        break;
                    default:
                        System.out.println("Unknown argument: " + args[i]);
                        help();
                        return;
                }
            } else {
                System.out.println("Unknown argument: " + args[i]);
                help();
                return;
            }
        }

        if (!action.equals("publish") && !action.equals("subscribe")) {
            System.out.println("Invalid action: " + action);
            help();
            return;
        }
        if (qos < 0 || qos > 2) {
            System.out.println("Invalid QoS: " + qos);
            help();
            return;
        }

        MqttExample sample = new MqttExample(broker, clientId, cleanSession, userName, password);
        try {
            if (action.equals("publish")) {
                sample.publish(topic, qos, message.getBytes());
            } else {
                sample.subscribe(topic, qos);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private MqttClient client;
    private String brokerUrl;
    private MqttConnectOptions options;
    private boolean clean;
    private String password;
    private String userName;

    public MqttExample(String brokerUrl, String clientId, boolean cleanSession, String userName, String password) {
        this.brokerUrl = brokerUrl;
        this.clean = cleanSession;
        this.password = password;
        this.userName = userName;

        options = new MqttConnectOptions();
        options.setCleanSession(clean);
        if (userName != null) {
            options.setUserName(this.userName);
        }
        if (password != null) {
            options.setPassword(this.password.toCharArray());
        }

        try {
            client = new MqttClient(this.brokerUrl, clientId);
            client.setCallback(this);
        } catch (MqttException e) {
            e.printStackTrace();
            log(e.toString());
            System.exit(1);
        }
    }

    public void subscribe(String topicName, int qos) throws MqttException {

        client.connect(options);
        log("Connected to " + brokerUrl + " with client ID " + client.getClientId());

        client.subscribe(topicName, qos);
        log("Subscribed to topic: " + topicName + " qos " + qos);

        try {
            System.in.read();
        } catch (IOException e) {
        }

        client.disconnect();
        log("Disconnected");
    }

    public void publish(String topicName, int qos, byte[] payload) throws MqttException {
        client.connect(options);
        log("Connected to " + brokerUrl + " with client ID " + client.getClientId());

        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);

        client.publish(topicName, message);
        log("Published to topic \"" + topicName + "\" qos " + qos);
        client.disconnect();
        log("Disconnected");
    }

    private void log(String message) {
        System.out.println(message);
    }


    public void connectionLost(Throwable throwable) {
        log("Connection lost: " + throwable);
        System.exit(1);
    }

    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        log("Received message:\n" +
                "Topic: " + s + "\t" +
                "Message: " + mqttMessage.toString()
        );
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {


    }

    private static void help() {
        System.out.println(
                "Args:\n" +
                        "-h Help information\n" +
                        "-b MQTT broker url [default: tcp://broker.emqx.io:1883]\n" +
                        "-a publish/subscribe action [default: publish]\n" +
                        "-u Username [default: emqx]\n" +
                        "-z Password [default: public]\n" +
                        "-c Clean session [default: true]\n" +
                        "-t Publish/Subscribe topic [default: test/topic]\n" +
                        "-q QoS [default: 0]"
        );
    }
}

    测试类

代码语言:javascript
复制
package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MqttSample {
    public static void main(String[] args) {
        String topic = "test/topic";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = MqttClient.generateClientId();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName("emqx_user");
        connOpts.setPassword("emqx_password".toCharArray());
        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            client.setCallback(new SampleCallback());

            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected to broker: " + broker);
            client.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(topic, message);
            System.out.println("Message published");
            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
  • 二、安装
  • 三、客户端测试
  • 四、代码测试
相关产品与服务
物联网
腾讯连连是腾讯云物联网全新商业品牌,它涵盖一站式物联网平台 IoT Explorer,连连官方微信小程序和配套的小程序 SDK、插件和开源 App,并整合腾讯云内优势产品能力,如大数据、音视频、AI等。同时,它打通腾讯系 C 端内容资源,如QQ音乐、微信支付、微保、微众银行、医疗健康等生态应用入口。提供覆盖“云-管-边-端”的物联网基础设施,面向“消费物联”和 “产业物联”两大赛道提供全方位的物联网产品和解决方案,助力企业高效实现数字化转型。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档