前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MPSCArrayQueue源码分析

MPSCArrayQueue源码分析

作者头像
山行AI
发布2019-10-14 16:06:09
1.2K0
发布2019-10-14 16:06:09
举报
文章被收录于专栏:山行AI山行AI

JCTools是服务虚拟机并发开发的工具,提供一些JDK没有的并发数据结构辅助开发。

聚合四种 SPSC/MPSC/SPMC/MPMC 数据变量的并发队列:

  • SPSC:单个生产者对单个消费者(无等待、有界和无界都有实现)
  • MPSC:多个生产者对单个消费者(无锁、有界和无界都有实现)
  • SPMC:单生产者对多个消费者(无锁 有界)
  • MPMC:多生产者对多个消费者(无锁、有界)

这里简要根据https://gitee.com/eric_ds/baseutil中的MPSCArrayQueue版本源码进行分析来理解这个高性能的的无锁队列的实现原理。

MPSCArrayQueue源码

代码语言:javascript
复制
package com.jfireframework.baseutil.concurrent;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import com.jfireframework.baseutil.reflect.UNSAFE;
abstract class PadFor64Bit
{
    // 64长度的缓存行,要进行填充,需要8个byte。
    long p1, p2, p3, p4, p5, p6, p7;

    public static long noHuop(PadFor64Bit instance)
    {
        return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
    }
}

abstract class ProducerIndex extends PadFor64Bit
{
    volatile long             producerIndex;
    private static final long OFFSET = UNSAFE.getFieldOffset("producerIndex", ProducerIndex.class);

    boolean casProducerIndex(long index)
    {
        //对producerIndex加1,注意一点,和disruptor不同的是,producerIndex的值是一直增加的
        return UNSAFE.compareAndSwapLong(this, OFFSET, index, index + 1);
    }
}

abstract class Pad2 extends ProducerIndex
{
    public long p1, p2, p3, p4, p5, p6, p7;

    public static long noHuop(Pad2 instance)
    {
        return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
    }
}

abstract class ComsumerIndex extends Pad2
{
    volatile long             consumerIndex;
    private static final long OFFSET = UNSAFE.getFieldOffset("consumerIndex", ComsumerIndex.class);

    void orderedSetComsumerIndex(long index)
    {
        UNSAFE.putOrderedLong(this, OFFSET, index);
    }
}

abstract class Pad3 extends ComsumerIndex
{
    long p1, p2, p3, p4, p5, p6, p7;//缓存行填充

    public static long noHuop(Pad3 instance)
    {
        return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
    }
}

abstract class ProducerIndexLimit extends Pad3
{
    volatile long             producerIndexLimit = 0;
    private static final long OFFSET             = UNSAFE.getFieldOffset("producerIndexLimit", ProducerIndexLimit.class);

    void orderedSetProducerIndexLimit(long limit)
    {
        UNSAFE.putOrderedLong(this, OFFSET, limit);
    }
}

abstract class Pad4 extends ProducerIndexLimit
{
    long             p1, p2, p3, p4, p5, p6, p7;

    static final int availableBufferOffset = UNSAFE.arrayBaseOffset(new int[0].getClass());
    static final int bufferOffset          = UNSAFE.arrayBaseOffset(Object[].class);
    static final int availableBufferScaleShift;
    static final int bufferScaleShift;

    static
    {
        int availableBufferScale = UNSAFE.arrayIndexScale(new int[0].getClass());
        if (availableBufferScale == 4)
        {
            availableBufferScaleShift = 2;
        }
        else if (availableBufferScale == 8)
        {
            availableBufferScaleShift = 3;
        }
        else
        {
            throw new IllegalArgumentException();
        }
        int bufferScale = UNSAFE.arrayIndexScale(Object[].class);
        if (bufferScale == 4)
        {
            bufferScaleShift = 2;
        }
        else if (bufferScale == 8)
        {
            bufferScaleShift = 3;
        }
        else
        {
            throw new IllegalArgumentException();
        }
    }

    public static long noHuop(Pad4 instance)
    {
        return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
    }
}

public class MPSCArrayQueue<E> extends Pad4 implements Queue<E>
{

    public MPSCArrayQueue(int capacity)
    {
        int size = 1;
        int indexShift = 0;
        //size的取值为2次方的值,目的是为了下面的index&mask的取值结果是数组索引下标,最终为大于或等于capacity的2次方幂
        while (size < capacity && size > 0)
        {
            size <<= 1;
            //2次幂的数量 2 2^2 2^3  以2为底,等比为2的等比数列的数量
            indexShift++;
        }
        if (size > 0)
        {
            this.indexShift = indexShift;
            mask = size - 1;
            //存放数据的buffer
            buffer = new Object[size];
            //可用的buffer
            availableBuffers = new int[size];
            //给数组中的每个元素赋一个初始值为-1
            Arrays.fill(availableBuffers, -1);
        }
        else
        {
            throw new IllegalArgumentException("capacity 无法计算得到其最小的2次方幂");
        }
    }

    protected final Object[] buffer;
    protected final int      mask;
    protected final int[]    availableBuffers;
    protected final int      indexShift;

    boolean isAvailable(long address, int flag, int[] availableBuffers)
    {
        return UNSAFE.getIntVolatile(availableBuffers, address) == flag;
    }

    void setAvailable(long index)
    {
        //mask = size -1
        //index < size -1 的
        // size - 1为2的n次方幂
        // n 其实就是indexShift
        //>>表示右移,如果该数为正,则高位补0,若为负数,则高位补1;
        //>>>表示无符号右移,也叫逻辑右移,即若该数为正,则高位补0,而若该数为负数,则右移后高位同样补0
        //那么index >>> indexShift表示将index向右移indexShift位
        //看一下极端情况当size - 1 = 2 ^ 3时,indexShift的值为3,这时如果index=size-1即8二进制为1000时向右移3位时值为1 所有小于8的值进行这种移位得到的结果都是0
        //这也就说明,当flag为1时表示已经满了
        int flag = (int) (index >>> indexShift);
        long address = ((index & mask) << availableBufferScaleShift) + availableBufferOffset;
        UNSAFE.putOrderedInt(availableBuffers, address, flag);
    }

    /**
     * 获取下一个可以使用的生产者下标
     *
     * @return
     */
    long nextProducerIndex()
    {
        long pIndex = producerIndex;
        long pLimit = producerIndexLimit;
        if (pIndex < pLimit)
        {
            if (casProducerIndex(pIndex))
            {
                return pIndex;
            }
        }
        do
        {
            pIndex = producerIndex;
            if (pIndex < pLimit)
            {
                if (casProducerIndex(pIndex))
                {
                    return pIndex;
                }
            }
            else
            {
                //mask的值是size-1
                //consumerIndex表示消费者的索引位置
                //进入这个判断分支就代表pIndex>=pLimit
                //consumerIndex的值为0时,pLimit的值为size(因为mask的值为size-1)
                pLimit = producerIndexLimit = consumerIndex + mask + 1;
                if (pIndex >= producerIndexLimit)
                {
                    // 队列已满
                    return -1;
                }
                else
                {
                    if (casProducerIndex(pIndex))
                    {
                        return pIndex;
                    }
                }
            }
        } while (true);
    }

    Object get(long index)
    {
        long address = ((index & mask) << bufferScaleShift) + bufferOffset;
        return UNSAFE.getObject(buffer, address);
    }

    void set(Object value, long index)
    {
        //计算数组下标
        //index&mask是计算数组的下标
        //bufferOffset=UNSAFE.arrayBaseOffset(Object[].class) 返回Object数组基地址
        // int bufferScale = UNSAFE.arrayIndexScale(Object[].class);
        //        if (bufferScale == 4)
        //        {
        //            bufferScaleShift = 2;
        //        }
        //        else if (bufferScale == 8)
        //        {
        //            bufferScaleShift = 3;
        //        }
        //bufferScale代表的是数组中每个元素的占用的大小
        //<< 位移运算符表示左移,左移2位相当于乘4
        //当bufferScale大小为8时表示一个元素占用的大小为8,那么第一个位置是(0 <<3)+bufferOffset结果为bufferOffset;
        //第二个元素1<<3 + bufferOffset,相当于1*8 + bufferOffset
        //第三个元素2<<3 + bufferOffset相当于2*8 + bufferOffset


        //关于index & mask   当mask为2^2 - 1时,index从一到10对mask取&的值为
//        0
//        1
//        2
//        3
//        0
//        1
//        2
//        3
//        0
//        1
//可见,虽然producerIndex的值和consumerIndex的值是一直增加的,但是对buffer的索引address是没有影响的
//唯一有约束的地方是nextProducerIndex方法中当pIndex >= producerIndexLimit时会停止生产,也就是说消费赶不上生产时会有停止生产的情况出现
        long address = ((index & mask) << bufferScaleShift) + bufferOffset;
        UNSAFE.putObject(buffer, address, value);
    }

    Object getAndSetNull(long index)
    {
        long address = ((index & mask) << bufferScaleShift) + bufferOffset;
        Object result = UNSAFE.getObject(buffer, address);
        UNSAFE.putObject(buffer, address, null);
        return result;
    }

    void waitUnitlAvailable(long index)
    {
        int flag = (int) (index >>> indexShift);
        long address = ((index & mask) << availableBufferScaleShift) + availableBufferOffset;
        int[] availableBuffers = this.availableBuffers;
        if (isAvailable(address, flag, availableBuffers) == false)
        {
            while (isAvailable(address, flag, availableBuffers) == false)
            {
                Thread.yield();
            }
        }
    }

    @Override
    public int size()
    {
        long consumerIndex = this.consumerIndex;
        long producerIndex = this.producerIndex;
        return (int) (producerIndex - consumerIndex);
    }

  .............

    @Override
    public void clear()
    {
        long pIndex = producerIndex;
        long cIndex = this.consumerIndex;
        if (pIndex == cIndex)
        {
            return;
        }
        for (long index = cIndex; index < pIndex; index++)
        {
            waitUnitlAvailable(index);
            set(null, index);
        }
        this.consumerIndex = pIndex;
    }

    @Override
    public boolean add(E e)
    {
        return offer(e);
    }

    @Override
    public boolean offer(E e)
    {
        long index = nextProducerIndex();
        if (index == -1)
        {
            return false;
        }
        set(e, index);
        setAvailable(index);
        return true;
    }

    @Override
    public E remove()
    {
        return poll();
    }

    @SuppressWarnings("unchecked")
    @Override
    public E poll()
    {
        //以与生产者相同的方式来获取flag,如果消费者与生产者用相同的算法得到的flag相同的话,则表示生产者已经生产到这个地方,可以进行消费。
        long cIndex = this.consumerIndex;
        int flag = (int) (cIndex >>> indexShift);
        long address = ((cIndex & mask) << availableBufferScaleShift) + availableBufferOffset;
        final int[] availableBuffers = this.availableBuffers;
        if (isAvailable(address, flag, availableBuffers) == false)
        {
            if (cIndex == producerIndex)
            {
                return null;
            }
            while (isAvailable(address, flag, availableBuffers) == false)
            {
                // assert cIndex < consumerLimit;
                Thread.yield();
            }
        }
        //消费之后,将对应位置上的值置为null
        E e = (E) getAndSetNull(cIndex);
        //移动消费者的指针位置,这种方式在当consumerIndex很大时将获取不到数据
        //注意一点,和disruptor不同的是,consumerIndex的值是一直增加的
        orderedSetComsumerIndex(cIndex + 1);
        return e;
    }

    @Override
    public E element()
    {
        return peek();
    }

    @SuppressWarnings("unchecked")
    @Override
    public E peek()
    {
        long pIndex = producerIndex;
        long consumerIndex = this.consumerIndex;
        if (pIndex == consumerIndex)
        {
            return null;
        }
        //这里的原理与上面一样
        int flag = (int) (consumerIndex >>> indexShift);
        long address = ((consumerIndex & mask) << availableBufferScaleShift) + availableBufferOffset;
        int[] availableBuffers = this.availableBuffers;
        if (isAvailable(address, flag, availableBuffers) == false)
        {
            while (isAvailable(address, flag, availableBuffers) == false)
            {
                Thread.yield();
            }
        }
        E e = (E) get(consumerIndex);
        return e;
    }

}

这里主要的核心有以下几点:

  • unsafe的操作,通遍都是通过unsafe的cas来进行的,有很多while(cas())进行重试的部分,这块就不再多讲了;
  • size的取值,是最小的以2为首项,公比为2的等比数列最小的大于等于capacity的那个值;
  • mask 的取值为size -1 ,这样做的好处是通过index & mask能取到数组中对应的索引;
  • consumerIndex与producerIndex是一直增加的;
  • 当消费者速度不及生产者时,会停止生产,具体的可以看下上面代码中的注释。
  • index >>> indexShift中的indexShift的取值为2到size之间的等比数列的项数; index分别是指consumerIndex和producerIndex,这个index >>> indexShift的作用是用于 判断生产者和消费者的速度,在消费时位置是否可用。

参考:

  • Jctools高性能无锁队列源码分析 https://blog.csdn.net/TheLudlows/article/details/90646236
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MPSCArrayQueue源码
  • 参考:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档