前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入Netty事件流程分析(上)

深入Netty事件流程分析(上)

作者头像
keithl
发布2020-05-07 10:18:37
1.1K0
发布2020-05-07 10:18:37
举报
文章被收录于专栏:疾风先生疾风先生

前面我们已经深入分析Netty中的核心组件,接下来我们开始来深入理解Netty各个组件处理事件的运作流程,通过事件流程的分析,我们可以思考Netty框架是如何设计组件之间的协作来配合完成基于Reactor模式且具备可伸缩性的Web服务,由于Netty事件流程比较多且杂,上篇主要分析事件轮询器初始化,启动类初始化组件以及服务端的端口绑定事件.

事件轮询分析

EventLoopGroup初始化流程
  • 入口程序代码
代码语言:javascript
复制
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
  • NioEventLoopGroup类图结构
  • EventLoopGroup初始化源码
代码语言:javascript
复制
// NioEventLoopGroup构造器
// NioEventLoopGroup.java
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
 }

public NioEventLoopGroup(int nThreads, Executor executor) {
  this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
  int nThreads, Executor executor, final SelectorProvider selectorProvider) {
  // 默认使用阻塞式轮询策略
  this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());   // channel处理不过来的时候直接丢弃
}

// MultithreadEventLoopGroup.java
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  // 默认线程数量为CPU*2 或者是通过 io.netty.eventLoopThreads进行配置
  super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

// MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
  // 创建默认事件执行选择器(从Group中选择一个EventLoop来处理Channel的策略选择器)
  this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

// 初始化Group的核心方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
        EventExecutorChooserFactory chooserFactory, Object... args) {
}

// 上述可以看到初始化创建默认具备线程池的一些默认策略(线程大小/线程工厂/存储任务队列/丢弃策略)/创建默认的事件轮询选择器/默认的IO复用器提供者
  • EventLoopGroup初始化的核心流程

根据上述可以知道,EventLoopGroup初始化的操作主要是初始化一组EventLoop的执行器,并创建选举EventLoop的选择器,并为每个EventLoop在销毁的时候添加监听器以便于程序能够获取当前EventLoop销毁情况,同时每个EventLoop对外提供服务都是只读模式,也就是选举EventLoop都是处于只读的稳定版本.

EventLoop的初始化流程
  • EventLoop的创建流程包含在上述EventLoopGroup为每个执行器(EventLoop)进行初始化的过程,即在源代码中如下:
代码语言:javascript
复制
// MultithreadEventExecutorGroup.java
// 初始化执行器
children[i] = newChild(executor, args);

// newChild的实现子类NioEventLoopGroup
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
  return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                          ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

// NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
  super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
        rejectedExecutionHandler);
  this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
  this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
  final SelectorTuple selectorTuple = openSelector();
  this.selector = selectorTuple.selector;
  this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
  • EventLoop的初始化流程
  • 基于上述的认知,我们来总结下EventLoopGroup,EventLoop,EventExecutor以及Thread之间的关系,首先先从源码开始分析如下:
代码语言:javascript
复制
// MultithreadEventExecutorGroup.java
// ThreadPerTaskExecutor看成线程池 - 对应默认的线程工厂类
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

// 有多少个线程就有多少个EventLoop
children = new EventExecutor[nThreads];

// 用线程池创建EventLoop
children[i] = newChild(executor, args);

// SingleThreadEventExecutor.java
// this为NioEventLoop
this.executor = ThreadExecutorMap.apply(executor, this);

// ThreadExecutorMap.java
// 创建新的执行器
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
  // check not null ...
  return new Executor() {
    @Override
    public void execute(final Runnable command) {
      // ThreadPerTaskExecutor.execute -> apply
      // 启动一个线程执行任务并传递事件轮询器
      // FastThreadLocalThread.run()
      executor.execute(apply(command, eventExecutor));
    }
  };
}

// 新任务Task
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
  ObjectUtil.checkNotNull(command, "command");
  ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
  return new Runnable() {
    @Override
    public void run() {
      // 将EventLoop存储到FastThreadLocal(即保证FastThreadLocalThread独占持有自己的EventLoop)
      setCurrentEventExecutor(eventExecutor);
      try {
        // 执行任务
        command.run();
      } finally {
        // 任务执行完之后释放独占EventLoop的资源
        setCurrentEventExecutor(null);
      }
    }
  };
}

private static void setCurrentEventExecutor(EventExecutor executor) {
  // 使用FastThreadLocal来存储事件轮询器,保证每个事件轮询器都会有对应的一个线程来处理
  mappings.set(executor);
}
// 通过上述的流程可知,在每个EventLoop都含有一个新的Executor
// 而每一个Executor都通过默认的线程工厂创建一个FastThreadLocalThread线程来处理task任务
// 此时的Task任务为一个新的任务task
  • 通过源码分析,可以得到以下简要的EventLoopGroup,Group下的线程池Executor,EventLoop与EventLoop下的Executor以及Thread之间的关系如下:

通过上述示意图可知,每个EventLoop处理任务时都会通过Group下的Executor来创建对应的线程来执行EventLoop的事件任务,并且为了保证并发安全问题,在每次处理任务之前,将会把当前的EventLoop与Thread进行绑定,也就是当前EventLoop为当前执行的线程Thread所独占持有,通过FastThreadLocal来维护两者之间的关系,一旦EventLoop事件任务处理完成之后,将解除两者的绑定.同时也可以看到处理一组事件任务的Thread将通过线程组的方式进行维护和管理.

Netty线程模型细化

可以看到上述一个EventLoop绑定一个专有的线程,由专有的线程负责处理EventLoop的事件,且一个channel都会对应着一个EventLoop来负责处理channel相关的事件,同时一个EventLoop/Thread能够处理多个Channel需要依赖于AIO或者是NIO的API才能实现,AbstractBootstrap处理服务端Channel,ServerBootstrap处理客户端Channel,而对于BIO模型而言,只能一个EventLoop/Thread处理对应一个Channel,即摘录《Netty实战》关于NIO/OIO(old IO,BIO)模型如下:

  • 基于NIO/AIO的线程模型
  • 基于BIO的线程模型(OIO为old IO,即使用BIO的API)
  • EventLoop启动任务的执行源码
代码语言:javascript
复制
// 调用以下的方法时执行流程
// SingleThreadEventExecutor.java
eventLoop.execute(task);

// 这里的线程执行流程不弄清楚,后面的事件流程将很理解
// 根据类设计可知,execute为SingleThreadEventExecutor下的方法,结合上面的EventLoop初始化流程可知,每个EventLoop都拥有一个内置的Executor,而这个Executor用于创建FastThreadLocalThread线程来保证当前eventloop与当前线程之间的绑定关联,源码如下:

private void execute(Runnable task, boolean immediate) {
  // 判断当前执行的线程是否与eventloop对应(EventLoop - Thread绑定一起)
  boolean inEventLoop = inEventLoop();
  // 将任务添加到队列中,如果队列满则丢弃当前任务
  addTask(task);
  if (!inEventLoop) {
    // 启动一个线程,如果当前EventLoop持有的线程已经开启过则直接跳过,如果开启过线程,则执行doStartThread方法
    startThread();
    // ...
}

private void doStartThread(){
  // EventLoop持有的executor来创建一个FastThreadLocalThread线程,在该线程中保证当前事件轮询器与线程处于线程安全,通过FastThreadLocal将线程与EventLoop进行关联
  executor.execute(new Runnable() {
    	//....
  });
}
  • 结合EventLoop初始化对应的executor以及ThreadExecutorMap中的源码,现将一个不在当前线程的EventLoop提交任务时创建一个完整线程执行细节流程绘制如下:

也就是说,最终处理任务task都在NioEventLoop执行的run方法中体现,或者更为严格意义上来取决于我们选择的EventLoop的IO操作模式,具体是交由EventLoop的IO操作模式的run方法通过队列中获取任务来进行处理,于是根据源码中提供的任务队列与拒绝策略,对于EventLoop处理任务的流程如下(摘录自《Netty实战》):

  • 与线程池不一样的是,EventLoop是与指定的线程绑定在一起,也就是一个线程处理一个EventLoop,并且在整个Web服务中EventLoop始终是由当前的专有线程负责事件的任务处理
  • 当添加任务到EventLoop执行的时候,需要校验当前的线程是不是持有之前分配好的EventLoop,如果不是那么就添加到任务队列进行等待EventLoop下一次处理事件时再执行,如果队列满了,那么此时就会触发拒绝策略丢弃任务,如果是之前分配好的EventLoop那么就会直接执行任务Task.
Netty之NIO事件轮询流程

基于上述的线程任务流程分析之后,我们知道在EventLoop中最终会调用NioEventLoop下的run方法,对此,现该run方法执行的事件轮询操作流程进行分析.

  • 事件轮询源码
代码语言:javascript
复制
// NioEventLoop.run()核心代码
for(;;){
  	// 检测当前的EventLoop的队列中是否有任务
    if (!hasTasks()) {
      strategy = select(curDeadlineNanos);
    }
  // 根据服务器配置eventloop的IO处理能力比率
  if (ioRatio == 100) {
    // 如果IO处理比率高,则同时处理就绪事件以及当前轮询器队列中的所有任务
    // 不然就分开处理
    try {
      if (strategy > 0) {
        // 处理一系列的就绪事件
        processSelectedKeys();
      }
    } finally {
      // Ensure we always run tasks.
      // 执行所有的任务
      ranTasks = runAllTasks();
    }
  } else if (strategy > 0) {
    final long ioStartTime = System.nanoTime();
    try {
      // 处理就绪事件,处理ACCEPT/READ/WRITE事件
      processSelectedKeys();
    } finally {
      // 在一定事件内处理队列中的任务
      // Ensure we always run tasks.
      final long ioTime = System.nanoTime() - ioStartTime;
      ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
  } else {
    // 处理任务
    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
  }
}

// NioEventLoop的unsafe为NioMessageUnsafe
processSelectedKeys(){
  int readyOps = k.readyOps();
  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
  }

  if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
  }

  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
  }
}

// runAllTasks
runAllTasks(){
  do {
    fetchedAll = fetchFromScheduledTaskQueue();
    if (runAllTasksFrom(taskQueue)) {
      ranAtLeastOne = true;
    }
  } while (!fetchedAll);
}

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
  Runnable task = pollTaskFrom(taskQueue);
  if (task == null) {
    return false;
  }
  for (;;) {
    // 在当前EventLoop所在的线程执行run方法
    // task.run();
    safeExecute(task);
    task = pollTaskFrom(taskQueue);
    if (task == null) {
      return true;
    }
  }
}
  • 事件轮询流程图

至此,关于EventLoop的事件流程分析结束,接下来我们来看启动类添加组件并完成初始化具体事件流程.

启动类初始化组件分析

SeverBootstrap初始化流程
  • 入口程序代码
代码语言:javascript
复制
ServerBootstrap bootstrap = new ServerBootstrap();
  • 启动类的类图结构设计
  • Bootstrap初始化源码
代码语言:javascript
复制
// 在上述执行初始化流程中,会在内部完成以下组件的初始化
// ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    // The order in which child ChannelOptions are applied is important they may depend on each other for validation
    // purposes.
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}

// ServerBootstrapConfig.java
public final class ServerBootstrapConfig extends AbstractBootstrapConfig<ServerBootstrap, ServerChannel> {

    ServerBootstrapConfig(ServerBootstrap bootstrap) {
        super(bootstrap);
    }
}

// AbstractBootstrapConfig.java
public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
    
    // 存储为子类的server bootstrap,即上述的ServerBootstrap
    protected final B bootstrap;

    protected AbstractBootstrapConfig(B bootstrap) {
        this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
    }
}

对此,结合之前组件分析,我们知道Channel是存在语义上的层次关系,我们关注ServerBootstrap与ServerBootstrapConfig, AbstractBootstrap与AbstractBootstrapConfig之间在语义层次上分别获取channel信息的区分,其类图组件如下:

EventLoopGroup添加到启动类
  • 入口程序代码
代码语言:javascript
复制
bootsrtap.group(bossGroup, workerGroup);
  • group()事件处理源代码
代码语言:javascript
复制
// ServerBootsrtap.java
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
  super.group(parentGroup);
  if (this.childGroup != null) {
    throw new IllegalStateException("childGroup set already");
  }
  this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
  return this;
}

// AbstractBootstrap.java
// AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>>
public B group(EventLoopGroup group) {
  ObjectUtil.checkNotNull(group, "group");
  if (this.group != null) {
    throw new IllegalStateException("group set already");
  }
  this.group = group;
  return self();
}

通过上述可知并结合多Reactor模式可知:

  • ServerBootstrap持有childGroup,用于处理socketChannel的读写事件
  • AbstractBootstrap持有parentGroup,用于处理serverChannel的accept事件
服务端Channel添加到启动类
  • 入口程序
代码语言:javascript
复制
// 创建一个服务端的ServerChannel并指定其BACKLOG大小为100
bootsrtap.channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100);
  • channel()事件处理的源代码
代码语言:javascript
复制
// AbstractBootstrap.java
// 通过传递的服务端Channel构造一个Channel创建工厂类,用于后续构建服务端的Channel
public B channel(Class<? extends C> channelClass) {
  return channelFactory(new ReflectiveChannelFactory<C>(
    ObjectUtil.checkNotNull(channelClass, "channelClass")
  ));
}

// 服务端Channel的配置存储到容器Map中
public <T> B option(ChannelOption<T> option, T value) {
  ObjectUtil.checkNotNull(option, "option");
  synchronized (options) {
    if (value == null) {
      options.remove(option);
    } else {
      options.put(option, value);
    }
  }
  return self();
}

根据上述源码可知,启动类调用channel()方法目的是创建一个ChannelFactory工厂类,用于后续构建服务端的Channel实例,我们可以看到Netty框架此处使用工厂模式来创建Channel,目的是为了支持创建不同服务端类型的Channel而避免使用new Class()的方式硬编码逐一分别实现,有助不同Channel类型的扩展.

将Handler添加到启动类
  • 入口程序
代码语言:javascript
复制
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
  • handler()源码
代码语言:javascript
复制
// 源码
// AbstractBootstrap.java
// 当前的服务端channelHandler存在于AbstractBootstrap
public B handler(ChannelHandler handler) {
  this.handler = ObjectUtil.checkNotNull(handler, "handler");
  return self();
}

通过上述可知,在启动类传递的handler将保存到AbstractBootstrap类中,该handler()方法主要用于处理服务端channel的事件完成结果的处理,对于服务端而言,主要处理监听客户端连接完成的事件处理

将childHandler添加到启动类
  • 入口程序
代码语言:javascript
复制
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  // 保证每一个socket channel都会对应着一个自己的channel handler
  @Override
  public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
      p.addLast(sslCtx.newHandler(ch.alloc()));
    }
    p.addLast(serverHandler);
  }
});
  • childHandler()源码
代码语言:javascript
复制
// 源码
// ServerBootstrap.java
// 将上述的childHandler绑定到ServerBootstrap,为ServerBootstrap所持有
public ServerBootstrap childHandler(ChannelHandler childHandler) {
  this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
  return this;
}

class ServerBootstrapAcceptor{
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    }
}

在启动类添加的childHandler最终保存到当前的ServerBootstrap类中,主要用于处理客户端SocketChannel连接的读写事件,在后续事件分析中,我们可以知道添加当前的childHandler将会在Acceptor中注册客户端channel时会将对应的childHandler添加到channel的责任链pipeline中,先前在组件源码分析中已经说明到,ServerChannel是作为客户端SocketChannel的语义层次上的父类,于是对于handler我们也可以理解childHandler是处理客户端读写事件的handler的处理器.

启动类初始化组件小结

通过上述的类图可以知道,ServerBootsrtap与SocketChannel进行关联,AbstractServerBootstrap与ServerSocketChannel进行关联,对于channel,ServerSocketChannel与SocketChannel是层次上的父子关系,对于Bootsrap类抑或是Config类,均通过子类获取与SocketChannel相关的信息,通过父类获取与ServerSocketChannel相关信息,层次划分明确,现将Bootstrap构造初始化操作事件流程绘制如下:

我们知道在Netty框架在处理服务端与客户端的事件是划分层次的,在语义层次上,服务端属于“父类”,客户端属于“子类”,两者之间的事件所依赖的组件也在语义上划分层次,对此,结合上述对EventLoopGroup与EventLoop的源码分析,现将启动类Bootstrap,EventLoopGroup,EventLoop,Channel以及Thread之间的关联示意图绘制如下:

端口绑定事件分析

启动类绑定端口分析
  • 入口程序代码
代码语言:javascript
复制
// 入口程序
bootsrtap.bind(PORT);
  • bind()核心源码摘录
代码语言:javascript
复制
// AbstractBootstrap.java
// 通过类名称可知是创建服务端的Channel并注册Channel事件实现对客户端Channel连接的监听
public ChannelFuture bind(int inetPort) {
  return bind(new InetSocketAddress(inetPort));
}

// 由于注册绑定流程复杂,这里将绑定注册流程划分出来,摘录核心方法,Netty框架中使用EventLoop来处理每个channel事件,存在多线程异步执行的情况.对于异步返回的结果ChannelFuture已在Netty组件源码分析说明到,这里不再详述
// bind包括: 创建channel -> 初始化channel -> 注册channel -> channel绑定端口操作
doBind(final SocketAddress localAddress){
    // 初始化并注册服务端的channel
    initAndRegister();
    
    //...
      
    //如果注册成功,执行服务端channel的绑定操作
    doBind0(regFuture, channel, localAddress, promise);
}
服务端channel创建事件
  • 创建服务端channel的流程
代码语言:javascript
复制
// 源代码
// 使用channelFactory创建NioServerSocketChannel实例
channel = channelFactory.newChannel();
  • NioServerSocketChannel类图结构设计
  • NioServerSocketChannel初始化源码
代码语言:javascript
复制
public NioServerSocketChannel() {
  // 创建java的nio下的ServerSocketChannel并传递到当前的NioServerSocketChannel构造器中
  this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
  // 服务端监听Accept事件并保存,后续在进行注册的时候将会使用到OP_ACCEPT
  	// 1.设置channel的父类,如果当前为服务端的channel则为null
  	// 2.创建channelId
    // 3.创建Nio的Unsafe类
    // 4.创建channel的责任链pipeline,同时每个pipeline都会创建一个双端链表连接上下文对象
  super(null, channel, SelectionKey.OP_ACCEPT);
  // 1. 为当前的channel创建接收数据的ByteBuff分配器,即AdaptiveRecvByteBufAllocator,该分配器默认从1024kb开始创建缓冲区分配数据,最小为64kb,最多不超过65536kb
  // 2. 保存java对象的ServerSocketChannel
  config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

根据上述源码可知,创建Channel时会将与操作Channel相关的组件也一起完成初始化操作,即创建操作缓冲区数据的Unsafe以及对缓冲区数据进行读写存储的ByteBuf分配器.

服务端channel的初始化流程
  • 初始化源码入口
代码语言:javascript
复制
// AbstractBootstrap调用init方法,但是当前类没有实现,交由子类ServerBootstrap去执行
init(channel);
  • 初始化核心代码
代码语言:javascript
复制
// ServerBootstrap.java

init(){
    // 1. 为当前的channel设置option以及attributes
    // 2. 获取当前channel的责任链,为当前的责任添加初始化handler处理器
    
    // init下初始化handler核心代码
    p.addLast(new ChannelInitializer<Channel>() {
      @Override
      public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        // 获取服务端channel的handler处理类
        ChannelHandler handler = config.handler();
        if (handler != null) {
          pipeline.addLast(handler);
        }
    		// 在channel所在的eventloop创建一个线程来执行任务
        ch.eventLoop().execute(new Runnable() {
          @Override
          public void run() {
            // 在任务下为服务端的channel添加Acceptor处理器负责处理客户端channel连接进来的事件完成处理
            pipeline.addLast(new ServerBootstrapAcceptor(
              ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
          }
        });
      }
    });
}

通过上述初始化之后,服务端当前责任链pipeline结构为:head -> initChannelHandler -> tail

服务端channel的注册流程
  • 源码入口
代码语言:javascript
复制
// 源码
// AbstractBootstrap.java
// 获取boss NioEventLoopGroup,将channel注册到当前的group下
ChannelFuture regFuture = config().group().register(channel);
  • 源代码分析
代码语言:javascript
复制
// 根据NioEventLoopGroup的继承类图,可知register方法是在MultithreadEventLoopGroup下
// MultithreadEventLoopGroup.java
 public ChannelFuture register(Channel channel) {
   // 选举一个EventLoop来注册channel
   // 在初始化Group操作的时候已经完成选择器的初始化操作,这里调用选择器来选择一个EventLoop
   // 这里调用EventLoop的注册方法,在上述入口中使用NioEventLoop可知使用的register方法为SingleThreadEventLoop类下的方法,最终调用AbstractChannel下的register方法
   // 方法调用走向如下:
   // MultithreadEventLoopGroup.regitser() -> SingleThreadEventLoop.regitser() -> promise.channel().unsafe().register() -> unsafe(NioMessageUnsafe).regitser() -> AbstractNioUnsafe.regitser() -> AbstractUnsafe.register() -> AbstractUnsafe.register0()
   return next().register(channel);
 }

//AbstractChannel.java下的AbstractUnsafe
register0(promise);

// 上述注册方法的核心步骤:
// 1. 将channel注册到复用器selector上
// 2. 注册完成之后唤醒回调责任链下所有先前已加入的channelHandler类下的handlerAdd方法
// 3. 注册完成之后将结果设置在promise中
// 4. 将注册结果传递到责任链pipeline中,并执行回调channelHandler(ChannelInboundHandler)类下的channelRegistered方法,链式回调执行
// 5. 如果channel为active状态,则继续传播结果事件到channelHandler(ChannelInboundHandler)类下的channelActive方法,链式回调执行
// 6. 5步骤是在第一次进行注册的时候会执行(表示channel已经打开),如果已经注册过,那么校验会自动开始数据读取操作,客户端channel注册读取OP_READ操作, 对应服务端的Channel而言就是监听客户端socket的连接ACCEPT事件

初始化的时候,责任链为head -> initChannelHandler -> tail当完成上述注册流程的时候,执行入站事件,会依次调用责任链,此时责任链最终为head -> handler -> acceptor -> tail,也就是当channel完成注册的时候,才会将启动类中的handler添加到对应channel的pipeline中,否则就不申请内存创建channel对象,类似于懒加载处理,只在完成channel注册的时候(表示当前channel是要进行事件的监听操作)才会初始化一个完整的pipeline下handler,对此,注册前后的pipeline如下:

最后,在基于上述的线程执行任务细节基础之上,将服务端的初始化并注册流程示意图流程绘制如下:

执行端口绑定与监听操作
  • 端口绑定源码分析
代码语言:javascript
复制
// 上述channel注册成功之后,这个时候在上面流程只会触发Active事件,这个时候没有绑定端口没有触发监听事件
// AbstractBootstrap.java
doBind0(regFuture, channel, localAddress, promise);

private static void doBind0(
  // channel所在的eventloop线程执行任务
  channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
      // 注册成功将channel进行绑定操作
      if (regFuture.isSuccess()) {
        // 
        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
      } else {
        promise.setFailure(regFuture.cause());
      }
    }
  });
}
  
  // AbstractChannel.java
  @Override
  public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
  }
  
  // DefaultChannelPipeline.java
  @Override
  public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    // 在链表尾部添加绑定操作
    return tail.bind(localAddress, promise);
  }
  
  // AbstractChannelHandlerContext.java
  @Override
  public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
      // cancelled
      return promise;
    }
// 搜索outboundContext上下文
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
      // 执行责任链pipeline 出站事件,从链表尾部开始搜索,因而最后的context是headContext
      // 执行headContext下的invokeBind方法,该方法还是属于当前类,对此查看下文
      next.invokeBind(localAddress, promise);
    } else {
      safeExecute(executor, new Runnable() {
        @Override
        public void run() {
          next.invokeBind(localAddress, promise);
        }
      }, promise, null, false);
    }
    return promise;
  }
  
  // AbstractChannelHandlerContext.java
  private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
      try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
      } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
      }
    } else {
      bind(localAddress, promise);
    }
  }
  
  // headContext的绑定方法
   @Override
  public void bind(
    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
  }
  
  // unsafe为NioMessageUnsafe,执行该类下的bind方法(AbstractUnsafe.java中定义)
  // 最后再执行channel下的doBind(localAddress);方法,即NioServerSocketChannel下的方法
  @SuppressJava6Requirement(reason = "Usage guarded by java version check")
  @Override
  protected void doBind(SocketAddress localAddress) throws Exception {
    // 可以看到实现了端口的绑定操作
    if (PlatformDependent.javaVersion() >= 7) {
      javaChannel().bind(localAddress, config.getBacklog());
    } else {
      javaChannel().socket().bind(localAddress, config.getBacklog());
    }
  }

对此,基于上述源码的分析,我们绘制服务端channel的端口绑定流程如下:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 疾风先生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • EventLoopGroup初始化流程
  • EventLoop的初始化流程
  • Netty线程模型细化
  • Netty之NIO事件轮询流程
  • SeverBootstrap初始化流程
  • EventLoopGroup添加到启动类
  • 服务端Channel添加到启动类
  • 将Handler添加到启动类
  • 将childHandler添加到启动类
  • 启动类初始化组件小结
  • 启动类绑定端口分析
  • 服务端channel创建事件
  • 服务端channel的初始化流程
  • 服务端channel的注册流程
  • 执行端口绑定与监听操作
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档