前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty中的任务队列(添加元素篇)

Netty中的任务队列(添加元素篇)

作者头像
书唐瑞
发布2022-06-02 14:06:55
7100
发布2022-06-02 14:06:55
举报
文章被收录于专栏:Netty历险记

此篇文章讲解一下Netty中的任务队列.这里说的任务队列是Netty中的IO线程对应的任务队列.

在Netty中NioEventLoopGroup这个类相当于线程池,而由它创建的每个NioEventLoop相当于池中的线程,因为每个NioEventLoop都是和唯一的一个线程绑定的,而这个线程只负责IO相关的工作,因此称作IO线程.

在创建NioEventLoop的时候会创建一个与之关联的任务队列(Queue<Runnable> taskQueue).这个任务队列用于'装载'其他非IO线程向IO线程提交的任务,比如业务线程(即非IO线程)需要向对端写数据,那么业务线程会把写数据这个操作封装成一个任务'丢到'任务队列中,由IO线程将数据写到网络中.

代码语言:javascript
复制
private void write(Object msg, boolean flush, ChannelPromise promise) {
    ...
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 业务线程将写操作封装成Task
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        // 提交任务到IO线程对应的任务队列中
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg, boolean lazy) {
    // 将任务提交到IO线程的任务队列中
    executor.execute(runnable);    
}

首先看下这个taskQueue是由谁实现的

代码语言:javascript
复制
// 实例化NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
               
    // newTaskQueue(queueFactory)会实现具体的任务队列
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
代码语言:javascript
复制
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory
    if (queueFactory == null) {
        // 流程会走到这里
        return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    }
    return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
代码语言:javascript
复制
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // 流程会继续调用 PlatformDependent.<Runnable>newMpscQueue()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
代码语言:javascript
复制
public static <T> Queue<T> newMpscQueue() {
    return Mpsc.newMpscQueue();
}
代码语言:javascript
复制
static <T> Queue<T> newMpscQueue() {
    // 流程会执行第一个new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
    // MPSC_CHUNK_SIZE=1024
    return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
        : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}

上面说了这么多,只是想说明taskQueue具体的实现

代码语言:javascript
复制
Queue<Runnable> taskQueue = new MpscUnboundedArrayQueue<T>(1024)

接下来具体分析下MpscUnboundedArrayQueue

代码语言:javascript
复制
org.jctools.queues.MpscUnboundedArrayQueue

在Netty之前的版本中,taskQueue是Netty自身实现它的.但是后面版本就将这个taskQueue的实现'交由'JCTools下的类来实现了.

代码语言:javascript
复制
<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>3.1.0</version>
</dependency>

在Netty中,多个Netty客户端连接Netty服务端的时候,Netty服务端中的一个IO线程会负责处理多个客户端.

如上图所示,IO线程-1负责处理Netty客户端-1和Netty客户端-2的读写请求.当多个业务线程需要向对端写数据的时候,会把写操作封装成任务'丢到'IO线程-1的任务队列中.

Netty中的线程有个特别的地方,就是一个IO线程会对应多个业务线程,业务线程就是生产者,IO线程就是消费者,它消费业务线程'生产'的任务.属于单消费者多生产者模式.通过类的名称MpscUnboundedArrayQueue可以看出来,这个类就是为多生产者(MultiProducer)单消费者(SingleConsumer)设计的.

MpscUnboundedArrayQueue的底层使用数组的形式存储元素.

代码语言:javascript
复制
// MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
    // 转成2^n
    int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
    long mask = (p2capacity - 1) << 1;
    // 存储元素的底层数组
    E[] buffer = allocateRefArray(p2capacity + 1);
    producerBuffer = buffer;
    producerMask = mask;
    consumerBuffer = buffer;
    consumerMask = mask;
    soProducerLimit(mask);
}

从全局的角度看下MpscUnboundedArrayQueue的底层结构,这里假设

initialCapacity=4

虽然设置的初始容量大小=4,但是当存放的元素大于4的时候,就会新创建一个与之前同等大小的数组,然后'挂接'到之前的数组. 当再次'装载'不了新放入的元素时候,会再次新创建一个数组'挂接'到之前的数组,以此类推.最后形成一个数组+链表的结构.

结合源码分析下.以下假设初始容量大小initialCapacity=4

代码语言:javascript
复制
// MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
    // p2capacity  = 4
    int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
    // 转成二进制mask=110
    long mask = (p2capacity - 1) << 1;
    // 存储元素的底层数组大小=4+1=5
    E[] buffer = allocateRefArray(p2capacity + 1);
    // 指向生产者的数组
    producerBuffer = buffer;
    producerMask = mask;
    // 指向消费者的数组
    consumerBuffer = buffer;
    consumerMask = mask;
    // producerLimit=mask=110
    soProducerLimit(mask);
}

构造方法中,虽然设置的初始容量=4,但是在初始化底层数组的时候,分配的大小=5.从上面的图中可以看出,上一个数组为了指向下一个数组,因此数组在设计的时候就必须要多出来一个元素,用于指向下一个数组.

当提交元素的时候,代码如下所示,调用offer方法

代码语言:javascript
复制
MpscUnboundedArrayQueue<Integer> queue = new MpscUnboundedArrayQueue<>(4);
queue.offer(1);
代码语言:javascript
复制
public boolean offer(final E e) {
    long mask;
    E[] buffer;
    long pIndex;

    while (true) {
        long producerLimit = lvProducerLimit();
        pIndex = lvProducerIndex();
        // 表示正在扩容
        if ((pIndex & 1) == 1) {
            continue;
        }
        mask = this.producerMask;
        buffer = this.producerBuffer;
        // 如果提交的元素即将超过容量
        if (producerLimit <= pIndex) {
            int result = offerSlowPath(mask, pIndex, producerLimit);
            switch (result) {
                case CONTINUE_TO_P_INDEX_CAS:
                    break;
                case RETRY:
                    continue;
                case QUEUE_FULL:
                    return false;
                case QUEUE_RESIZE:
                    // 扩容
                    resize(mask, buffer, pIndex, e, null);
                    return true;
            }
        }

        // +2
        if (casProducerIndex(pIndex, pIndex + 2)) {
            break;
        }
    }
    // 计算元素在数组中的偏移地址
    final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
    soRefElement(buffer, offset, e);
    return true;
}

首先要明确一点的是,producerIndex(即代码中的pIndex)记录生产者添加元素指向的位置,而且这个位置并不是在数组中的实际下标.

每添加一个元素,producerIndex就会+2.并不是+1.

通过构造方法初始化时,producerLimit=110.

当添加第一个元素之后,pIndex=010

当添加第二个元素之后,pIndex=100

当添加第三个元素之后,pIndex=110

根据上面第16行代码producerLimit <= pIndex满足条件.进入下面的代码

代码语言:javascript
复制
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
    final long cIndex = lvConsumerIndex();
    long bufferCapacity = getCurrentBufferCapacity(mask);

    if (cIndex + bufferCapacity > pIndex) {
        if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
            return RETRY;
        } else {
            return CONTINUE_TO_P_INDEX_CAS;
        }
    } else if (availableInQueue(pIndex, cIndex) <= 0) {
        return QUEUE_FULL;

    }
    // pIndex + 1
    else if (casProducerIndex(pIndex, pIndex + 1)) {
        // 需要扩容
        return QUEUE_RESIZE;
    } else {
        return RETRY;
    }
}

能走到上面的代码,说明此时容器马上满了,需要扩容了,会将pIndex+1.之后就会进入到扩容逻辑.

代码语言:javascript
复制
resize(mask, buffer, pIndex, e, null);

之前的pIndex=110,加1之后,变成pIndex=111.这个时候,其他生产者线程根据(pIndex & 1) == 1判断成立,说明有一个生产者线程正在扩容容器,当前生产者线程需要重试.

也就是说根据最后一个字节,控制是否有生产者线程正在扩容.

代码语言:javascript
复制
public boolean offer(final E e) {
    long mask;
    E[] buffer;
    long pIndex;

    while (true) {
        long producerLimit = lvProducerLimit();
        pIndex = lvProducerIndex();
        // 表示有其他线程正在扩容
        if ((pIndex & 1) == 1) {
            // 重试
            continue;
        }
        ...
    }
    ...
}

扩容的线程会重新创建一个新的数组

代码语言:javascript
复制
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
    int newBufferLength = getNextBufferSize(oldBuffer);
    final E[] newBuffer;
    try {
        // 创建新数组
        newBuffer = allocateRefArray(newBufferLength);
    } catch (OutOfMemoryError oom) {
        assert lvProducerIndex() == pIndex + 1;
        soProducerIndex(pIndex);
        throw oom;
    }

    producerBuffer = newBuffer;
    final int newMask = (newBufferLength - 2) << 1;
    producerMask = newMask;

    final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
    final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

    soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
    // 新老数组进行连接
    soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);

    final long cIndex = lvConsumerIndex();
    final long availableInQueue = availableInQueue(pIndex, cIndex);

    soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

    // +2之后,保证其他生产者线程可以继续增加元素了
    soProducerIndex(pIndex + 2);

    // 添加一个JUMP元素
    soRefElement(oldBuffer, offsetInOld, JUMP);
}

在扩容的时候,会添加一个JUMP元素,这个元素是用来告诉消费者,当消费到这类元素的时候,需要跳到下一个数组继续消费.

假设向容器中依次添加1-9这9个元素,它的结构如下

消费者也会按照1-9进行消费.(即添加顺序和消费顺序一致)

在向容器中添加元素的时候,采用如下方式. 根据起始地址+偏移地址,提高添加元素的速度.

代码语言:javascript
复制
static long modifiedCalcCircularRefElementOffset(long index, long mask) {
    return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1));
}


public static final long REF_ARRAY_BASE;
public static final int REF_ELEMENT_SHIFT;

static {
    // 数组中一个元素占用的大小
    final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
    if (4 == scale) {
        REF_ELEMENT_SHIFT = 2;
    } else if (8 == scale) {
        REF_ELEMENT_SHIFT = 3;
    } else {
        throw new IllegalStateException("Unknown pointer size: " + scale);
    }
    // 数组中第一个元素的偏移地址
    REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class);
}

此篇简单介绍下Netty中如何使用JCTools中的类在并发场景下提交元素,以及它的底层数据结构. 这种是与传统直接创建一个2倍的数组处理方式不同的.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netty历险记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档