EventLoopGroup是netty中非常重要的一部分,reactor线程和worker线程都依托于它,本篇主要介绍EventLoopGroup的初始化流程
public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; }
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebSocketServerInitializer(sslCtx));
Channel ch = b.bind(PORT).sync().channel();
System.out.println("Open your web browser and navigate to " + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
接下来咱们一层层往里面跟。
是处理请求的核心类,咱们从构造方法开始看:
/** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }
注意,从这里开始到最后一个构造器,传入的executor为null。
然后到:
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); }
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
接下来到了MultithreadEventLoopGroup:
/** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
这里的DEFAULTEVENTLOOP_THREADS的默认值是cpu核数乘以2:
static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
针对上面的bossGroup而言,这里最终传入的nThreads的值是1,而对于workerGroup而言,这里最终传入的nThreads的值是cpu核数*2
可以发现,每进一层,总会添加一个默认属性给构造器,那么接下来:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
这里需要注意下这个DefaultEventExecutorChooserFactory.INSTANCE,对应的是:
针对这个ChooserFactory说几点:
接下来我们紧接着上面的构造方法往下走,到了MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args):
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
if (executor == null) { //标准的netty程序会调用到NioEventLoopGroup的父类MultithreadEventExecutorGroup的如下代码 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //然后通过newChild的方式传递给NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //chooserFactory对应的是DefaultEventExecutorChooserFactory.INSTANCE chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
我们知道,从最开始的构造器开始,传入的executor值一直为null,那么这里就需要使用一个ThreadPerTaskExecutor,它的构造为:
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; }
@Override public void execute(Runnable command) { threadFactory.newThread(command).start(); }}
它的代码很少,因为它的核心在于threadFactory,那么我们就来看一看newDefaultThreadFactory()方法是怎么执行的:
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass());//这里的getClass()返回的是当前对象NioEventLoopGroup的class }public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); }
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { this(toPoolName(poolType), daemon, priority); }
注意,这里给的priority是Thread.NORM_PRIORITY,传入的daemon是false,代表的是非守护线程。
/** * The minimum priority that a thread can have. */ public final static int MIN_PRIORITY = 1;
/** * The default priority that is assigned to a thread. */ public final static int NORM_PRIORITY = 5;
/** * The maximum priority that a thread can have. */ public final static int MAX_PRIORITY = 10;
在这里我们先看一看toPoolName方法:
public static String toPoolName(Class<?> poolType) { if (poolType == null) { throw new NullPointerException("poolType"); }
String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } }
这里返回的是class的simple name,处理成小写的并返回。
继续往下看构造方法:
public DefaultThreadFactory(String poolName, boolean daemon, int priority) { this(poolName, daemon, priority, System.getSecurityManager() == null ? //这里初始化ThreadGroup Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); }
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } //先前传入的是Thread.NORM_PRIORITY if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); }
prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; }
到这里DefaultThreadFactory的初始化就结束了。
注意,这里的ThreadGroup是jdk的ThreadGroup
创建EventExecutor,boosGroup中创建一个,workerGroup中创建了cpu核数*2个:
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //然后通过newChild的方式传递给NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { //如果没有成功,则把这些线程关闭掉 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { //等待,直到线程关闭 while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
这里是初始化EventExecutor的代码,我们主要看下newChild(executor, args)方法,这个方法的入参是上面1中刚讲的executor和args(selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject())参数, 代码对应的是NioEventLoopGroup中的实现 io.netty.channel.nio.NioEventLoopGroup#newChild:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { //executor然后通过newChild的方式传递给NioEventLoop return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
EventLoop是EventExecutor的子接口,这里创建了NioEventLoop作为EventExecutor,我们看一下NioEventLoop的构造:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; //打开selector final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
注意这里传入的executor是上面讲到的ThreadPerTaskExecutor哦。
我们看下super方法:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); }
继续super往下分析:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); //executor为ThreadPerTaskExecutor this.executor = ObjectUtil.checkNotNull(executor, "executor"); //任务队列 taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
继续super往下:
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); }
protected AbstractEventExecutor(EventExecutorGroup parent) { this.parent = parent; }
可以看到NioEventLoop的继承关系SingleThreadEventLoop->SingleThreadEventExecutor->AbstractScheduledEventExecutor->AbstractEventExecutor-->...
EventLoop是netty中极其重要的一部分,这里我们只是简单的过一遍它的初始化过程,它内部的其他核心方法将在后面涉及到的时候进行介绍。
//chooserFactory对应的是DefaultEventExecutorChooserFactory.INSTANCE chooser = chooserFactory.newChooser(children);
//创建监听器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
for (EventExecutor e: children) { //设置监听器,e.terminationFuture()会在e结束时回调,并执行监听器中的方法 e.terminationFuture().addListener(terminationListener); }
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); //将children放入到childrenSet Collections.addAll(childrenSet, children); //将childrenSet包装成不可修改的线程安全set 只读 并让readonlyChildren指向它 readonlyChildren = Collections.unmodifiableSet(childrenSet);
在这里做了一些其他的初始化工作: