专栏首页FoamValueNetty 主从多线程

Netty 主从多线程

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. https://netty.io

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 是一个 NIO 客户端服务器框架,它支持快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。

Nonblocking I/O

NIO,非阻塞 IO。对比于BIO(Blocking I/O,阻塞IO),NIO 的并发性能得到了很大提高。

常见的五种 IO 模型对比

  • 同步阻塞 IO(BIO)阻塞整个步骤。适用于少连接且延迟低的场景。
  • 同步非阻塞 IO(NIO),阻塞业务处理但不阻塞数据接收。适用于高并发且处理简单的场景。
  • 多路复用 IO,数据请求和业务处理是两个分开进行处理。
  • 信号驱动 IO,主要用在嵌入式开发,不参与讨论。
  • 异步 IO,数据请求和业务处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

主从多线程

Netty 是典型的 Reator 模型结构。

Reactor 模式是基于事件驱动开发的,其核心组成部分包括 Reactor 和线程池。其中 Reactor 负责监听和分配事件,而线程池负责处理事件。

根据Reactor的数量和线程池的数量,又将Reactor分为三种模型:

  • 单线程模型 (单 Reactor 单线程)
  • 多线程模型 (单 Reactor 多线程)
  • 主从多线程模型 (多 Reactor 多线程)

什么是主从多线程

从一个主线程 NIO 线程池中选择一个线程(boss)作为 Acceptor 线程,绑定监听端口,接收客户端连接的连接,其他线程(worker)负责后续的业务处理工作。


示例代码

从开源项目中截取了一段 Netty 初始化代码片段。

private void start() {
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
        // 参数的作用就是控制是否启用 Nagle 算法。
        .childOption(ChannelOption.TCP_NODELAY, true)
        .handler(new LoggingHandler(LogLevel.INFO))
        // 当客户端第一次进行请求的时候才会进行初始化
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            // 30 秒之内没有收到客户端请求的话就关闭连接
            ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
          }
        })
        // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
        // 参数的作用就是控制是否启用 Nagle 算法。
        .childOption(ChannelOption.TCP_NODELAY, true)
        // 是否开启 TCP 底层心跳机制
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        // 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
        .option(ChannelOption.SO_BACKLOG, 128);
    // 绑定端口,同步等待绑定成功
    ChannelFuture f = b.bind(host, port).sync();
    ChannelFuture f2 = b.bind(host, port2).sync();
    // 等待服务端监听端口关闭
    f.channel().closeFuture().sync();
  } catch (InterruptedException e) {
    System.out.println(String.format("occur exception when start server:", e));
  } finally {
    System.out.println(String.format("shutdown bossGroup and workerGroup"));
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
  }
}

b.bind(host, port).sync() 和 b.bind(host, port2).sync() 同时绑定了两个 ip 和 端口。

运行结果


源代码知识点

NioEventLoopGroup

特殊的 EventExecutorGroup 接口类,它允许注册已处理的通道,以便在事件循环期间进行后续选择。

ServerBootstrap

Bootstrap 子类,可轻松引导 ServerChannel。

NioServerSocketChannel

一个 ServerSocketChannel 接口的实现类,它使用基于NIO选择器的实现来接受新连接。

ChannelFuture

异步 Channel I / O操作的结果,未完成或已完成。


代码调试

new NioEventLoopGroup()

MultithreadEventExecutorGroup.java 初始化实例。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

传入参数:

  1. nThreads 此实例将使用的线程数。
  2. executor 需要执行的 Runable 任务对象 。
  3. choicerFactory 创建 EventExecutorChooser 对象的工厂类。
  4. args 参数将传递给每个 newChild 调用。

new ServerBootstrap()

ServerBootstrap 是 Netty 服务端应用开发的入口。

ServerBootstrap 的配置:

  • group 方法,设置初始化的主从"线程池"。
  • channel 方法,设置通道类型。服务端:NioServerSocketChannel。
  • ...

b.bind(host, port).sync()

绑定并侦听某个端口
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

小结

突如其来的三天小长假,彻底打乱了生活节奏 。

一天搬家、一天休息、一天加班。眼见着明天周日应该好好学习知识了,迎来的却是正常班。

周六熬夜写文章,然后明天早起上班去。就这样。

本文分享自微信公众号 - FoamValue(gh_3c635269f459),作者:FoamValue

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 「分享计划」花一天时间学习 Nacos 源码

    获取一下完整的项目代码 git clone https://github.com/alibaba/nacos.git,导入到 Eclipse 就可以开始本次的源...

    FoamValue
  • 「拥抱开源」注册中心 Nacos

    长久以来,我一直使用着闭源的分布式服务注册组件。简单易用且可靠,只要告诉它“我新增了一个某个服务”、“我要调用某个服务”、“这个服务最高耗时 3000 ms”,...

    FoamValue
  • 如何使用 BTrace v.2.0.1

    前两周学习 Java Agent、Byte Buddy 的时候,发现了一个新工具 —— BTrace,一款由 sun 公司推出的适用于 Java 平台的安全、动...

    FoamValue
  • Stringipc-从内存任意读写到权限提升

    传入的new_size为-1时,krealloc分配一个0大小的空间返回一个不为0的错误代码ZERO_SIZE_PTR(0x10),绕过下面的判断你,又因为ne...

    0x222进制
  • 《Go 语言程序设计》读书笔记 (五) 协程与通道

    以最简单方式调用make函数创建的是一个无缓冲的channel,但是我们也可以指定第二个整形参数,对应channel的容量。如果channel的容量大于零,那么...

    KevinYan
  • Go基础系列:channel入门

    channel用于goroutines之间的通信,让它们之间可以进行数据交换。像管道一样,一个goroutine_A向channel_A中放数据,另一个goro...

    李海彬
  • TCP/IP协议模型

    OSI参考模型虽然是ISO给出的,但是ISO在推进OSI协议标准化的进程上是比较缓慢的。而TCP/IP协议的标准化进程是相当快速的,因此现阶段的实际标准是TCP...

    zy010101
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

    李海彬
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

    李海彬
  • Go语言并发编程总结

    Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel …….

    李海彬

扫码关注云+社区

领取腾讯云代金券