前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【开源视频联动物联网平台】vertx写一个mqtt服务端

【开源视频联动物联网平台】vertx写一个mqtt服务端

作者头像
帐篷Li-物联网布道师
发布2024-03-20 08:48:28
1230
发布2024-03-20 08:48:28
举报
代码语言:javascript
复制
/**
 * VertxMqttServer: 一个基于Vert.x实现的MQTT服务器。
 * 该服务器支持设备连接、认证、消息发布和订阅功能。
 */
package com.mz.network.mqtt;

import com.alibaba.fastjson.JSON;
import com.mz.network.authority.DeviceAuthorityService;
import com.mz.network.client.Client;
import com.mz.network.client.ClientRepository;
import com.mz.network.client.message.ClientMessage;
import com.mz.network.core.Topics;
import com.mz.network.events.CommandReplyEvent;
import com.mz.network.events.DeviceReportEvent;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class VertxMqttServer extends AbstractVerticle {

    @Autowired
    private ClientRepository clientRepository;

    @Value("${vertx.service-id}")
    private String serviceId;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private MqttServerOptions mqttServerOptions;

    @Autowired
    private DeviceAuthorityService authorityService;

    // 用于存储订阅关系的映射
    private final Map<String, MqttEndpoint> subscribers = new ConcurrentHashMap<>();

    /**
     * 重写AbstractVerticle的start方法,实现MQTT服务器的启动和初始化。
     * @throws Exception 启动过程中可能抛出的异常
     */
    @Override
    public void start() throws Exception {
        // 创建MQTT服务器实例
        MqttServer mqttServer = MqttServer.create(vertx, mqttServerOptions);
        // 处理MQTT客户端连接请求
        mqttServer.endpointHandler(mqttEndpoint -> {
            String clientId = mqttEndpoint.clientIdentifier();
            log.debug("接收到MQTT客户端[{}]消息", clientId);
            // 执行创建连接
            doConnect(mqttEndpoint);
        }).listen(result -> {
            if (result.succeeded()) {
                int port = mqttServer.actualPort();
                log.debug("MQTT server started on port {}", port);
            } else {
                log.warn("MQTT server start failed", result.cause());
            }
        });
    }

    /**
     * 处理MQTT客户端连接请求。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void doConnect(MqttEndpoint endpoint) {
        if (endpoint.auth() == null) {
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            return;
        }
        String userName = endpoint.auth().userName();
        String passWord = endpoint.auth().password();

        if (authorityService.verification(endpoint.clientIdentifier(), userName, passWord)) {
            log.debug("MQTT客户端:{}认证通过", endpoint.clientIdentifier());
            acceptConnect(endpoint);
        } else {
            log.warn("客户端[{}]用户名密码错误", endpoint.clientIdentifier());
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }

    /**
     * 接受MQTT客户端连接,并进行订阅处理。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void acceptConnect(MqttEndpoint endpoint) {
        String clientId = endpoint.clientIdentifier();
        MqttClienttest client = new MqttClienttest(endpoint);

        endpoint.accept(false)
                .closeHandler(v -> {
                    log.debug("[{}] closed", clientId);
                    Client old = clientRepository.getClient(clientId);
                    if (old == client) {
                        clientRepository.unregister(clientId);
                    } else {
                        log.debug("client {} is unregistered", client);
                    }
                })
                .subscribeHandler(subscribe -> {
                    List<MqttQoS> grantedQosLevels = new ArrayList<>();
                    for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
                        log.info("[{}] Subscription for {} with QoS {}", clientId, s.topicName(), s.qualityOfService());
                        grantedQosLevels.add(s.qualityOfService());
                        subscribers.put(s.topicName(), endpoint);
                    }
                    // ack the subscriptions request
                    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

                    // specifing handlers for handling QoS 1 and 2
                    endpoint.publishAcknowledgeHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId))
                            .publishReceivedHandler(endpoint::publishRelease)
                            .publishCompletionHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId));

                })
                .unsubscribeHandler(unsubscribe -> {
                    unsubscribe.topics().forEach(topicName -> {
                        log.info("[{}] Unsubscription for {}", clientId, topicName);
                        // Remove the endpoint when a client unsubscribes
                        subscribers.remove(topicName);
                    });
                    // ack the subscriptions request
                    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
                })
                .disconnectHandler(v -> log.info("[{}] Received disconnect from client", clientId))
                .exceptionHandler(e -> log.error(clientId, e))
                .publishHandler(message -> {
                    //设备推送了消息
                    String topicName = message.topicName();
                    Buffer buffer = message.payload();
                    String payload = buffer.toString();

                    log.info("接受到客户端消息推送:[{}] payload [{}] with QoS [{}]", topicName, payload, message.qosLevel());
                    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                        endpoint.publishAcknowledge(message.messageId());
                    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                        endpoint.publishReceived(message.messageId());
                    }

                    //往订阅的客户端发送消息
                    handlePublish(message);
                    //平台物模型处理
                    try {
                        ClientMessage event = null;
                        //目前仅支持reply和report的topic
                        if (Topics.reply.equals(topicName)) {
                            event = JSON.parseObject(payload, CommandReplyEvent.class);
                        } else if (Topics.report.equals(topicName)) {
                            event = JSON.parseObject(payload, DeviceReportEvent.class);
                        }
                        if (null != event) {
                            event.setClientId(clientId);
                            //发布事件到spring
                            eventPublisher.publishEvent(event);
                        } else {
                            log.warn("不支持的topic:{} => {}", topicName, payload);
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                })
                .publishReleaseHandler(messageId -> {
                    log.debug("complete message :{}", messageId);
                    endpoint.publishComplete(messageId);
                });
        //注册设备
        clientRepository.register(client);
    }

    /**
     * 处理MQTT消息的发布,包括订阅关系的转发。
     * @param message MqttPublishMessage实例,代表接收到的MQTT消息
     */
    private void handlePublish(MqttPublishMessage message) {
        // Handle incoming publish messages
        String topic = message.topicName();
        String payload = message.payload().toString();
        int qos = message.qosLevel().value();

        System.out.println("Received message on [" + topic + "] payload [" + payload + "] with QoS " + qos);

        // Forward the message to subscribers matching the topic
        subscribers.forEach((subscribedTopic, subscriberEndpoint) -> {
            if (topicMatches(subscribedTopic, topic)) {
                // Handle different QoS levels
                switch (qos) {
                    case 0:
                        // QoS 0: At most once delivery, no acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false);
                        System.out.println("Message forwarded to [" + subscribedTopic + "] with QoS 0");
                        break;
                    case 1:
                    case 2:
                        // QoS 1 and QoS 2: Acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false,
                            publishResult -> handlePublishResult(publishResult, subscribedTopic, qos));
                        break;
                    default:
                        System.err.println("Unsupported QoS level: " + qos);
                }
            }
        });
    }

    /**
     * 处理MQTT消息发布的结果。
     * @param publishResult AsyncResult实例,代表MQTT消息发布的异步结果
     * @param topic 订阅主题
     * @param qos QoS级别
     */
    private void handlePublishResult(AsyncResult<Integer> publishResult, String topic, int qos) {
        if (publishResult.succeeded()) {
            System.out.println("Message forwarded to subscribers of [" + topic + "] with QoS " + qos);
        } else {
            System.err.println("Failed to forward message to [" + topic + "] with QoS " + qos +
                ": " + publishResult.cause().getMessage());
        }
    }

    /**
     * 判断订阅主题是否匹配实际主题。
     * @param subscribedTopic 订阅主题
     * @param actualTopic 实际主题
     * @return 是否匹配
     */
    private boolean topicMatches(String subscribedTopic, String actualTopic) {
        String[] subscribedParts = subscribedTopic.split("/");
        String[] actualParts = actualTopic.split("/");

        if (subscribedParts.length != actualParts.length) {
            return false;
        }

        for (int i = 0; i < subscribedParts.length; i++) {
            if (!subscribedParts[i].equals("+") && !subscribedParts[i].equals(actualParts[i])) {
                return false;
            }
        }

        return true;
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-11-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
物联网
腾讯连连是腾讯云物联网全新商业品牌,它涵盖一站式物联网平台 IoT Explorer,连连官方微信小程序和配套的小程序 SDK、插件和开源 App,并整合腾讯云内优势产品能力,如大数据、音视频、AI等。同时,它打通腾讯系 C 端内容资源,如QQ音乐、微信支付、微保、微众银行、医疗健康等生态应用入口。提供覆盖“云-管-边-端”的物联网基础设施,面向“消费物联”和 “产业物联”两大赛道提供全方位的物联网产品和解决方案,助力企业高效实现数字化转型。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档