专栏首页CodeGuide | 程序员编码指南websocket与下位机通过netty方式通信传输行为信息

websocket与下位机通过netty方式通信传输行为信息

作者:小付哥

博闻强识而让,敦善行而不怠,谓之君子

微信公众号:bugstack虫洞栈 | 关注获得源码 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。目前已完成的专题有;Netty4.x实战专题案例、用Java实现JVM、基于JavaAgent的全链路监控、手写RPC框架、架构设计专题案例[Ing]等。

前言介绍

在物联网开发中,常常需要通过网页端来控制设备,包括;获取信息、执行操作、启动停止等,就像我们在手机上会控制家里的小米盒子、路由器、电饭煲或者在线养狗等一些设备一样。在这里所有的下层设备都可以通过socket通信链接到服务端,而用户一端在通过http链接或者websocket链接到服务端,通过发送和接收数据来做出相应的行为操作。如下图;

微信公众号:bugstack虫洞栈 & 执行流程

案例目标

  1. 本章节整合Springboot+Netty,通过部署nettySocket与webSocket两套服务端,来接收转发行为消息。
  2. 客户端采用js链接websocket,用于接收服务端反馈与发送指令,用于获取下位机信息。
  3. 在test中启动一个模拟下位机,用于反馈信息数据。在真实开发中下位机与服务端通信通常是定义好的字节码,需要自己编写解码器。

环境准备

  1. jdk 1.8.0
  2. IntelliJ IDEA Community Edition 2018.3.1 x64
  3. Netty 4.1.36.Final

代码示例

 1itstack-demo-netty-3-01
 2└── src
 3    ├── main
 4    │   ├── java
 5    │   │   └── org.itstack.demo.ark
 6    │   │       ├── domain
 7    │   │       │    ├── msgobj
 8    │   │       │    │   ├── Feedback.java
 9    │   │       │    │   ├── QueryInfoReq.java
10    │   │       │    │   └── Text.java
11    │   │       │    ├── Device.java
12    │   │       │    ├── EasyResult.java 
13    │   │       │    ├── InfoProtocol.java
14    │   │       │    └── ServerInfo.java 
15    │   │       ├── server
16    │   │       │    ├── socket
17    │   │       │    │   ├── MyChannelInitializer.java
18    │   │       │    │   ├── MyServerHandler.java
19    │   │       │    │   └── NettyServer.java    
20    │   │       │    └── websocket
21    │   │       │        ├── MyWsChannelInitializer.java
22    │   │       │        ├── MyWsServerHandler.java
23    │   │       │        └── WsNettyServer.java
24    │   │       ├── util
25    │   │       │    ├── CacheUtil.java
26    │   │       │    ├── MsgUtil.java
27    │   │       │    └── MsgUtil.java
28    │   │       ├── web
29    │   │       │    └── NettyController.java    
30    │   │       └── Application.java
31    │   └── resources    
32    │   │   └── application.yml
33    │   └── webapp
34    │       ├── arkWs
35    │       │    ├── js
36    │       │    │   └── ws.js   
37    │       │   └── arkWsControlCenter.html    
38    │       ├── res        
39    │       └── WEB-INF
40    │            └── index.jsp   
41    │
42    └── test
43        └── java
44            └── org.itstack.demo.test
45                └── ApiTest.java

演示部分重点代码块,完整代码下载关注公众号;bugstack虫洞栈,回复Netty案例

server/socket/MyServerHandler.java & socket数据处理

  • 当有下位机链接服务端时,构建下位机信息,实际使用可以通过注册方式进行链接验证。
  • 当收到下位机信息后转发到websocket端,使网页端收到下位机信息反馈
 1public class MyServerHandler extends ChannelInboundHandlerAdapter {
 2
 3    private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
 4
 5    /**
 6     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
 7     */
 8    @Override
 9    public void channelActive(ChannelHandlerContext ctx) throws Exception {
10        SocketChannel channel = (SocketChannel) ctx.channel();
11        String channelId = channel.id().toString();
12        System.out.println("链接报告开始");
13        System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channelId);
14        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
15        System.out.println("链接报告Port:" + channel.localAddress().getPort());
16        System.out.println("链接报告完毕");
17
18        //构建设备信息{下位机、中继器、IO板卡}
19        Device device = new Device();
20        device.setChannelId(channelId);
21        device.setNumber(UUID.randomUUID().toString());
22        device.setIp(channel.remoteAddress().getHostString());
23        device.setPort(channel.remoteAddress().getPort());
24        device.setConnectTime(new Date());
25        //添加设备信息
26        deviceGroup.put(channelId, device);
27        CacheUtil.cacheClientChannel.put(channelId, channel);
28    }
29
30
31    @Override
32    public void channelRead(ChannelHandlerContext ctx, Object objMsgJsonStr) throws Exception {
33        //接收设备发来信息
34        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + objMsgJsonStr);
35
36        //转发消息到Ws
37        CacheUtil.wsChannelGroup.writeAndFlush(new TextWebSocketFrame(objMsgJsonStr.toString()));
38    }
39
40}

server/websocket/MyWsServerHandler.java & websocket数据处理

  • websocket数据需要转换后使用,可以支持文本消息,本案例中使用json字符串,方便对象传输
  • channelRead转发数据,当收到数据后发送给下位机,主要通过内容中channel控制
 1public class MyWsServerHandler extends ChannelInboundHandlerAdapter {
 2
 3
 4    @Override
 5    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 6
 7       ...
 8
 9        //ws
10        if (msg instanceof WebSocketFrame) {
11            WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
12            //关闭请求
13            if (webSocketFrame instanceof CloseWebSocketFrame) {
14                handshaker.close(ctx.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
15                return;
16            }
17            //ping请求
18            if (webSocketFrame instanceof PingWebSocketFrame) {
19                ctx.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));
20                return;
21            }
22            //只支持文本格式,不支持二进制消息
23            if (!(webSocketFrame instanceof TextWebSocketFrame)) {
24                throw new Exception("仅支持文本格式");
25            }
26
27            String request = ((TextWebSocketFrame) webSocketFrame).text();
28            System.out.println("服务端收到:" + request);
29            InfoProtocol infoProtocol = JSON.parseObject(request, InfoProtocol.class);
30            //socket转发消息
31            String channelId = infoProtocol.getChannelId();
32            Channel channel = CacheUtil.cacheClientChannel.get(channelId);
33            if (null == channel) return;
34            channel.writeAndFlush(MsgUtil.buildMsg(infoProtocol));
35
36            //websocket消息反馈发送成功
37            ctx.writeAndFlush(MsgUtil.buildWsMsgText(ctx.channel().id().toString(), "向下位机传达消息success!"));
38        }
39
40    }
41
42}

web/NettyController.java & 控制层方便获取服务端信息

  • 控制层提供了查询服务列表、链接设备信息、以及主动触发信息发送
  • 另外如果需要将服务端的启动关闭进行手动控制,可以在这里面提供方法供调用
 1@Controller
 2public class NettyController {
 3
 4    private Logger logger = LoggerFactory.getLogger(NettyController.class);
 5
 6    @RequestMapping("/index")
 7    public String index() {
 8        return "index";
 9    }
10
11    @RequestMapping("/queryNettyServerList")
12    @ResponseBody
13    public Collection<ServerInfo> queryNettyServerList() {
14        try {
15            Collection<ServerInfo> serverInfos = CacheUtil.serverInfoMap.values();
16            logger.info("查询服务端列表。{}", JSON.toJSONString(serverInfos));
17            return serverInfos;
18        } catch (Exception e) {
19            logger.info("查询服务端列表失败。", e);
20            return null;
21        }
22    }
23
24    @RequestMapping("/queryDeviceList")
25    @ResponseBody
26    public Collection<Device> queryDeviceList() {
27        try {
28            Collection<Device> deviceInfos = CacheUtil.deviceGroup.values();
29            logger.info("查询设备链接列表。{}", JSON.toJSONString(deviceInfos));
30            return deviceInfos;
31        } catch (Exception e) {
32            logger.info("查询设备链接列表失败。", e);
33            return null;
34        }
35    }
36
37    @RequestMapping("/doSendMsg")
38    @ResponseBody
39    public EasyResult doSendMsg(String reqStr) {
40        try {
41            logger.info("向设备发送信息[可以通过另外一个Web服务调用本接口发送信息]:{}", reqStr);
42            InfoProtocol infoProtocol = MsgUtil.getMsg(reqStr);
43            String channelId = infoProtocol.getChannelId();
44            Channel channel = CacheUtil.cacheClientChannel.get(channelId);
45            channel.writeAndFlush(MsgUtil.buildMsg(infoProtocol));
46            return EasyResult.buildSuccessResult();
47        } catch (Exception e) {
48            logger.error("向设备发送信息失败:{}", reqStr, e);
49            return EasyResult.buildErrResult(e);
50        }
51    }
52
53}

Application.java & 启动层

  • 通过继承CommandLineRunner方法,在服务就绪后启动socket服务
  • 启动后需要循环验证是否启动完成
 1@SpringBootApplication
 2@ComponentScan("org.itstack.demo.ark")
 3public class Application implements CommandLineRunner {
 4
 5    private Logger logger = LoggerFactory.getLogger(Application.class);
 6
 7    @Value("${netty.socket.port}")
 8    private int nettyServerPort;
 9    @Value("${netty.websocket.port}")
10    private int nettyWsServerPort;
11    //默认线程池
12    private static ExecutorService executorService = Executors.newFixedThreadPool(2);
13
14    public static void main(String[] args) {
15        SpringApplication.run(Application.class, args);
16    }
17
18    @Override
19    public void run(String... args) throws Exception {
20        //启动NettyServer-socket
21        logger.info("启动NettyServer服务,启动端口:{}", nettyServerPort);
22        NettyServer nettyServer = new NettyServer(new InetSocketAddress(nettyServerPort));
23        Future<Channel> future = executorService.submit(nettyServer);
24        Channel channel = future.get();
25        if (null == channel) {
26            throw new RuntimeException("netty server-s open error channel is null");
27        }
28        while (!channel.isActive()) {
29            logger.info("启动NettyServer服务,循环等待启动...");
30            Thread.sleep(500);
31        }
32        logger.info("启动NettyServer服务,完成:{}", channel.localAddress());
33        CacheUtil.serverInfoMap.put(nettyServerPort, new ServerInfo("NettySocket", NetUtil.getHost(), nettyServerPort, new Date()));
34
35        //启动NettyServer-websocket
36        logger.info("启动NettyWsServer服务,启动端口:{}", nettyWsServerPort);
37        WsNettyServer wsNettyServer = new WsNettyServer(new InetSocketAddress(nettyWsServerPort));
38        Future<Channel> wsFuture = executorService.submit(wsNettyServer);
39        Channel wsChannel = wsFuture.get();
40        if (null == wsChannel) {
41            throw new RuntimeException("netty server-ws open error channel is null");
42        }
43        while (!wsChannel.isActive()) {
44            logger.info("启动NettyWsServer服务,循环等待启动...");
45            Thread.sleep(500);
46        }
47        logger.info("启动NettyWsServer服务,完成:{}", wsChannel.localAddress());
48        CacheUtil.serverInfoMap.put(nettyServerPort, new ServerInfo("NettyWsSocket", NetUtil.getHost(), nettyServerPort, new Date()));
49    }
50
51}

webapp/arkWs/js/ws.js & 链接websocket服务端

 1socket = new WebSocket("ws://localhost:7398/websocket");
 2
 3    socket.onmessage = function(event){
 4
 5        var msg = JSON.parse(event.data);
 6        console.info(msg);
 7
 8        $("#msgBox").val($("#msgBox").val() + event.data + "\r\n");
 9
10    };

案例测试

  1. 分别启动如下内容;
  2. Application.java,Plugins/spring-boot/spring-boot:run
  3. ApiTest.java,右键启动模拟下位机
  4. 打开服务端链接;http://localhost:8080/ http://localhost:8080/arkWs/arkWsControlCenter.html

微信公众号:bugstack虫洞栈 & 服务端与监控

  1. 发送模拟信息,观察执行结果; 12019-12-01 15:11:49.965 INFO 7620 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet' 22019-12-01 15:11:49.965 INFO 7620 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started 32019-12-01 15:11:49.980 INFO 7620 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 15 ms 42019-12-01 15:11:51.157 INFO 7620 --- [nio-8080-exec-3] o.itstack.demo.ark.web.NettyController : 查询设备链接列表。[{"channelId":"281d1279","connectTime":1575184302964,"ip":"127.0.0.1","number":"74de0967-c0b4-4426-a9d1-183feaff2acf","port":3972}] 52019-12-01 15:11:51.162 INFO 7620 --- [io-8080-exec-10] o.itstack.demo.ark.web.NettyController : 查询服务端列表。[{"ip":"10.13.70.50","openDate":1575184290501,"port":7397,"typeInfo":"NettyWsSocket"}] 62019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告开始 72019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告信息:有一客户端链接到本服务端 82019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告IP:0:0:0:0:0:0:0:1 92019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告Port:7398 102019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告完毕 11服务端收到:{"channelId":"281d1279","msgType":2,"msgObj":{"stateType":"1"}} 122019-12-01 15:12:05 接收到消息内容:{"msgObj":{"stateMsg":"温度信息:91.31334894176383°C","stateType":1,"channelId":"93c1120a"},"msgType":3,"channelId":"93c1120a"} 13服务端收到:{"channelId":"281d1279","msgType":2,"msgObj":{"stateType":"1"}} 142019-12-01 15:12:05 接收到消息内容:{"msgObj":{"stateMsg":"温度信息:44.83794772946285°C","stateType":1,"channelId":"93c1120a"},"msgType":3,"channelId":"93c1120a"}

综上总结

  1. 在使用springboot与netty结合,开发一个简便的服务端还是很方便的,而且在集合一些springcloud的服务,可以使项目工程更加完善。
  2. 可以尝试做一些设备控制服务,在我们不在家的时候也可以通过一个h5链接控制家里的设备,比如快到家将热水器打开。
  3. 本案例还偏向于模拟,在实际开发过程中还是会出现很多业务问题需要解决,尤其是服务端与下位机的通信,需要编写编码器与解码器。

本文分享自微信公众号 - bugstack虫洞栈(bugstack),作者:小付哥

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-12-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》

    Netty的性能非常好,在一些小型用户体量的socket服务内,仅部署单台机器就可以满足业务需求。但当遇到一些中大型用户体量的服务时,就需要考虑讲Netty按照...

    小傅哥
  • netty案例,netty4.1中级拓展篇二《Netty使用Protobuf传输数据》

    在netty数据传输过程中可以有很多选择,比如;字符串、json、xml、java对象,但为了保证传输的数据具备;良好的通用性、方便的操作性和传输的高性能,我们...

    小傅哥
  • netty案例,netty4.1中级拓展篇三《Netty传输Java对象》

    Netty在实际应用级开发中,有时候某些特定场景下会需要使用Java对象类型进行传输,但是如果使用Java本身序列化进行传输,那么对性能的损耗比较大。为此我们需...

    小傅哥
  • Spark之SQL解析(源码阅读十)

      如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么。之前总结的已经写了传统数据库与Spark的sql解析之间的差别。那么我...

    用户3003813
  • 使用Hystrix实现自动降级与依赖隔离[微服务]

    目前对于一些非核心操作,如增减库存后保存操作日志 发送异步消息时(具体业务流程),一旦出现MQ服务异常时,会导致接口响应超时,因此可以考虑对非核心操作引入服务降...

    高广超
  • java虚拟机,应该了解一点点

    在这个身处互联网,高并发,高可用的软件世界里,后端人员,面对线上线下许多异常问题不知所措时,可以侧面从jvm的角度去排查问题了,当然了,如果不是j...

    后端Coder
  • mysql索引和日志相关问题

    他们有一个共同的字段, 叫做xid, 崩溃恢复的时候, 会按照顺序扫描redolog:

    历久尝新
  • Jmeter 常用函数(29)- 详解 __eval

    https://www.cnblogs.com/poloyy/p/13291704.html

    小菠萝测试笔记
  • 北大陈浩然笔记:特征缩放和泛化能力(亮点)

    表示第 i 个数据的第 j 个属性,它是一个实数,yi 是第 i 个数据的标签值,也是实数。f是我们学习到的模型,

    double
  • mysql数据迁移hbase问题

    无法直接dump,写了java多线程程序做迁移 问题1:Operation not allowed after ResultSet closed 裸jdbc语句...

    财主刀刀

扫码关注云+社区

领取腾讯云代金券