Netty学习一

前面我们已经学习了NIO的简单知识,三大组件:ByteBuffer、Channel、Selector。知道ByteBufffer是数据,而Channel是数据的载体通道,selector为多路复用。如果说线程池为线程提供了重复利用的途径,而Selector则为起到了调度线程的目的,也即高效率的使用线程。下面我们开始Netty的学习。

首先,我们来了解一下mmap、sendFile、零拷贝。在java中,由于传统的IO读写需要进行四次拷贝、四次切换(如图),因此效率上,通常在传输大文件的时候比较低。因此引入了mmap和sendFile进行优化。同时这里,我们就需要了解DMA,在计算机原理中,我们可以看到它的身影,全称Direct Memory Access,翻译出来就是直接内存拷贝(不使用CPU)。也即它是相对于操作系统而言的。

那mmap和sendFile做了哪些优化呢?

mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,可以减少内核空念到用户空间的拷贝次数。

而sendFile则体现在Linux2.1到Linux2.4的优化sendFile的操作:

从图中可以看到Linux2.4的sendFile操作实现了零拷贝操作,也即只在系统层面进行操作。同时从上面的图中可以看到拷贝的次数:mmap经过了3次拷贝、3次切换。而在sendFile中,经过了2次拷贝2次切换。

零拷贝:从操作系统角度来说的,因为内核缓冲区之间,没有数据是重复的(只有内核缓冲区有一份数据)。同时零拷贝带来了更少的数据赋值,还带来了性能上的优势,减少了上下文切换。

Reactor模式和Proactor模式:

1.Reactor模式:主动模式,所谓主动,是指应用程序不断轮询,询问操作系统或者网络框架,IO是否就绪。其中java的NIO就属于这种模式。在这种模式下,实际的IO操作还是应用程序执行的。
2.Procator模式:被动模式,应用程序的读写函数操作交给操作系统或者网络框架,实际IO操作由操作系统或者网络框架完成,之后再回调应用程序。微软的asio库就是这种模式。

事件驱动模型:通常我们设计一个事件处理模型的程序有两种思路:

1.采用轮询的方式:线程不断轮询询问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑
2.事件驱动方式:发生事件,主线程把事件加入到事件队列,在另外线程不断循环消费事件列表的事件,调用事件对应的处理逻辑事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。基于事件驱动的优点:可扩展性好,高性能。

Reactor线程模型中2个关键组成:

Reactor:Reactor在一个单独的线程运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。
Handlers:处理程序执行I/O事件要完成的实际事件。

线程模型reactor

Reactor的三种模型:单Reactor单线程、单Reactor多线程、主从Reactor多线程。

单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服
单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
主从 Reactor 多线程,多个前台接待员,多个服务生

Netty的线程模型:

Netty主要基于主从Reactor多线程模型,其中主从Reactor主从模型有多个Reactor:

1)MainReactor负责客户端的连接请求,并将请求转交给SubReactor
2)SubReactor负责相应通道的IO读写请求
3)非IO请求(具体业务逻辑处理)的任务则会直接写入队列,等待worker线程进行处理。

EventLoopGroup bossGroup = new NioEventLoopGroup();
EvenLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class);

bossGroup线程池则只是在bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的Accept事件,每个端口对应一个Boss线程,workerGroup线程池会被各个SubReactor和Worker线程充分利用。

NioEventLoop:维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

I/O任务:即selectionKey中的ready的事件,如accept、connect、read、write等,由processSelectKeys方法触发。
非I/O任务:添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

下面我们来看一下Netty的简单案例,下载Netty的源码,进行编译:

包括服务器端和客户端,其中服务器端包括服务端引导 、服务端处理器,客户端包括客户端引导、客户端处理器

服务器端引导:

/**
 * Echoes back any received data from a client.
 * Netty的服务端引导
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    //端口
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.boss和worker线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //创建服务端业务处理器对象,进行业务处理,添加线程组,同时开启通道添加so_backlog
        //同时添加childHandler,重写initChannel方法
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            //服务端引导
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     //流水线
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //添加日志信息
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     //添加serverHandler
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            //启动服务器
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端处理器:

/**
 * Handler implementation for the echo server.
 * Netty服务端业务处理器,继承ChannelInboundHandlerAdapter,
 * 同时重写channelRead、ChannelReadComplete、exceptionCaught三个方法
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

客户端引导

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 * Netty客户端引导
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    //ip、端口号
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

客户端处理:

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 * Netty客户端业务处理器,重写四个方法:channelActive、channelRead、channelReadComplete、exceptionCaught
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     * 创建客户端处理器
     */
    public EchoClientHandler() {
        //使用类似byteBuffer方式创建buffer
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

启动服务器端和客户端,可以看到运行结果:

同时标注部分是发送的消息

今天就学到这里了!

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-25

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ReentranLock源码学习

    首先回答一个问题?线程的三大特性?什么时候我们需要锁?java中已经提供了synchronized,为什么还要使用ReentrantLock?AQS原理。

    路行的亚洲
  • ThreadPoolExecutor源码学习

    但点进去看newSingleThreadExecutor可以看到其会调用ThreadPoolExecutor里面的线程。因此有必要研究ThreadPoolExe...

    路行的亚洲
  • kafka学习二 -发送消息

    从源码中我们发现在Sender的run方法中,并没有涉及到append追加操作。因此可以看到源码中,如果消息收集器中的消息收集结果为空或者新的消息批次已经创建好...

    路行的亚洲
  • 几种常见设计模式在项目中的应用<Singleton、Factory、Strategy>

      前几天阅读一框架文档,里面有一段这样的描述 “从对象工厂中………” ,促使写下本文。尽管一些模式简单和简单,但是常用、有用。

    梁规晓
  • 一个Java小白通向数据结构算法之旅(5) - 选择排序

    用户2032165
  • 开源依旧:再次分享一个进销存系统

    开篇 我之前发过一篇博文《两天完成一个小型工程报价系统(三层架构)》,不少朋友向我要源码学习,后来久而久之忘记回复了。今天我再分享一个进销存系统,只为学习,没...

    用户1161731
  • UIAutomator2.0和AccessibilityService实现分析

    UiAutomator是Android 4.1以上提供的一个UI自动化测试工具,4.3升级到了UiAutomator2.0,实现方式也从UiTestAutoma...

    drunkdream
  • 解决HttpServletRequest#getIntputStream只能读一次

    以前一直自己封装一个Wrapper来缓存Body的内容,今天发现在Spring中已经帮我们实现了。就是org.springframework.web.util....

    DH镔
  • 补习系列(13)-springboot redis 与发布订阅

    消息发布者是消息载体的生产者,其通过某些主题来向调度中心发送消息; 而消息订阅者会事先向调度中心订阅其"感兴趣"的主题,随后会获得新消息。

    美码师
  • 一种O(n)的排序——计数排序引发的围观风波

    计算机课上,老师给一串数字6 1 6 9 9 1 4 2 1 5 8 8,问道:这一串数字,你们写个程序给我看,要求效率较高。学不出来的别下课了。

    bigsai

扫码关注云+社区

领取腾讯云代金券