前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >thingsboard之边缘网关建立连接过程

thingsboard之边缘网关建立连接过程

作者头像
johnhuster的分享
发布2022-06-30 15:44:40
1.9K0
发布2022-06-30 15:44:40
举报
文章被收录于专栏:johnhuster

        thingsboard3.3.4版本之后就有了对应的边缘网关的管理功能,对应的边缘网关项目为thingsboard-edge,相比于之前的普通网关或者设备上传遥测数据,边缘网关增加了很多优势:1、边缘端与云端断开连接时,在边缘端缓冲数据,等连接上之后再将缓冲的数据上传到云端,如下图所示:

2、距离设备更近,网络稳定性更高,能够更快的完成设备联动操作

本文主要讲下边缘网关连接云端的过程:

1、在thingsboard项目创建对应的边缘实例

2、 使用上图中的Edge Key与secret启动thingsboard-edge项目

3、边缘端与云端通信是通过grpc实现的,下面就是建立连接流程

EdgeGrpcService

代码语言:javascript
复制
    @Override
    public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
        return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper,
                sendDownlinkExecutorService).getInputStream();
    }

 接下来看下EdgeGrpcSession的构造过程:

代码语言:javascript
复制
    EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
            BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
            Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper,
            ScheduledExecutorService sendDownlinkExecutorService) {
        this.sessionId = UUID.randomUUID();
        this.ctx = ctx;
        this.outputStream = outputStream;
        this.sessionOpenListener = sessionOpenListener;
        this.sessionCloseListener = sessionCloseListener;
        this.mapper = mapper;
        this.sendDownlinkExecutorService = sendDownlinkExecutorService;
        initInputStream();
    }

下面是建立连接的核心方法:initInputStream

代码语言:javascript
复制
    private void initInputStream() {
        this.inputStream = new StreamObserver<>() {
            @Override
            public void onNext(RequestMsg requestMsg) {
                if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) {
                    ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg());
                    outputStream.onNext(ResponseMsg.newBuilder()
                            .setConnectResponseMsg(responseMsg)
                            .build());
                    if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
                        outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
                    } else {
                        connected = true;
                    }
                }
                if (connected) {
                    if (requestMsg.getMsgType().equals(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)) {
                        if (requestMsg.hasSyncRequestMsg() && requestMsg.getSyncRequestMsg().getSyncRequired()) {
                            startSyncProcess(edge.getTenantId(), edge.getId());
                        } else {
                            syncCompleted = true;
                        }
                    }
                    if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) {
                        if (requestMsg.hasUplinkMsg()) {
                            onUplinkMsg(requestMsg.getUplinkMsg());
                        }
                        if (requestMsg.hasDownlinkResponseMsg()) {
                            onDownlinkResponse(requestMsg.getDownlinkResponseMsg());
                        }
                    }
                }
            }
            
            @Override
            public void onError(Throwable t) {
                log.error("Failed to deliver message from client!", t);
                closeSession();
            }
            
            @Override
            public void onCompleted() {
                closeSession();
            }
            
            private void closeSession() {
                connected = false;
                if (edge != null) {
                    try {
                        sessionCloseListener.accept(edge.getId());
                    } catch (Exception ignored) {
                    }
                }
                try {
                    outputStream.onCompleted();
                } catch (Exception ignored) {
                }
            }
        };
    }

initInputStream方法除了有建立连接的processConnect,还有处理来自边缘端上行消onUplinkMsg方法,以及处理云端下发到边缘端的消息响应的onDownlinkResponse方法。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档