前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty源码分析二之EventLoopGroup初始化

netty源码分析二之EventLoopGroup初始化

作者头像
山行AI
发布2019-10-24 14:49:59
6850
发布2019-10-24 14:49:59
举报
文章被收录于专栏:山行AI

EventLoopGroup是netty中非常重要的一部分,reactor线程和worker线程都依托于它,本篇主要介绍EventLoopGroup的初始化流程

启动代码

代码语言:javascript
复制
 public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .childHandler(new WebSocketServerInitializer(sslCtx));
            Channel ch = b.bind(PORT).sync().channel();
            System.out.println("Open your web browser and navigate to " +                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
            ch.closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }
  • EventLoopGroup bossGroup = new NioEventLoopGroup(1)用于指定reactor线程
  • EventLoopGroup workerGroup = new NioEventLoopGroup() 用于指定worker线程
  • channel(NioServerSocketChannel.class)用于指定ServerSocketChannel类型为NioServerSocketChannel
  • childHandler(new WebSocketServerInitializer(sslCtx))用于指定使用的handler,是要往pipeline中添加的处理器,pipeline是一个链表的结构,相当于一个执行器链条。
  • bind(PORT) 是用来给ServerSocketChannel绑定端口,这个在上一篇(netty源码分析一中已经讲过)

接下来咱们一层层往里面跟。

EventLoopGroup

是处理请求的核心类,咱们从构造方法开始看:

代码语言:javascript
复制
   /**     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.     */    public NioEventLoopGroup(int nThreads) {        this(nThreads, (Executor) null);    }
   public NioEventLoopGroup(int nThreads, Executor executor) {           this(nThreads, executor, SelectorProvider.provider());       } 

注意,从这里开始到最后一个构造器,传入的executor为null。

然后到:

代码语言:javascript
复制
 public NioEventLoopGroup(            int nThreads, Executor executor, final SelectorProvider selectorProvider) {        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);    }
 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,                              final SelectStrategyFactory selectStrategyFactory) {         super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());     }

接下来到了MultithreadEventLoopGroup:

代码语言:javascript
复制
 /**     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)     */    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);    }

这里的DEFAULTEVENTLOOP_THREADS的默认值是cpu核数乘以2:

代码语言:javascript
复制
 static {        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        if (logger.isDebugEnabled()) {            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);        }    }

针对上面的bossGroup而言,这里最终传入的nThreads的值是1,而对于workerGroup而言,这里最终传入的nThreads的值是cpu核数*2

可以发现,每进一层,总会添加一个默认属性给构造器,那么接下来:

代码语言:javascript
复制
  /**     * Create a new instance.     *     * @param nThreads          the number of threads that will be used by this instance.     * @param executor          the Executor to use, or {@code null} if the default should be used.     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call     */    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);    }

这里需要注意下这个DefaultEventExecutorChooserFactory.INSTANCE,对应的是:

针对这个ChooserFactory说几点:

  1. isPowerOfTwo(executors.length)用于判断executors.length的值是不是2的n次方值
  2. PowerOfTwoEventExecutorChooser与GenericEventExecutorChooser的区别在于next()方法,一个是通过与运算来获取数组中的索引值,一个是通过取余运算来计算索引值,毫无疑问,取与运算效率更高。
  3. 如果符合isPowerOfTwo则选用PowerOfTwoEventExecutorChooser,否则选择用GenericEventExecutorChooser

接下来我们紧接着上面的构造方法往下走,到了MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args):

代码语言:javascript
复制
 /**     * Create a new instance.     *     * @param nThreads          the number of threads that will be used by this instance.     * @param executor          the Executor to use, or {@code null} if the default should be used.     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call     */    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {        if (nThreads <= 0) {            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));        }
        if (executor == null) {            //标准的netty程序会调用到NioEventLoopGroup的父类MultithreadEventExecutorGroup的如下代码            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());        }
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {            boolean success = false;            try {                //然后通过newChild的方式传递给NioEventLoop                children[i] = newChild(executor, args);                success = true;            } catch (Exception e) {                // TODO: Think about if this is a good exception type                throw new IllegalStateException("failed to create a child event loop", e);            } finally {                if (!success) {                    for (int j = 0; j < i; j ++) {                        children[j].shutdownGracefully();                    }
                    for (int j = 0; j < i; j ++) {                        EventExecutor e = children[j];                        try {                            while (!e.isTerminated()) {                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                            }                        } catch (InterruptedException interrupted) {                            // Let the caller handle the interruption.                            Thread.currentThread().interrupt();                            break;                        }                    }                }            }        }        //chooserFactory对应的是DefaultEventExecutorChooserFactory.INSTANCE        chooser = chooserFactory.newChooser(children);
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {            @Override            public void operationComplete(Future<Object> future) throws Exception {                if (terminatedChildren.incrementAndGet() == children.length) {                    terminationFuture.setSuccess(null);                }            }        };
        for (EventExecutor e: children) {            e.terminationFuture().addListener(terminationListener);        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);        Collections.addAll(childrenSet, children);        readonlyChildren = Collections.unmodifiableSet(childrenSet);    }

1. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory())

我们知道,从最开始的构造器开始,传入的executor值一直为null,那么这里就需要使用一个ThreadPerTaskExecutor,它的构造为:

代码语言:javascript
复制
public final class ThreadPerTaskExecutor implements Executor {    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {        if (threadFactory == null) {            throw new NullPointerException("threadFactory");        }        this.threadFactory = threadFactory;    }
    @Override    public void execute(Runnable command) {        threadFactory.newThread(command).start();    }}

它的代码很少,因为它的核心在于threadFactory,那么我们就来看一看newDefaultThreadFactory()方法是怎么执行的:

代码语言:javascript
复制
 protected ThreadFactory newDefaultThreadFactory() {        return new DefaultThreadFactory(getClass());//这里的getClass()返回的是当前对象NioEventLoopGroup的class    }public DefaultThreadFactory(Class<?> poolType) {        this(poolType, false, Thread.NORM_PRIORITY);    }
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {        this(toPoolName(poolType), daemon, priority);    }

注意,这里给的priority是Thread.NORM_PRIORITY,传入的daemon是false,代表的是非守护线程。

代码语言:javascript
复制
    /**     * The minimum priority that a thread can have.     */    public final static int MIN_PRIORITY = 1;
   /**     * The default priority that is assigned to a thread.     */    public final static int NORM_PRIORITY = 5;
    /**     * The maximum priority that a thread can have.     */    public final static int MAX_PRIORITY = 10;

在这里我们先看一看toPoolName方法:

代码语言:javascript
复制
public static String toPoolName(Class<?> poolType) {        if (poolType == null) {            throw new NullPointerException("poolType");        }
        String poolName = StringUtil.simpleClassName(poolType);        switch (poolName.length()) {            case 0:                return "unknown";            case 1:                return poolName.toLowerCase(Locale.US);            default:                if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {                    return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);                } else {                    return poolName;                }        }    }

这里返回的是class的simple name,处理成小写的并返回。

继续往下看构造方法:

代码语言:javascript
复制
  public DefaultThreadFactory(String poolName, boolean daemon, int priority) {        this(poolName, daemon, priority, System.getSecurityManager() == null ?        //这里初始化ThreadGroup                Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());    }
 public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {        if (poolName == null) {            throw new NullPointerException("poolName");        }        //先前传入的是Thread.NORM_PRIORITY        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {            throw new IllegalArgumentException(                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");        }
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';        this.daemon = daemon;        this.priority = priority;        this.threadGroup = threadGroup;    }

到这里DefaultThreadFactory的初始化就结束了。

注意,这里的ThreadGroup是jdk的ThreadGroup

2. children = new EventExecutor[nThreads] 初始化线程执行器

创建EventExecutor,boosGroup中创建一个,workerGroup中创建了cpu核数*2个:

代码语言:javascript
复制
 children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {            boolean success = false;            try {                //然后通过newChild的方式传递给NioEventLoop                children[i] = newChild(executor, args);                success = true;            } catch (Exception e) {                // TODO: Think about if this is a good exception type                throw new IllegalStateException("failed to create a child event loop", e);            } finally {                if (!success) {                //如果没有成功,则把这些线程关闭掉                    for (int j = 0; j < i; j ++) {                        children[j].shutdownGracefully();                    }
                    for (int j = 0; j < i; j ++) {                        EventExecutor e = children[j];                        try {                        //等待,直到线程关闭                            while (!e.isTerminated()) {                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                            }                        } catch (InterruptedException interrupted) {                            // Let the caller handle the interruption.                            Thread.currentThread().interrupt();                            break;                        }                    }                }            }        }

这里是初始化EventExecutor的代码,我们主要看下newChild(executor, args)方法,这个方法的入参是上面1中刚讲的executor和args(selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject())参数, 代码对应的是NioEventLoopGroup中的实现 io.netty.channel.nio.NioEventLoopGroup#newChild:

代码语言:javascript
复制
 @Override    protected EventLoop newChild(Executor executor, Object... args) throws Exception {        //executor然后通过newChild的方式传递给NioEventLoop        return new NioEventLoop(this, executor, (SelectorProvider) args[0],            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);    }

EventLoop是EventExecutor的子接口,这里创建了NioEventLoop作为EventExecutor,我们看一下NioEventLoop的构造:

代码语言:javascript
复制
 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);        if (selectorProvider == null) {            throw new NullPointerException("selectorProvider");        }        if (strategy == null) {            throw new NullPointerException("selectStrategy");        }        provider = selectorProvider;        //打开selector        final SelectorTuple selectorTuple = openSelector();        selector = selectorTuple.selector;        unwrappedSelector = selectorTuple.unwrappedSelector;        selectStrategy = strategy;    }

注意这里传入的executor是上面讲到的ThreadPerTaskExecutor哦。

我们看下super方法:

代码语言:javascript
复制
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,                                    boolean addTaskWakesUp, int maxPendingTasks,                                    RejectedExecutionHandler rejectedExecutionHandler) {        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);        tailTasks = newTaskQueue(maxPendingTasks);    }

继续super往下分析:

代码语言:javascript
复制
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,                                        boolean addTaskWakesUp, int maxPendingTasks,                                        RejectedExecutionHandler rejectedHandler) {        super(parent);        this.addTaskWakesUp = addTaskWakesUp;        this.maxPendingTasks = Math.max(16, maxPendingTasks);        //executor为ThreadPerTaskExecutor        this.executor = ObjectUtil.checkNotNull(executor, "executor");        //任务队列        taskQueue = newTaskQueue(this.maxPendingTasks);        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");    }

继续super往下:

代码语言:javascript
复制
 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {        super(parent);    }
 protected AbstractEventExecutor(EventExecutorGroup parent) {         this.parent = parent;     }   

可以看到NioEventLoop的继承关系SingleThreadEventLoop->SingleThreadEventExecutor->AbstractScheduledEventExecutor->AbstractEventExecutor-->...

EventLoop是netty中极其重要的一部分,这里我们只是简单的过一遍它的初始化过程,它内部的其他核心方法将在后面涉及到的时候进行介绍。

3. MultithreadEventExecutorGroup剩余的初始化部分

代码语言:javascript
复制
 //chooserFactory对应的是DefaultEventExecutorChooserFactory.INSTANCE        chooser = chooserFactory.newChooser(children);
        //创建监听器        final FutureListener<Object> terminationListener = new FutureListener<Object>() {            @Override            public void operationComplete(Future<Object> future) throws Exception {                if (terminatedChildren.incrementAndGet() == children.length) {                    terminationFuture.setSuccess(null);                }            }        };
        for (EventExecutor e: children) {            //设置监听器,e.terminationFuture()会在e结束时回调,并执行监听器中的方法            e.terminationFuture().addListener(terminationListener);        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);        //将children放入到childrenSet        Collections.addAll(childrenSet, children);        //将childrenSet包装成不可修改的线程安全set 只读   并让readonlyChildren指向它        readonlyChildren = Collections.unmodifiableSet(childrenSet);

在这里做了一些其他的初始化工作:

  1. chooser的初始化,使用上面分析的初始化完成的children集合创建chooser,这样后面就可以通过调用chooser的方法来获取到executor了;
  2. 给children集合中的每个executor添加监听器listener
  3. 将children放入Set中并包装成一个线程安全的只读set,赋值给readonlyChildren
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 启动代码
    • EventLoopGroup
      • 1. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory())
      • 2. children = new EventExecutor[nThreads] 初始化线程执行器
      • 3. MultithreadEventExecutorGroup剩余的初始化部分
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档