前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty的异步任务处理与Socket事件处理

Netty的异步任务处理与Socket事件处理

作者头像
止术
发布2021-08-06 14:28:39
1.2K0
发布2021-08-06 14:28:39
举报
文章被收录于专栏:求道求道

经过前面几章的学习,我们基本是明白了Netty通道的创建、注册、与绑定与JDK NIO的对应关系,如果我们使用的是JDK NIO的方式去开发一个Socket服务端的时候,此时还缺少了一个重要的环节,就是循环处理IO事件!

我们前面不只一次的见到Netty的异步事件,因为我们某些知识还没有学习到,所以我们都按照同步的方式去获取的,所以我们本章节将带你学习,Netty对于IO事件的处理与异步事件的处理!

我们以绑定为出发点,由点到面进行分析!

一、源码入口

我们直接进入到绑定的源码分析:

代码语言:javascript
复制
private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // 在触发channelRegistered()之前调用此方法。给用户处理程序一个设置的机会
    // 其channelRegistered()实现中的管道。
    channel.eventLoop().execute(() -> {
        if (regFuture.isSuccess()) {
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            promise.setFailure(regFuture.cause());
        }
    });
}

我们上节课直接分析的channel.bind方法,而忽略上上面的异步方法,这里我们开始分析异步方法,我们进入到channel.eventLoop().execute()方法:

image-20210430145227945

二、源码分析

我们前面分析过,每个Channel绑定一个NioEventLoop,而EventLoop又是SingleThreadEventExecutor的子类,所以我们进入到io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable):

代码语言:javascript
复制
@Override
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

---------------------------分界线------------------------------------

//继续往下追  execute
private void execute(Runnable task, boolean immediate) {
    //判断当前执行的线程是不是 NIoEventLoopGroup的线程  这里是false
    boolean inEventLoop = inEventLoop();
    //将任务加入到队列
    addTask(task);
    //这里永远只能启动一次  一个eventLoop
    if (!inEventLoop) {
        //启动线程
        startThread();
        .....................................
    }
    //io.netty.channel.nio.NioEventLoop.selector
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

我们这里可以分为两部分:

1. 添加任务

代码语言:javascript
复制
addTask(task);

----------------------------------分界线---------------------------
    
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}

基础好一点的同学我估计已经有点猜到了,单看这个 offerTask有没有像和队列相关的操作,我们进入到offerTask方法:

代码语言:javascript
复制
final boolean offerTask(Runnable task) {
    ...............忽略.................
    return taskQueue.offer(task);
}

果不其然,果然是入队操作,taskQueue是什么呢?

image-20210430152558414

我们再初始化NioEventLoop的源码分析学习的时候,学习到,我们会创建两个MpscQ队列(多生产者,单消费者),这个taskQueue就是当时我们创建的一个任务队列,这里面将我们提交的异步任务追加到队列里面!

返回异步任务是不是被追加到队列里面了,如果队列满了,或者其他原因追加失败的话,会返回false,就会执行reject方法:

代码语言:javascript
复制
protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}

这个拒绝策略同样是我们再创建NioEventLoop的时候创建保存的,给大家留一个作业,去追一下这个拒绝策略,判断一下当发生了添加异步任务失败之后,会发生什么呢?

2. 启动消费线程

代码语言:javascript
复制
startThread();

-----------------------------分割线-------------------------
    /**
     * 启动线程
     */
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                //启动线程
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

注意,这里有个CAS操作 STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED); 判断消费线程是不是已经启动,如果已经启动就不进入这个逻辑,如果没启动就进入这个逻辑!我们第一次调用,肯定没启动,进入这个逻辑:

代码语言:javascript
复制
doStartThread();
----------------------------分割线---------------------------

private void doStartThread() {
    assert thread == null;
    //创建一条线程并启动
    //这个线程又EventLoop
    executor.execute(new Runnable() {
        @Override
        public void run() {
            //保存当前线程  给线程赋值的就是这里
            thread = Thread.currentThread();
            ...........................忽略........................
            try {
                //进行实际的启动
                //io.netty.channel.nio.NioEventLoop.run
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ...........................忽略........................
            }
        }
        ...........................忽略........................
    }
      ...........................忽略........................
}

代码比较长,我们只分析主线逻辑:

代码语言:javascript
复制
thread = Thread.currentThread();

首先保存了一下当前线程到成员变量,这个分支不是很重要,后面有时间进行分析!

代码语言:javascript
复制
SingleThreadEventExecutor.this.run();

这个就是处理异步任务的代码,我们进入到run方法查看:

image-20210501112253211

代码语言:javascript
复制
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                //存在任务就返回IO时间的数量,不存在任务就返回select阻塞等待事件发生
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    //如果不存在异步任务  就进行事件选择
                    case SelectStrategy.SELECT:
                        //下一个定时任务的截至时间  当不存在任务的时候就返回-1
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; 
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            //不存在任务就去阻塞获取IO事件
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                    default:
                }
            } catch (IOException e) {
                //替换一个选择器
                rebuildSelector0();
                //选择次数重置为0
                selectCnt = 0;
                //处理循环异常  主要处理方式就是睡眠一会让程序主动释放CPU
                handleLoopException(e);
                continue;
            }
			//本次循环次数+1
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //这里是默认值  50
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            //不会进这个分支
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
                //当存在I/O事件的时候
            } else if (strategy > 0) {
                //记录一下当前的时间
                final long ioStartTime = System.nanoTime();
                try {
                    //处理IO事件
                    processSelectedKeys();
                } finally {
                    //计算处理IO事件耗费的事件
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //里面的时间是计算处理异步任务的时间尽量保持为1:1
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                //没有IO事件的话就处理异步任务
                ranTasks = runAllTasks(0); 
            }
			
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                 selectCnt - 1, selector);
                }
                //没有空轮询的话三次一清空
                selectCnt = 0;
                //如果空轮询的次数超过默认的512次  就处理空轮询BUG的选择器
            } else if (unexpectedSelectorWakeup(selectCnt)) { 
                //空轮询被处理后清空 轮询次数
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            ...................忽略........................
        } finally {
        	...................忽略........................    
        }
    }
}

这主线逻辑分为三个:如何解决IO事件、如何处理异步任务、如何解决空轮询BUG!!分支代码关注一下注释,这里分析下主线代码:

I. I/O事件的处理

processSelectedKeys();

代码语言:javascript
复制
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

selectedKeys是我们在创建NIOEventLoop的时候,会创建一个优化后的的SelectorKeySet集合,使用数组来实现的,大家忘记的话,可以会看一下NioEventLoop的初始化源码篇!

当你没有禁用优化的时候,就会进入到if分支,我们查看if内部代码的源码:

代码语言:javascript
复制
private void processSelectedKeysOptimized() {
    //开始遍历所有的主键
    for (int i = 0; i < selectedKeys.size; ++i) {
        //获取事件
        final SelectionKey k = selectedKeys.keys[i];
        //将该位置的数据制空
        selectedKeys.keys[i] = null;
		//获取之间注册NioServerSocketChannel的时候,绑定的Channel对象
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            //开始进行IO事件处理
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            .........................忽略............................
        }
        .........................忽略............................
    }
}

获取事件集合中的每一个key,同时获取之前绑定的NioServerSocketChannel,然后调用processSelectedKey处理这个事件:

代码语言:javascript
复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        //当key失效之后,就关闭通道
        ....................忽略....................
    }

    try {
        //获取当前事件的key 掩码
        int readyOps = k.readyOps();
        //是否包含连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            //获取包含的事件
            int ops = k.interestOps();
            //剔除OP_CONNECT事件
            ops &= ~SelectionKey.OP_CONNECT;
            //重新更新关注的事件
            k.interestOps(ops);
			//传播 connect事件
            unsafe.finishConnect();
        }
		//如果当前返回的关注事件的掩码包含 OP_WRITE的话
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            //开始向通道内刷新数据
            ch.unsafe().forceFlush();
        }
//如果当前的事件掩码包含读、新连接接入事件  或者 不关注任何事件的时候  传播read事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {	  //传播read事件 可能是新连接接入也可能有数据可读
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        //发生异常关闭通道
        unsafe.close(unsafe.voidPromise());
    }
}

大家可以看到,里面的处理基本和我们对于JDK NIO的处理一致,就是判断各种事件然后进行对应的处理!

II、异步任务的处理
代码语言:javascript
复制
runAllTasks();
代码语言:javascript
复制
protected boolean runAllTasks() {
   assert inEventLoop();
   boolean fetchedAll;
   boolean ranAtLeastOne = false;

   do {
       //合并任务  将定时任务的队列里面的任务拉去出来,和异步任务的队列进行合并
       fetchedAll = fetchFromScheduledTaskQueue();
       //开始执行全部的任务
       if (runAllTasksFrom(taskQueue)) {
           ranAtLeastOne = true;
       }
   } while (!fetchedAll); 

   if (ranAtLeastOne) {
       lastExecutionTime = ScheduledFutureTask.nanoTime();
   }
   afterRunningAllTasks();
   return ranAtLeastOne;
}

这里就是异步任务的被执行的地方,这里分为两个步骤:1. 合并任务 2.执行taskQueue异步任务 3.执行tailQueue异步任务!

合并任务

代码语言:javascript
复制
fetchedAll = fetchFromScheduledTaskQueue();

Netty在我们学习中已经知道了两种队列,一种是taskQueue队列,一种是tailQueue队列,现在又出现了第三种队列:scheduledTaskQueue,他是一个专门存放定时任务的对队列,这里的合并任务就是将即将要执行的任务合并到taskQueue中等待执行!

这行代码执行完毕后,所有即将要执行的任务都被添加在了taskQueue队列中,等待后续的执行!

执行taskQueue异步任务

代码语言:javascript
复制
//注意这里传入的是合并完成后额taskQueue
runAllTasksFrom(taskQueue)

上述代码将对应的任务全部集中到了taskQueue队列中后们这里开始消费taskQueue队列进行执行!我们可以适当的看一下源码:

代码语言:javascript
复制
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    //从taskQueue队列中弹出一个任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        //执行任务  调用run方法
        safeExecute(task);
        //继续弹出任务
        task = pollTaskFrom(taskQueue);
        //如果弹出的任务为空
        if (task == null) {
            //直接返回
            return true;
        }
    }
}

执行tailQueue异步任务

代码语言:javascript
复制
afterRunningAllTasks();

这里开始执行tailQueue节点的任务,可以看到,tailQueue节点的任务执行优先级低于上述两种队列!

image-20210503101059511

代码语言:javascript
复制
@Override
protected void afterRunningAllTasks() {
    //注意这里传入的是 tailQueue
    runAllTasksFrom(tailTasks);
}

//继续往下看源码
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    //弹出任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        //执行任务
        safeExecute(task);
        //再次弹出任务
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            //任务执行完毕  返回true
            return true;
        }
    }
}

这里就不作过多讲解了,这里和上面的逻辑基本一致,只是执行的qeueb不是一个!

III、解决臭名昭著的JDK空轮询BUG

可能大家大家都知道,JDK NIO在事件循环判断的时候可能会出现空轮询的BUG,导致CPU100%,虽然Oracle官方宣称空轮询的BUG已经解决了,但是后续经过一些公司实际的业务上证明并没有解决,只是出现几率小了点,Netty事实上并没有解决这个空轮询BUG只是用另外一种比较巧妙的方法规避开了,我们一起学习下:

首先,我们先想一下,我们如何断定我们的程序可能发生了空轮询的BUG,学习过NIO的都知道,我们会调用一个selector.select()进行阻塞等待有完成的事件发生,当selet方法阻塞解除的时候,就证明一定有我么感兴趣的事件发生,但是当我们发现select方法解除了阻塞,但是事件数量却为0的时候,我们就认为可能出现了空轮询的BUG!

但是IO数量为0并不是一定出现了空轮询的BUG,也可能外部调用了markUp方法,所以我们不能每一次出现事件数量为0的时候都认为程序出现了空轮询BUG,所以我们就需要有一个记录它出现该类异常情况发生的次数,当发生的次数达到了我们设置的阈值,就证明它可能发生了空轮询的BUG,这个时候需要处理这个空轮询的BUG!

那么如何处理呢? 我们任务发生空轮询问题是因为(JDK官方认为,这个Linux Epoll告诉JDK有事件了,但是JDK获取事件的时候获取了一个空,所以JDK只能返回一个0)所以就发生了空轮询:

JDK官方给出的解决方案

Netty是使用的第三种,抛弃旧的选择器,重建一个新的选择器,然后替换旧的选择器,我们一起看下源码!

我们看看Netty是如何做的,我们回到io.netty.channel.nio.NioEventLoop#run源码:

我还是,为了方便讲解,把这段代码贴出来省略和空轮询无关的代码(完整代码见上):

代码语言:javascript
复制
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        ........................忽略进行事件选择的代码...................
			//本次循环次数+1
            selectCnt++;
        ....................忽略事件处理和异步任务执行的代码................
			//当处理的异步任务或者IO事件的数量大于0,证明没有发生空轮询
            if (ranTasks || strategy > 0) {
                //每隔三次打印一次日志
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                 selectCnt - 1, selector);
                }
                //没有空轮询的话清空 
                selectCnt = 0;
                //如果出现异步任务为空  IO事件为空的话就会进入到这个逻辑
            } else if (unexpectedSelectorWakeup(selectCnt)) { 
                //空轮询被处理后清空 轮询次数
                selectCnt = 0;
            }
    } catch (CancelledKeyException e) {
        ...................忽略........................
    } finally {
        ...................忽略........................    
    }
}

可以仔细的看一下 上述代码的注释,我们进入到 unexpectedSelectorWakeup(selectCnt) 方法:

代码语言:javascript
复制
private boolean unexpectedSelectorWakeup(int selectCnt) {
    ..............忽略日志打印................
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        //判断异常情况的次数是不是超过了预设的512次
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        //开始重新构建一个selector
        rebuildSelector();
        return true;
    }
    return false;
}

我们读源码到这里,可以知道,当异常执行的次数超过了阈值 512次,就会调用一个 rebuildSelector方法,我们点进去看一下:

代码语言:javascript
复制
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}

我们按照惯例,按照同步方法调用 rebuildSelector0();

代码语言:javascript
复制
private void rebuildSelector0() {
    //获取原始的选择器
    final Selector oldSelector = selector;
    //声明一个新的选择器
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        //创建一个新的选择器,赋值给新的选择器变量
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    int nChannels = 0;
    //开始遍历旧的选择器,将旧选择器的IO事件的key,绑定到新创建的选择器上
    for (SelectionKey key: oldSelector.keys()) {
        //获取旧选择器的管道
        Object a = key.attachment();
        try {
            //如果key失效了,就跳过!
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
			//获取对应关注的事件掩码
            int interestOps = key.interestOps();
            //将旧key置为失效
            key.cancel();
            //重新将管道绑定到新的选择器上
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            //替换管道里面保存的选择器事件主键
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            ...............省略...............
        }
    }
	//重新保存新的优化后的选择器和原始选择器  
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        //关闭旧的选择器
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            ...............省略..................
        }
    }
	...............省略..................
}

我们从上述代码可以看到,Netty处理空轮询的问题的策略是,当发现你可能发生空轮询的次数超过了512次的时候,就直接重新获取一个新的选择器,然后将旧的选择器直接替换掉,这样空轮询的BUG也就很轻易的解决了!

三、总结

  1. 每一个EventLoop都会启动一条永久运行的线程,用于处理异步任务和IO事件,我们称之为Reactor线程。
  2. 如果存在IO事件的话,会先处理IO事件!
  3. Reactor线程会先将定时任务里面的任务合并到taskqueue里面,然后执行!taskQueue执行完毕后执行tailQueue队列的任务!
  4. 如果空轮询的次数发生了512次,就认为发生了空轮询的BUG,就会抛弃原来的选择器,重建一个新的选择器,将旧选择器上的事件全部绑定到新的选择器上,然后将旧选择器删除!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 源码学徒 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、源码入口
  • 二、源码分析
    • 1. 添加任务
      • 2. 启动消费线程
        • I. I/O事件的处理
        • II、异步任务的处理
        • III、解决臭名昭著的JDK空轮询BUG
    • 三、总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档