前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >物联网的服务端设计(二)创建连接

物联网的服务端设计(二)创建连接

作者头像
海哥@开发
发布2022-12-08 16:22:19
6110
发布2022-12-08 16:22:19
举报
文章被收录于专栏:海哥聊技术海哥聊技术

引用Spring官方的一句话,让你简单的创建一个项目。

Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can “just run”.

这么好用的框架我们也要用起来,不要只作为web项目使用。让Spring帮我们管理对象多方便啊。

新建项目

正常创建一个SpringBoot2.6.x的项目。在POM里引一下Netty。

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.9</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>org.guohai</groupId>
  <artifactId>iot-server</artifactId>
  <version>0.0.1</version>
  <name>iot-server</name>
  <description>iot server by netty</description>
  <properties>
    <java.version>11</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

同时会自动 创建一个main类文件。作为我们的主引导文件。

首先要创建的是两个事件循环组,分别用户来维护客户端的连接和数据的读写。其中一个构建参数的方法是事件组里的线程数量,如果不进行显示声明会自动创建CPU核心数x2的线程。如果工作在类似docker的容器里该参数会不准确,我们需要显示声明下。其中boosGroup只负责连接我们把线程数设置为1,workerGroup为处理数据的读写线程数可以稍微多一些。这里我们设置为2

代码语言:javascript
复制
  /**
   * 主事件,负责连接。单一线程就行
   */
  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  /**
   * 负责处理业务,不设置线程数时为CPU核心*2.如果运行在容器状态下会不准,建议手动设置
   */
  private final EventLoopGroup workerGroup = new NioEventLoopGroup(2);

接下来准备启动我们的Netty的服务进程,我们的启动肯定希望是在整个spring资源加载完毕后。这里可以实现一下CommandLineRunner接口的run方法。

代码语言:javascript
复制
  /**
   * 实现自定义的run方法
   * @param args 输入的参数
   * @throws Exception 抛出异常
   */
  @Override
  public void run(String... args) throws Exception {
    try{
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap
          .group(bossGroup, workerGroup)
          // 这里还可以支持其他的实现,
          // 比如在Linux下可以用基于EpollServerSocketChannel
          // 在mac下可以使用KQueueServerSocketChannel
          // 在这里我们用比较通用的NioServerSocketChannel实现
          .channel(NioServerSocketChannel.class)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {

            }
          });
      // 绑定端口
      ChannelFuture channelFuture = bootstrap.bind(SERVER_PORT).sync();
      logger.info("Server start listen port :" + SERVER_PORT);
      channelFuture.channel().closeFuture().sync();
    }finally {
      workerGroup.shutdownGracefully();
      bossGroup.shutdownGracefully();
    }

  }

运行我们的程序,目前已经可以开始监听本机的 SERVER_PORT 端口,但客户端连接上来,还不会有任何的回应。我们还需要实现一个最简单的 ChannelHandler。

为了下一步json的解码准备我们起名叫 DecoderHandler 。需要继承自 JsonObjectDecoder 类,并覆写下 extractObject方法。

这里推荐看下 JsonObjectDecoder 的实现,他也是继承自ByteToMessageDecoder 。通过对json当中 的{}或[]进行检查来区分数据包。为了解决连包半包问题 ByteToMessageDecoder 本身是线程不安全的,我们继承下来的DecoderHandler 肯定也是线程不安全的。目前先不解决这个问题,在后续的重构代码环节会进行实现的优化。

看下 JsonObjectDecoder 的源码,如果想在 DecoderHandler 里直接继承处理json数据,覆写下extractObject方法即可。

代码语言:javascript
复制
    /**
     * 识别到一个正确的json数据,进行处理。
     * @param ctx channel
     * @param buffer bytebuff
     * @param index 此次包的开始点
     * @param length 此次包的长度
     * @return 返回一个bytebuf做后续处理,如果不需要可以返回Unpooled.EMPTY_BUFFER
     */
    @Override
    protected ByteBuf extractObject(ChannelHandlerContext ctx, ByteBuf buffer,
                                    int index, int length){
        try{
            // 首先按指定的位置标记从 buffer中读取数据到新的bytebuf中。
      // 这里的 byteBuf 是基于零拷贝实现的,共用的是同一份内存区,性能更好。不要手动释放 byteBuf 对象
            // 这里的ByteBuf是netty重写的nio中的ByteBuffer性能更好
            ByteBuf byteBuf = buffer.slice(index, length);

            // 把接收到的流转写成string字符串
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            logger.info(message);
            // 测试阶段直接回写数据
            ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
            

        }catch (Exception e){
            e.printStackTrace();
        }
    // 已经处理完毕,不需要后续处理直接return null即可
        return null;
    }

然后回到上一步的initChannel中增加一个 pipeline的channelHandler.

代码语言:javascript
复制
  @Override
  public void initChannel(SocketChannel ch) {
    ch.pipeline()
      .addLast(new DecoderHandler());
  }

再次运行我们的程序,并使用nc进行下测试。可以看到服务端已经可以回写我们发送的字符串。

代码语言:javascript
复制
$ nc 127.0.0.1 4100
{"msgType": 20, "txNo": "1234567890123"} 
{"msgType": 20, "txNo": "1234567890123"}

本节 源码

下一章节我们将会实现

  1. 一个客户端的空闲检测,并踢掉空闲的客户端
  2. 服务端空闲,并下发心跳包
  3. 定时的netty连接状态打印
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 海哥聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 这么好用的框架我们也要用起来,不要只作为web项目使用。让Spring帮我们管理对象多方便啊。
  • 新建项目
    • 下一章节我们将会实现
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档