前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >thingsboard如何维护设备的状态的

thingsboard如何维护设备的状态的

作者头像
johnhuster的分享
发布2022-03-29 13:49:50
7920
发布2022-03-29 13:49:50
举报
文章被收录于专栏:johnhuster

本文以thingsboard-3.1.1为例说明

正文

thingsboard在内存里面是记录了每个设备(包括网关)的在线状态的,在数据attribute_kv表中active字段对应的就是设备在线状态的值。

thingsboard的对mqtt消息的处理是由MqttTransportHandler来完成的,底层通信基于netty实现,熟悉netty的开发者对ChannelInboundHandlerAdapter一定特别熟悉,咱们直接看下MqttTransportHandler是如何重载channelRead方法的,如下所示:

代码语言:javascript
复制
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.trace("[{}] Processing msg: {}", sessionId, msg);
        try {
            if (msg instanceof MqttMessage) {
                processMqttMsg(ctx, (MqttMessage) msg);
            } else {
                ctx.close();
            }
        } finally {
            ReferenceCountUtil.safeRelease(msg);
        }
    }
代码语言:javascript
复制
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            processDisconnect(ctx);
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx, msg)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                    transportService.reportActivity(sessionInfo);
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx, msg)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;
        }
    }

从上面的方法可以看到thingsboard是如何处理mqtt消息的,针对connect、publish、dusbsrcribe等消息类型进行了处理,processConnect与processDisconnect方法是处理设备连接/断开连接的,在processConnect方法中创建了设备的在线信息到内存中,processDisconnect则相反。

processConnect是建立连接,但是要维护设备的实时连接状态,只处理连接消息肯定是不够的,thingsboard还会处理publish(属性更新以及遥测值上传)等消息也会更新设备的活动状态,具体可以参考TransportService.reportActivity方法。

代码语言:javascript
复制
// 属性更新以及遥测值上传
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
        if (!checkConnected(ctx, mqttMsg)) {
            return;
        }
        String topicName = mqttMsg.variableHeader().topicName();
        int msgId = mqttMsg.variableHeader().packetId();
        log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);

        if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
            if (gatewaySessionHandler != null) {
                handleGatewayPublishMsg(topicName, msgId, mqttMsg);
                transportService.reportActivity(sessionInfo);
            }
        } else {
            processDevicePublish(ctx, mqttMsg, topicName, msgId);
        }
    }
代码语言:javascript
复制
@Override
    public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
        reportActivityInternal(sessionInfo);
    }

    private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
        UUID sessionId = toSessionId(sessionInfo);
        SessionMetaData sessionMetaData = sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }

可以看到每次设备(通过设备自身或者通过网关上传数据)都会更新设备的最后活跃时间字段。看到这里一直没有看到除了设备主动关闭连接的情况下thingsboard是如何清理过期连接的,接下来是本场的主角:DefaultTransportService.checkInactivityAndReportActivity方法:

代码语言:javascript
复制
    private void checkInactivityAndReportActivity() {
        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
        sessions.forEach((uuid, sessionMD) -> {
            long lastActivityTime = sessionMD.getLastActivityTime();
            TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
            if (sessionInfo.getGwSessionIdMSB() > 0 &&
                    sessionInfo.getGwSessionIdLSB() > 0) {
                SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
                if (gwMetaData != null) {
                    lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
                }
            }
            
            if (lastActivityTime < expTime) {
                // 长时间没有对话,session过期
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
                }
                process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
                sessions.remove(uuid);
                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
            } else {
                if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
                    final long lastActivityTimeFinal = lastActivityTime;
                    process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
                            .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
                            .setRpcSubscription(sessionMD.isSubscribedToRPC())
                            .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
                        @Override
                        public void onSuccess(Void msg) {
                            sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
                        }

                        @Override
                        public void onError(Throwable e) {
                            log.warn("[{}] Failed to report last activity time", uuid, e);
                        }
                    });
                }
            }
        });
    }

checkInactivityAndReportActivity这个方法是DefaultTransportService创建时启动的一个定时检测任务,

private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();

sessions是以设备id为主键的ConcurrentMap对象,这个方法就会定期去扫描sessions里的session数据,长时间与thingsboard未进行会话就会关闭与设备的会话连接,并清除内存保存的会话数据。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/12/15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正文
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档