作者:小付哥
博闻强识而让,敦善行而不怠,谓之君子
微信公众号:bugstack虫洞栈 | 关注获得源码 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。目前已完成的专题有;Netty4.x实战专题案例、用Java实现JVM、基于JavaAgent的全链路监控、手写RPC框架、架构设计专题案例[Ing]等。
在物联网开发中,常常需要通过网页端来控制设备,包括;获取信息、执行操作、启动停止等,就像我们在手机上会控制家里的小米盒子、路由器、电饭煲或者在线养狗等一些设备一样。在这里所有的下层设备都可以通过socket通信链接到服务端,而用户一端在通过http链接或者websocket链接到服务端,通过发送和接收数据来做出相应的行为操作。如下图;
微信公众号:bugstack虫洞栈 & 执行流程
1itstack-demo-netty-3-01
2└── src
3 ├── main
4 │ ├── java
5 │ │ └── org.itstack.demo.ark
6 │ │ ├── domain
7 │ │ │ ├── msgobj
8 │ │ │ │ ├── Feedback.java
9 │ │ │ │ ├── QueryInfoReq.java
10 │ │ │ │ └── Text.java
11 │ │ │ ├── Device.java
12 │ │ │ ├── EasyResult.java
13 │ │ │ ├── InfoProtocol.java
14 │ │ │ └── ServerInfo.java
15 │ │ ├── server
16 │ │ │ ├── socket
17 │ │ │ │ ├── MyChannelInitializer.java
18 │ │ │ │ ├── MyServerHandler.java
19 │ │ │ │ └── NettyServer.java
20 │ │ │ └── websocket
21 │ │ │ ├── MyWsChannelInitializer.java
22 │ │ │ ├── MyWsServerHandler.java
23 │ │ │ └── WsNettyServer.java
24 │ │ ├── util
25 │ │ │ ├── CacheUtil.java
26 │ │ │ ├── MsgUtil.java
27 │ │ │ └── MsgUtil.java
28 │ │ ├── web
29 │ │ │ └── NettyController.java
30 │ │ └── Application.java
31 │ └── resources
32 │ │ └── application.yml
33 │ └── webapp
34 │ ├── arkWs
35 │ │ ├── js
36 │ │ │ └── ws.js
37 │ │ └── arkWsControlCenter.html
38 │ ├── res
39 │ └── WEB-INF
40 │ └── index.jsp
41 │
42 └── test
43 └── java
44 └── org.itstack.demo.test
45 └── ApiTest.java
演示部分重点代码块,完整代码下载关注公众号;bugstack虫洞栈,回复Netty案例
server/socket/MyServerHandler.java & socket数据处理
1public class MyServerHandler extends ChannelInboundHandlerAdapter {
2
3 private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
4
5 /**
6 * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
7 */
8 @Override
9 public void channelActive(ChannelHandlerContext ctx) throws Exception {
10 SocketChannel channel = (SocketChannel) ctx.channel();
11 String channelId = channel.id().toString();
12 System.out.println("链接报告开始");
13 System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channelId);
14 System.out.println("链接报告IP:" + channel.localAddress().getHostString());
15 System.out.println("链接报告Port:" + channel.localAddress().getPort());
16 System.out.println("链接报告完毕");
17
18 //构建设备信息{下位机、中继器、IO板卡}
19 Device device = new Device();
20 device.setChannelId(channelId);
21 device.setNumber(UUID.randomUUID().toString());
22 device.setIp(channel.remoteAddress().getHostString());
23 device.setPort(channel.remoteAddress().getPort());
24 device.setConnectTime(new Date());
25 //添加设备信息
26 deviceGroup.put(channelId, device);
27 CacheUtil.cacheClientChannel.put(channelId, channel);
28 }
29
30
31 @Override
32 public void channelRead(ChannelHandlerContext ctx, Object objMsgJsonStr) throws Exception {
33 //接收设备发来信息
34 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + objMsgJsonStr);
35
36 //转发消息到Ws
37 CacheUtil.wsChannelGroup.writeAndFlush(new TextWebSocketFrame(objMsgJsonStr.toString()));
38 }
39
40}
server/websocket/MyWsServerHandler.java & websocket数据处理
1public class MyWsServerHandler extends ChannelInboundHandlerAdapter {
2
3
4 @Override
5 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
6
7 ...
8
9 //ws
10 if (msg instanceof WebSocketFrame) {
11 WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
12 //关闭请求
13 if (webSocketFrame instanceof CloseWebSocketFrame) {
14 handshaker.close(ctx.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
15 return;
16 }
17 //ping请求
18 if (webSocketFrame instanceof PingWebSocketFrame) {
19 ctx.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));
20 return;
21 }
22 //只支持文本格式,不支持二进制消息
23 if (!(webSocketFrame instanceof TextWebSocketFrame)) {
24 throw new Exception("仅支持文本格式");
25 }
26
27 String request = ((TextWebSocketFrame) webSocketFrame).text();
28 System.out.println("服务端收到:" + request);
29 InfoProtocol infoProtocol = JSON.parseObject(request, InfoProtocol.class);
30 //socket转发消息
31 String channelId = infoProtocol.getChannelId();
32 Channel channel = CacheUtil.cacheClientChannel.get(channelId);
33 if (null == channel) return;
34 channel.writeAndFlush(MsgUtil.buildMsg(infoProtocol));
35
36 //websocket消息反馈发送成功
37 ctx.writeAndFlush(MsgUtil.buildWsMsgText(ctx.channel().id().toString(), "向下位机传达消息success!"));
38 }
39
40 }
41
42}
web/NettyController.java & 控制层方便获取服务端信息
1@Controller
2public class NettyController {
3
4 private Logger logger = LoggerFactory.getLogger(NettyController.class);
5
6 @RequestMapping("/index")
7 public String index() {
8 return "index";
9 }
10
11 @RequestMapping("/queryNettyServerList")
12 @ResponseBody
13 public Collection<ServerInfo> queryNettyServerList() {
14 try {
15 Collection<ServerInfo> serverInfos = CacheUtil.serverInfoMap.values();
16 logger.info("查询服务端列表。{}", JSON.toJSONString(serverInfos));
17 return serverInfos;
18 } catch (Exception e) {
19 logger.info("查询服务端列表失败。", e);
20 return null;
21 }
22 }
23
24 @RequestMapping("/queryDeviceList")
25 @ResponseBody
26 public Collection<Device> queryDeviceList() {
27 try {
28 Collection<Device> deviceInfos = CacheUtil.deviceGroup.values();
29 logger.info("查询设备链接列表。{}", JSON.toJSONString(deviceInfos));
30 return deviceInfos;
31 } catch (Exception e) {
32 logger.info("查询设备链接列表失败。", e);
33 return null;
34 }
35 }
36
37 @RequestMapping("/doSendMsg")
38 @ResponseBody
39 public EasyResult doSendMsg(String reqStr) {
40 try {
41 logger.info("向设备发送信息[可以通过另外一个Web服务调用本接口发送信息]:{}", reqStr);
42 InfoProtocol infoProtocol = MsgUtil.getMsg(reqStr);
43 String channelId = infoProtocol.getChannelId();
44 Channel channel = CacheUtil.cacheClientChannel.get(channelId);
45 channel.writeAndFlush(MsgUtil.buildMsg(infoProtocol));
46 return EasyResult.buildSuccessResult();
47 } catch (Exception e) {
48 logger.error("向设备发送信息失败:{}", reqStr, e);
49 return EasyResult.buildErrResult(e);
50 }
51 }
52
53}
Application.java & 启动层
1@SpringBootApplication
2@ComponentScan("org.itstack.demo.ark")
3public class Application implements CommandLineRunner {
4
5 private Logger logger = LoggerFactory.getLogger(Application.class);
6
7 @Value("${netty.socket.port}")
8 private int nettyServerPort;
9 @Value("${netty.websocket.port}")
10 private int nettyWsServerPort;
11 //默认线程池
12 private static ExecutorService executorService = Executors.newFixedThreadPool(2);
13
14 public static void main(String[] args) {
15 SpringApplication.run(Application.class, args);
16 }
17
18 @Override
19 public void run(String... args) throws Exception {
20 //启动NettyServer-socket
21 logger.info("启动NettyServer服务,启动端口:{}", nettyServerPort);
22 NettyServer nettyServer = new NettyServer(new InetSocketAddress(nettyServerPort));
23 Future<Channel> future = executorService.submit(nettyServer);
24 Channel channel = future.get();
25 if (null == channel) {
26 throw new RuntimeException("netty server-s open error channel is null");
27 }
28 while (!channel.isActive()) {
29 logger.info("启动NettyServer服务,循环等待启动...");
30 Thread.sleep(500);
31 }
32 logger.info("启动NettyServer服务,完成:{}", channel.localAddress());
33 CacheUtil.serverInfoMap.put(nettyServerPort, new ServerInfo("NettySocket", NetUtil.getHost(), nettyServerPort, new Date()));
34
35 //启动NettyServer-websocket
36 logger.info("启动NettyWsServer服务,启动端口:{}", nettyWsServerPort);
37 WsNettyServer wsNettyServer = new WsNettyServer(new InetSocketAddress(nettyWsServerPort));
38 Future<Channel> wsFuture = executorService.submit(wsNettyServer);
39 Channel wsChannel = wsFuture.get();
40 if (null == wsChannel) {
41 throw new RuntimeException("netty server-ws open error channel is null");
42 }
43 while (!wsChannel.isActive()) {
44 logger.info("启动NettyWsServer服务,循环等待启动...");
45 Thread.sleep(500);
46 }
47 logger.info("启动NettyWsServer服务,完成:{}", wsChannel.localAddress());
48 CacheUtil.serverInfoMap.put(nettyServerPort, new ServerInfo("NettyWsSocket", NetUtil.getHost(), nettyServerPort, new Date()));
49 }
50
51}
webapp/arkWs/js/ws.js & 链接websocket服务端
1socket = new WebSocket("ws://localhost:7398/websocket");
2
3 socket.onmessage = function(event){
4
5 var msg = JSON.parse(event.data);
6 console.info(msg);
7
8 $("#msgBox").val($("#msgBox").val() + event.data + "\r\n");
9
10 };
微信公众号:bugstack虫洞栈 & 服务端与监控