前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty源码阅读入门实战(四)-NioEventLoop

Netty源码阅读入门实战(四)-NioEventLoop

作者头像
JavaEdge
修改2021-01-01 23:56:11
4110
修改2021-01-01 23:56:11
举报
文章被收录于专栏:JavaEdgeJavaEdge

1 NioEventLoop概述

  • Netty服务端默认起多少个线程?何时启动?
  • Netty如何解决JDK空轮询bug?
  • Netty如何保证异步串行无锁化?

综述:

  • NioEventLoop创建
  • NioEventLoop启动
  • NioEventLoop执行逻辑

2 NioEventLoop创建

概述

new NioEventLoopGroup) 线程组,默认 2*cpu

  • new ThreadPerTaskExecutor() 线程创建器
  • for() {newChild() } 构造 NioEventLoop
  • chooserFactory.newChooser() 线程选择器

对应

代码语言:txt
复制
new NioEventLoopGroup) [线程组,默认 2*cpu]

对应

代码语言:txt
复制
new ThreadPerTaskExecutor() [线程创建器]

对应

代码语言:txt
复制
for() {newChild() } [构造 NioEventLoop]

对应

代码语言:txt
复制
chooserFactory.newChooser() [线程选择器]

3 ThreadPerTaskThread

2 服务端Channel的创建

  • 服务端的socket在哪里初始化?
  • 在哪里accept连接?

流程

创建服务端Channel

  • bind() 用户代码入口
  • initAndRegister() 初始化并注册
  • newChannel() 创建服务端channel

初始化服务端Channel

注册selector

端口绑定

下面看源码:

bind 对应样例的

跟进调试
跟进调试
  • 通过反射创建的 channel
    看看 channelFactory
  • 反射创建服务端 Channel newSocket() 通过jdk来创建底层jdk channel NioServerSocketChannelConfig0 tcp 参数配置类 AbstractNioChannel() - configureBlocking(false) 阻塞模式 - AbstractChannel() 创建 id,unsafe,pipeline

下面来看这个流程,首先

代码语言:txt
复制
newSocket() [通过jdk来创建底层jdk channel]
  • 创建完毕了
代码语言:txt
复制
NioServerSocketChannelConfig0 [tcp 参数配置类]
代码语言:txt
复制
AbstractNioChannel()
  • ANC
代码语言:txt
复制
configureBlocking(false) [阻塞模式]
代码语言:txt
复制
AbstractChannel() [创建 id,unsafe,pipeline]

4 创建NioEventLoop线程

newchild()

  • 保存线程执行器 ThreadPerTaskExecutorchooser = chooserFactory.newChooser(children);NioEventLoopGroup.next()
  • 创建一个 MpscQueue
  • 创建一个 selector
    5 创建线程选择器

isPowerOfTwo() 判断是否为 2 的幂

  • PowerOfTwoEventExecutorChooser 优化 - index++ & (length-1)
  • GenericEventExecutorChooser 普通 - abs(index++ % length)
  • 先看看普通的
  • 再看幂2的

循环取数组索引下标,& 比取模性能更高

6 NioEventLoop的启动

NioEventLoop启动触发器

  • 服务端启动绑定端口
  • 新连接接入通过chooser绑定一 个NioEventLoop

NioEventLoop启动

  • bind() > execute(task) 入口
  • startThread() -> doStartThread() 创建线程
  • ThreadPerTaskExecutorexecute()
  • thread = Thread.currentThread()
  • NioEventLoop.run() 启动
    7 NioEventLoop执行概述8 检测IO事件select()方法执行逻辑
  • deadline以及任务穿插逻辑处理
  • 阻塞式select
  • 避免jdk空轮训的bug
    执行至此,说明已进行了一次阻塞式的 select 操作
    执行至此,说明已进行了一次阻塞式的 select 操作
    产生空轮询的判断
    产生空轮询的判断
    当空轮询次数大于阈值
    当空轮询次数大于阈值
    阈值定义
    阈值定义
    阈值
    阈值

避免空轮询的再次发生

  • 创建新选择器
  • 获取旧选择器的所有 key 值
  • netty 包装的 channel
  • 更新选择器的 key
  • 至此,解决了空轮bug
    9 处理IO事件

processSelectedKey 执行流程

  • selected keySet优化
    原生JDK创建一个 selector
    原生JDK创建一个 selector
    单线程处理,其实不需要两个数组,后续版本已经是一个数组
    单线程处理,其实不需要两个数组,后续版本已经是一个数组
    只需关注
    只需关注
    根本不需要此三个方法
    根本不需要此三个方法
    通过反射
    通过反射
    即此类
    即此类
    继续反射流程
    继续反射流程
    替换为优化后的 set 集合
    替换为优化后的 set 集合
    一句话总结:用数组替换HashSet 的实现,做到 add 时间复杂度为O(1) private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }Netty 默认通过反射将selector 底层的 HashSet 实现转为数组优化 处理每个 ketSet 时都会取到对应的 attachment,即在向 selector注册 io 事件时绑定的经过 Netty 封装后的 channel10 reactor线程任务的执行runAllTasks()执行流程
  • processSelectedKeysOptimized
  • task 的分类和添加
    定时任务
    定时任务
  • 任务的聚合
    从定时任务中拉取
    从定时任务中拉取
    根据时间,时间相同根据名称
    根据时间,时间相同根据名称
    取nanotime 截止时间前的定时任务
    取nanotime 截止时间前的定时任务
  • 任务的执行
    从普通 taskqueue 中取任务
    从普通 taskqueue 中取任务
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.07.14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 NioEventLoop概述
  • 2 NioEventLoop创建
    • 概述
    • 3 ThreadPerTaskThread
    • 2 服务端Channel的创建
      • 流程
      • 4 创建NioEventLoop线程
      • 6 NioEventLoop的启动
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档