Netty 系列一(核心组件和实例).

一、概念

    早期的 Java API 只支持由本地系统套接字库提供所谓的阻塞函数来支持网络编程。由于是阻塞 I/O ,要管理多个并发客户端,需要为每个新的客户端Socket 创建一个 Thread 。这将导致一系列的问题,第一,在任何时候都可能有大量的线程处于休眠状态(不可能每时每刻都有对应的并发数);第二,需要为每个线程的调用栈都分配内存;第三,JVM 在线程的上下文切换所带来的开销会带来麻烦。

    Java 在 2002 年引入了非阻塞 I/O,位于 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 实现的关键。它使用了事件通知以确定在一组非阻塞套接字中有哪些已经就绪能够进行 I/O 相关的操作。因为可以在任何的时间检查任意的读操作或者写操作的完成状态,所以如图 1-2 所示,一个单一的线程便可以处理多个并发的连接。

    尽管可以直接使用 Java NIO API,但是在高负载下可靠和高效地处理和调度 I/O 操作是一项繁琐而且容易出错的任务,最好还是留给高性能的网络编程专家——Netty。

    Netty 是一款异步的事件驱动的网络应用程序框架,支持快速的开发可维护的高性能的瞄向协议的服务端和客户端。它驾驭了Java高级API的能力,并将其隐藏在一个易于使用的API之后。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序性能的最大化和可伸缩性。其次, Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦,简化了开发过程, 同时也最大限度地提高了可测试性、模块化以及代码的可重用性。

 tips:面向对象的基本概念—> 用较简单的抽象隐藏底层实现的复杂性。

二、核心组件

  • Channel

    Channel是Java NIO的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。以下是常用的Channel:

-- EmbeddedChannel -- LocalServerChannel -- NioDatagramChannel -- NioSctpChannel -- NioSocketChannel

  • 回调

当一个回调被触发时,相应的事件可以被一个interface-ChannelHandler的实现处理。

  • Future

Netty中所有的I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种在之后的某个时间点确定其结果的方法。

    Future 和 回调 是相互补充的机制,提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。

    Netty 提供了ChannelFuture,用于在执行异步操作的时候使用。每个Netty的出站I/O操作都会返回一个ChannelFuture。ChannelFuture能够注册一个或者多个ChannelFutureListener 实例。监听器的回调方法operationComplete(),将会在对应的操作完成时被调用。

  • ChannelHandler

Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。

    Netty 使用不同的事件来通知我们状态的改变或者是操作的状态,每个事件都可以被分发给ChannelHandler类中某个用户实现的方法。Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议的ChannelHandler。

    现在,事件可以被分发给ChannelHandler类中某个用户实现的方法。那么,如果 ChannelHandler 处理完成后不直接返回给客户端,而是传递给下一个ChannelHandler 继续处理呢?那么就要说到 ChannelPipeline !

    ChannelPipeline 提供了 ChannelHandler链 的容器,并定义了用于在该链上传播入站和出站事件流的API。使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行他们所实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler:

1、一个ChannelInitializer的实现被注册到了ServerBootstrap中。 2、当 ChannelInitializer.initChannel()方法被调用时, ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler。 3、ChannelInitializer 将它自己从 ChannelPipeline 中移除。

  • EventLoop

    EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。

    EventLoop本身只由一个线程驱动,其处理了一个Channel的所有I/O事件,并且在该EventLoop的整个生命周期内都不会改变。这个简单而强大的设计消除了你可能有的在ChannelHandler实现中需要进行同步的任何顾虑。

    这里需要说到,EventLoop的管理是通过EventLoopGroup来实现的。还要一点要注意的是,客户端引导类是 Bootstrap,只需要一个EventLoopGroup。服务端引导类是 ServerBootstrap,通常需要两个 EventLoopGroup,一个用来接收客户端连接,一个用来处理 I/O 事件(也可以只使用一个 EventLoopGroup,此时其将在两个场景下共用同一个 EventLoopGroup)。

1、一个 EventLoopGroup 包含一个或者多个 EventLoop; 2、一个 EventLoop 在它的生命周期内只和一个 Thread 绑定; 3、所有由 EventLoop 处理的 I/O 事件都将在它专有的Thread 上被处理; 4、一个 Channel 在它的生命周期内只注册于一个EventLoop; 5、一个 EventLoop 可能会被分配给一个或者多个 Channel(面对多个Channel,一个 EventLoop 按照事件触发,顺序执行)。

三、实例

    所有的Netty服务端/客户端都至少需要两个部分:

1、至少一个ChannelHandler —— 该组件实现了对数据的处理。

2、引导 —— 这是配置服务器的启动代码。

    服务端:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //1、创建EventLoopGroup以进行事件的处理,如接受新连接以及读/写数据
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //2、创建ServerBootstrap,引导和绑定服务器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    //3、指定所使用的NIO传输Channel
                    .channel(NioServerSocketChannel.class)
                    //4、使用指定的端口设置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    //5、添加一个 EchoServerHandler 到子 Channel的 ChannelPipeline
                    //当一个新的连接被接受时,一个新的子Channel将会被创建,而 ChannelInitializer 将会把一个你的EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            //6、异步地绑定服务器,调用sync()方法阻塞等待直到绑定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress());
            //7、获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
            channelFuture.channel().closeFuture().sync();

        } finally {
            //8、关闭 EventLoopGroup 释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(9999).start();
    }
}
@ChannelHandler.Sharable //标识一个Channel-Handler 可以被多个Channel安全的共享
public class EchoServerHandler extends ChannelHandlerAdapter {


    /**
     * 对于每个传入的消息都要调用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //将接收到的消息写给发送者,而不冲刷出站消息
        //ChannelHandlerContext 发送消息。导致消息向下一个ChannelHandler流动
        //Channel 发送消息将会导致消息从 ChannelPipeline的尾端开始流动
        ctx.write(in);
    }

    /**
     * 通知 ChannelHandlerAdapter 最后一次对channel-Read()的调用是当前批量读取中的最后一条消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //暂存于ChannelOutboundBuffer中的消息,在下一次调用flush()或者writeAndFlush()方法时将会尝试写出到套接字
        //将这份暂存消息冲刷到远程节点,并且关闭该Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在读取操作期间,有异常抛出时会调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

    客户端:

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //指定 EventLoopGroup 以处理客户端事件;适应于NIO的实现
            bootstrap.group(group)
                    //适用于NIO传输的Channel类型
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    //在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //连接到远程节点,阻塞等待直到连接完成
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞,直到Channel 关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程池并且释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();
    }


}
@ChannelHandler.Sharable //标记该类的实例可以被多个Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 当从服务器接收到一条消息时被调用
     *
     * @param ctx
     * @param msg ByteBuf (Netty 的字节容器) 作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序接收
     * @throws Exception
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client" + ctx.channel().remoteAddress() + "connected");
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在到服务器的连接已经建立之后将被调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)  {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    }


    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

四、结语

    带着一阵迷糊就开始了Netty学习之旅,学到现在还是对Netty一堆专有名词头大!没办法,只好硬着头皮学下去了,毕竟,熟读唐诗三百首,不会作诗也会吟嘛!

    来总结下,一个Netty服务端处理客户端连接的过程:1、创建一个channel同该用户端进行绑定;2、channel从EventLoopGroup获得一个EventLoop,并注册到该EventLoop,channel生命周期内都和该EventLoop在一起(注册时获得selectionKey);3、channel同用户端进行网络连接、关闭和读写,生成相对应的event(改变selectinKey信息),触发eventloop调度线程进行执行;4、ChannelPipeline 找到对应 ChannelHandler 方法处理用户逻辑。

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏程序员叨叨叨

【转】使用 Spring HATEOAS 开发 REST 服务原文

绝大多数开发人员对于 REST 这个词都并不陌生。自从 2000 年 Roy Fielding 在其博士论文中创造出来这个词之后,REST 架构风格就很快地流行...

1041
来自专栏沃趣科技

MySQL中server_id一致带来的问题

简 介 我们都知道在MySQL搭建复制环境的时候,需要设置每个server的server_id不一致,如果主库与从库的server_id一致,那么复制会失败。...

3896
来自专栏程序猿DD

Hystrix降级逻辑中如何获取触发的异常?

通过之前Spring Cloud系列教程中的《Spring Cloud构建微服务架构:服务容错保护(Hystrix服务降级)》一文,我们已经知道如何通过Hyst...

1343
来自专栏安富莱嵌入式技术分享

【RL-TCPnet网络教程】第26章 RL-TCPnet之DHCP应用

本章节为大家讲解RL-TCPnet的DHCP应用,学习本章节前,务必要优先学习第25章的DHCP基础知识。有了这些基础知识之后,再搞本章节会有事半功倍的效果。

981
来自专栏大魏分享(微信公众号:david-share)

如何使用模拟框架测试微服务? | 微服务系列第八篇

作为开发人员尝试创建集成测试时,会遇到许多复杂问题。出现的两个最常见的问题包括与:

4222
来自专栏java学习

关于Spring 和 Spring MVC的43个问题【问题汇总】

通过Spring提供的IoC容器,可以将对象之间的依赖关系交由Spring进行控制,避免硬编码所造成的过度程序耦合。

901
来自专栏一个会写诗的程序员的博客

Spring Boot 使用 Spring Session 集成 Redis 实现Session共享Spring Boot 使用 Spring Session 集成 Redis 实现Session共享

通常在web开发中,Session 会话管理是很重要的一部分,用于存储与用户相关的一些数据。在Java Web 系统中的 Session一般由 Tomcat 容...

8435
来自专栏JAVA同学会

Zookeeper应用之——栅栏(barrier)

barrier的作用是所有的线程等待,知道某一时刻,锁释放,所有的线程同时执行。举一个生动的例子,比如跑步比赛,所有

963
来自专栏xingoo, 一个梦想做发明家的程序员

Spring MVC 基于Method的映射规则(注解版)

在Restful风格的web开发中,根据不同的请求方法使用相应的控制器处理逻辑成为核心需求,下面就看看如何在Spring MVC中识别不同的请求方法。 请...

2329
来自专栏依乐祝

ASP.NET Core WebApi使用Swagger生成api说明文档看这篇就够了

将 Swagger 生成器添加到 Startup.ConfigureServices 方法中的服务集合中:

2031

扫码关注云+社区

领取腾讯云代金券