前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式专题|都说netty入门很难,那是因为你没有看我的文章!

分布式专题|都说netty入门很难,那是因为你没有看我的文章!

作者头像
AI码师
发布2021-01-12 14:29:05
3980
发布2021-01-12 14:29:05
举报

点击上方蓝字关注我们 文末有惊喜

在写代码之前,我们先看下netty的线程模型,这比那固定格式的代码将会更有趣,看完线程模型,你就知道netty写的那几段固定代码的意义了。

线程模型图

在这里插入图片描述

这个线程模型图里面大概包含了这几个组件:bossGroup,workGroup,selectot(accept),selector(读写),pipline,NioSocketChannel,NioServerSocketChannel;

  • bossgroup,workgroup 在netty中,处理客户端的请求会被注册在两类selector上,这两类selector分别对应两个线程池bossGroup和workgroup,bossGroup主要处理客户端与服务端建立连接注册的selector;workgroup看名字也知道了,是用来干活的线程池,它主要负责处理客户端读事件的selector逻辑;在创建netty的第一行代码中,就是创建这两个线程池,一般情况下bossgroup会设置成一个线程,workgroup会设置多个线程,默认不写的话,netty会获取当前服务器中的cpu核数*2作为默认创建的线程数量。
  • selector(accepet),selector(读写) selector和NIO中的selector是同一种组件,不过在netty中会分为两种类型的selector:专门处理连接事件的selector和专门处理读写事件的selector;但是在NIO中处理这些事件都是使用的同一个selector,NIO中通过遍历key的方式,来判断是连接事件还是读写事件,然后交给后端线程处理的逻辑;
  • NioServerSocketChannel 这是服务端启动之后创建的一个channel,然后会把这个channel注册到selector中,并添加自己感兴趣的accept的事件,后续所有客户端发起的连接都会被该channel监听到。具体用来做什么,我们会结合下个组件介绍
  • NioSocketChannel 客户端在发起连接请求之后,服务端会通过调用NioServerSocketChannel的accepet方法,生成一个NioSocketChannel,接着会从workGroup中挑选一个eventLoop,然后把channel注册到该eventLoop线程的selector上,并添加感兴趣的读事件;后续客户端与服务端所有的读写操作都会在该channel中进行。
  • pipline pipline是一个实现了职责链模式的管道处理器,在初始化之后,会添加一些处理器,例如:编码器、解码器、业务逻辑处理器,select在得到客户端发送过来的数据后,会把数据丢到这个管道里面,然后从头到尾依次执行这些处理器;如果是服务端把数据发往客户端,会从尾部到头部依次执行处理器,但是从服务端发数据到客户端,只会执行出站处理器;客户端发送数据到服务端,只会执行入站处理器。

介绍完这些基本组件之后,我们对netty的线程模型应该有了初步的认识,现在我们大概梳理下netty的整个处理过程:

流程讲解

  1. 服务端初始化时,会创建两个线程组bossGroup,workGoup;
  2. 创建一个NioServerSocketChannel 注册到bossGroup中eventLoop的selector上面,添加自己感兴趣的accept事件, 并监听指定端口;
  3. client1发起连接请求,在服务端会产生一个accept事件,通过遍历selector中的key得到accept事件;
  4. 服务端的NioServerSocketChannel通过accept方法进行阻塞(其实该事件已经来了,不需要阻塞),返回一个客户端的channel1(NioSocketChannel);
  5. 获得了chnnel1之后,服务端会从workgroup挑选一个eventloop1,并将channel1注册到该eventloop1的selector1上面,并添加感兴趣的读事件;这时候已经初始化好了该通道中的pipline1,并将所有的处理器都添加到了pipline1中;
  6. 这个时候又新加入一个client2发起连接,会执行同样的操作,最终将chnnel2注册到另外一个eventloop2里面的selector2上面,并添加感兴趣的读事件;这时候已经初始化好了该通道中的pipline2,并将所有的处理器都添加到了pipline2中;
  7. 如果client1发送数据到服务端,服务端生成的selector1会监听到该事件(读事件),读取通道中的数据,并将数据交给pipline1中,执行后续逻辑处理;
  8. 如果client2发送数据到服务端,服务端生成的selector2会监听到该事件(读事件),读取通道中的数据,并将数据交给pipline2中,执行后续逻辑处理;

快速上手

前面已经将netty的基本组成和其线程模型大概说了下,现在我们演示下如何使用netty进行开发:代码已经放到码云:穿云箭

添加依赖

代码语言:javascript
复制
   <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
   </dependency>

服务端代码

代码语言:javascript
复制
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) {
        // 创建 处理连接请求的线程组 1个
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建工作组线程 默认为 cpu核数*2 个
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //在pipline中添加自定义的handle处理器
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start");
            // 绑定9000 端口号 sync指的是 创建完端口监听后,才执行后续操作
            ChannelFuture cf = serverBootstrap.bind(9000).sync();
            // 添加监听器 
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    System.out.println("服务启动完成");
                }
            });
            // 注册chnnel的关闭事件,sync是只有当关闭事件发生后才结束该线程,否则一直阻塞
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

创建自定义的处理器,写我们自己的业务逻辑

代码语言:javascript
复制
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf msg1 = (ByteBuf) msg;
        System.out.println(String.format("收到客户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
    }
}
# 这里的ChannelInboundHandlerAdapter已经被废弃了,大家后续可以继承SimpleChannelInboundHandler,支持传入泛型,然后配合解码器使用,这里只是做个简单的演示。

客户端代码

代码语言:javascript
复制
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class NettyClient {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
                        }
                    });
            System.out.println("netty client start");
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
            cf.addListener((ChannelFutureListener) channelFuture -> System.out.println("客户端启动完成"));

            String msg = "";
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            do {
                try {
                    msg = br.readLine();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
                cf.channel().writeAndFlush(buf);
            } while (!msg.equals("end"));
            System.out.println("您已退出");
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
        }

    }
}

创客户端自定义处理器

代码语言:javascript
复制
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf msg1 = (ByteBuf) msg;
        System.out.println(String.format("收到服户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
    }

}

netty相关面试知识拓展

什么是拆包和粘包

名词解释

客户端与服务端建立了TCP/UDP连接,如果连接中限制了发送数据的报文大小,此时 将要发送的数据大于这个限制,就会产生拆包现象;截取后的数据包会等待下次发送数据的时候一起发送,如果这个时候这部分数据和其他数据包一起发到服务端,又会产生粘包的现象;

解决方案

  • 自己定义数据发送的数据格式,包括数据长度和数据内容两个,通过长度来判断数据有没有结束
  • 使用定长解码器实现
  • 使用指定开始符和结束符实现

解释下什么是零拷贝

说零拷贝之前,我们需要引入一个名词“直接内存”,我们知道java代码都运行在jvm虚拟机中,分配的内存数据都是在jvm中分配的,如果想直接访问jvm之外的内存数据,那就叫直接内存访问;在netty中,直接使用直接内存进行socket进行读写。不需要将数据拷贝到jvm中的缓冲区中,而是将数据直接发送到socket中,不需要再执行中间的拷贝操作;

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 乐哉开讲 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程模型图
  • 流程讲解
  • 快速上手
    • 添加依赖
      • 服务端代码
        • 创建自定义的处理器,写我们自己的业务逻辑
          • 客户端代码
            • 创客户端自定义处理器
            • netty相关面试知识拓展
              • 什么是拆包和粘包
                • 名词解释
              • 解释下什么是零拷贝
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档