初识Netty

【导读】我在两年前的时候就购买了《Netty权威指南》看了一下,不过没看懂哈哈哈哈,工作中也用不到,很快就忘了,直到前段时间在dy那边需要我重构一个TCP连接通信工具(半个月时间只给它搭了一个架子),所以最近也重新翻开书进行学习。Netty是最流行的NIO框架之一,其健壮性、功能、性能、可定制性和可扩展性都是很优秀的。Netty是Hadoop生态中RPC框架Avro以及Dubbo底层的通信框架。今儿就来聊一下其的简单使用。

一、Netty逻辑架构

采用三层网络架构进行设计和开发,架构如图:

1、Reactor通信调度层

监听网络的读写和连接请求,负责将网络层的数据读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读写事件等,然后将这些事件触发到Pipeline中,由Pipeline管理的职责链进行后续处理。

什么是反应堆Reactor:

BIO模型在并发高的情况下,会创建大量线程,性能很低并可能导致服务宕机。

NIO工作原理:

(1)由一个专门的线程处理所有的IO事件,并负责分发,也就是Selector

(2)事件驱动机制:事件到的时候触发,而不是同步的去监视事件

(3)线程通信:线程之间通过wait/notify等方式进行通信,减少无谓的线程切换

2、职责链Pipeline

负责事件在职责链之间的有序传播,动态的编排职责链。可选择监听和处理特定的事件,拦截处理事件,比如对消息的编解码。使用的是设计模式中的责任链模式。

3、业务逻辑编排层Service ChannelHandler

分成两类:一类是纯粹的业务逻辑编排,另一类是应用层协议插件,用于特定协议的会话和链路管理。比如对消息进行编解码,协议不同,编解码步骤也不一样,例如Http和WebSocket

二、用Netty实现简易聊天窗口

在上一篇用JavaNIO写了一个例子,Java原生的NIO开发需要手动判断事件的类型才能做下一步处理,相比于原生Java NIO的开发,Netty的开发会简单很多,不需要开发者关注当前事件是什么类型的事件,只需在相应的方法里关注业务逻辑的处理等。

1、环境准备:引入Netty依赖

2、服务端开发

public class Server {    private int port;    public Server(int port) {        this.port = port;    }    public void start(){        //配置服务端NIO线程        //接收客户端连接,事件分发线程池        NioEventLoopGroup boss = null;        //处理读写事件线程池        NioEventLoopGroup workder = null;        try {            boss = new NioEventLoopGroup();            workder = new NioEventLoopGroup();
            //创建服务端,相当于NIO的ServerSocketChannel            ServerBootstrap server = new ServerBootstrap();            server.group(boss,workder)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            //绑定IO事件处理类                            pipeline.addLast(new ServerHandler());                        }                    })                    //最大客户端连接数                    .option(ChannelOption.SO_BACKLOG, 128)                    //与客户端保持长链接                    .childOption(ChannelOption.SO_KEEPALIVE, true);            //绑定端口,同步等待            ChannelFuture future = server.bind(this.port).sync();            System.out.println("服务端已启动,端口为:"+port);
            //等待服务器监听端口关闭            future.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {            //优雅退出,释放线程池资源            boss.shutdownGracefully();            workder.shutdownGracefully();        }    }    public static void main(String[] args) {        new Server(8080).start();    }}

其中在childHandler中是绑定了IO事件的处理类,作用类似于Reactor模式中的Handler类,主要用于处理网络IO事件,例如消息编解码等,我这边主要是用于演示Netty使用,所以未做编解码的处理。

在这边主要用于对聊天室信息发送的处理,看一下其编码:

public class ServerHandler extends ChannelHandlerAdapter {
    //存放所有Channel    private static final ChannelGroup onLineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //定义channel的私有名称属性,用于区分channel    AttributeKey<String> name = AttributeKey.valueOf("name");
    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //将消息转换成ByteBuf,对应于JavaNIO的ByteBuffer        ByteBuf buf = (ByteBuf) msg;        byte[] bytes = new byte[buf.readableBytes()];        buf.readBytes(bytes);        String body = new String(bytes, "UTF-8");        Channel channel = ctx.channel();
        String message = null;        //如果是新建连接        if (!onLineUsers.contains(channel)) {            channel.attr(name).getAndSet(body);            onLineUsers.add(channel);            message = body+"在"+getCurrentTime()+"进入聊天室,当前聊天室人数为:"+onLineUsers.size();            System.out.println(message);            for (Channel user : onLineUsers) {                if (user!=channel) {                    //发送给聊天室的其他人                    ByteBuf resp = Unpooled.copiedBuffer(message.getBytes());                    user.writeAndFlush(resp);                } else {                    //发送给自己                    ByteBuf resp = Unpooled.copiedBuffer("您已进入聊天室".getBytes());                    user.writeAndFlush(resp);                }            }        } else {            //将发送的消息发给其他人            message = "在"+getCurrentTime()+"发送了一条消息,内容:" + body;            for (Channel user : onLineUsers) {                if (user!=channel) {                    ByteBuf resp = Unpooled.copiedBuffer((channel.attr(name).get()+message).getBytes());                    user.writeAndFlush(resp);                }            }        }    }
    public static String getCurrentTime(){        SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");        String time = format.format(new Date());        return time;    }
}

需要继承ChannelHandlerAdapter类重写ChannelRead方法,用于读取客户端消息并进行处理,利用ChannelGroup存放所有channel,也就是聊天室的人,利用AttributeKey定义channel的私有名称属性,用于区分channel。

3、客户端开发

public class Client {
    public static void main(String[] args) {        //监听控制台输入,将输入的字符用于在服务端定义当前连接的名字        Scanner scanner = new Scanner(System.in);        new Client().start(scanner.nextLine());    }
    private void start(final String name) {        //用于IO读写的线程池        NioEventLoopGroup group = new NioEventLoopGroup();        try {            //相当于NIO的SocketChannel            Bootstrap client = new Bootstrap();            client.group(group)                    .option(ChannelOption.TCP_NODELAY,true)                    .channel(NioSocketChannel.class)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //绑定IO读写事件的处理类                            ch.pipeline().addLast(new ClientHandler(name));                        }                    });            //发起连接            ChannelFuture cf = client.connect(new InetSocketAddress(8080)).sync();            //等待关闭            cf.channel().closeFuture().sync();        } catch (Exception e) {        } finally {            group.shutdownGracefully();        }    }
}

I/O事件处理类:

public class ClientHandler extends ChannelHandlerAdapter{
    private String name;
    public ClientHandler(String name) {        this.name = name;    }

    /**     * 连接成功后触发此方法     * @param ctx     * @throws Exception     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //将第一次输入的名称发送给客户端        String order = name;        ByteBuf buf = Unpooled.buffer(order.length());        buf.writeBytes(order.getBytes());        ctx.writeAndFlush(buf);        //另起一个线程监听控制台输入,如果不添加线程,此客户端会阻塞在此        new Thread(new Runnable() {            @Override            public void run() {                Scanner scanner = new Scanner(System.in);                while (scanner.hasNextLine()) {                    String s = scanner.nextLine();                    ByteBuf buffer = Unpooled.buffer(s.length());                    buffer.writeBytes(s.getBytes());                    ctx.writeAndFlush(buffer);                }            }        }).start();
    }


    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //接收服务端返回的消息        ByteBuf buf=(ByteBuf) msg;        byte[] bytes=new byte[buf.readableBytes()];        buf.readBytes(bytes);        String txt=new String(bytes,"UTF-8");        System.out.println("【"+name+"】接收到信息:"+txt);    }
}

4、启动

(1)启动服务端

(2)启动两个客户端,名称分别是张三、李四

张三:

此时服务端会输出信息:

李四:

此时服务端会输出信息:

用张三的窗口发送一条消息给李四:

李四会受到消息:

当一个客户端发消息的时候,其他在群里面的人都会受到消息。

=============================

以上就是Netty的简略使用,更多敬请关注之后几篇,一起学习。

本文分享自微信公众号 - Liusy01(Liusy_01),作者:Liusy01

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【设计模式-组合模式】

    【导读】讲究的是“部分-整体”的关系,比如文件与文件夹,文件夹包含了文件和子文件夹,如果需要做一个文件管理系统的话,此时就需要用到组合模式。类似与下图的结构:

    Liusy
  • ZK实现分布式锁

    上一篇说了ZK是什么以及能干什么,今儿这篇就来用ZK实现分布式锁,分别用java原生的zookeeper客户端、ZKClient实现。

    Liusy
  • 【设计模式-工厂相关模式】

    定义一个创建对象的接口,但让实现这个接口的类来决定实例化哪个类,工厂方法让类的实例化推迟到子类中进行。

    Liusy
  • 初探Java设计模式1:创建型模式(工厂,单例等)

    转自https://javadoop.com/post/design-pattern

    Java技术江湖
  • 初探Java设计模式1:创建型模式(工厂,单例等)

    转自https://javadoop.com/post/design-pattern

    Java技术江湖
  • Struts2框架的基本使用

         前面已经介绍过了MVC思想,Struts2是一个优秀的MVC框架,大大降低了各个层之间的耦合度,具有很好的扩展性。从本篇开始我们学习Struts2的基...

    Single
  • Java泛型学习

    1、泛型的概念     泛型即“参数化类型”,就比如我们定义方法的时候,定义一个变量,称为形参,变量值根据传进去的实参的值不同而改变。而泛型的出现,就是为了解决...

    JMCui
  • 初探Java设计模式1:创建型模式(工厂,单例等)

    一直想写一篇介绍设计模式的文章,让读者可以很快看完,而且一看就懂,看懂就会用,同时不会将各个模式搞混。自认为本文还是写得不错的,花了不少心思来写这文章和做图,力...

    黄小斜
  • Android高德之旅(12)厉害了POI

    前后两千万,拍照更清晰。大家好,这里是OPPO R11独家冠名赞助播出的大型情感类电视连续剧《Android高德之旅》,我是主持人大公爵。这期节目,我们来讲一下...

    大公爵
  • 在Java EE7框架中使用MongoDB

    中心点创建应用程序的执行在企业环境中,应用程序必须安全、便携和高可用性。它还必须能够与不同的系统交互,但可控的从一个最好的位置。JEE7合并是一个重要的框架的所...

    用户1289394

扫码关注云+社区

领取腾讯云代金券