前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高并发数据结构Disruptor解析(2)

高并发数据结构Disruptor解析(2)

作者头像
干货满满张哈希
发布2021-04-12 16:09:55
5550
发布2021-04-12 16:09:55
举报

Sequence(续)

之前说了Sequence通过给他的核心值value添加前置无用的padding long还有后置无用的padding long来避免对于value操作的false sharing的发生。那么对于这个value的操作是怎么操作的呢? 这里我们需要先了解下Unsafe类这个东西,可以参考我的另一篇文章 - Java Unsafe 类。 Unsafe中有一些底层为C++的方法,对于Sequence,其中做了: 获取Unsafe,通过Unsafe获取Sequence中的value的地址,根据这个地址CAS更新。 com.lmax.disruptor.Sequence.java

代码语言:javascript
复制
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * 默认初始value为-1
     */
    public Sequence()
    {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    public long get()
    {
        return value;
    }

    /**
     * 利用Unsafe更新value的地址内存上的值从而更新value的值
     */
    public void set(final long value)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /**
     * 利用Unsafe原子更新value
     */
    public void setVolatile(final long value)
    {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    /**
     * 利用Unsafe CAS
     */
    public boolean compareAndSet(final long expectedValue, final long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }


    public long incrementAndGet()
    {
        return addAndGet(1L);
    }

    public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;

        do
        {
            currentValue = get();
            newValue = currentValue + increment;
        }
        while (!compareAndSet(currentValue, newValue));

        return newValue;
    }

    @Override
    public String toString()
    {
        return Long.toString(get());
    }
}

Producer

SingleProducerSequencer

接下来我们先从Producer看起。Disruptor分为单生产者和多生产者,先来关注下单生产者的核心类SingleProducerSequencer,类结构如下:

这里写图片描述
这里写图片描述

针对这些接口做一下简单的描述: Cursored接口:实现此接口的类,可以理解为,记录某个sequence的类。例如,生产者在生产消息时,需要知道当前ringBuffer下一个生产的位置,这个位置需要更新,每次更新,需要访问getCursor来定位。 Sequenced接口:实现此接口类,可以理解为,实现一个有序的存储结构,也就是RingBuffer的一个特性。一个Producer,在生产Event时,先获取下一位置的Sequence,之后填充Event,填充好后再publish,这之后,这个Event就可以被消费处理了

  • getBufferSize获取ringBuffer的大小
  • hasAvailableCapacity判断空间是否足够
  • remainingCapacity获取ringBuffer的剩余空间
  • next申请下一个或者n个sequence(value)作为生产event的位置
  • tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException
  • publish发布Event

Sequencer接口:**Sequencer接口,扩展了Cursored和Sequenced接口。在前两者的基础上,增加了消费与生产相关的方法。其中一个比较重要的设计是关于**GatingSequence的设计: 之后我们会提到,RingBuffer的头由一个名字为Cursor的Sequence对象维护,用来协调生产者向RingBuffer中填充数据。表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的话,队列尾的维护就是无锁的。但是,在生产者方确定RingBuffer是否已满就需要跟踪更多信息。为此,GatingSequence用来跟踪相关Sequence

  • INITIAL_CURSOR_VALUE: -1 为 sequence的起始值
  • claim: 申请一个特殊的Sequence,只有设定特殊起始值的ringBuffer时才会使用(一般是多个生产者时才会使用)
  • isAvailable:非阻塞,验证一个sequence是否已经被published并且可以消费
  • addGatingSequences:将这些sequence加入到需要跟踪处理的gatingSequences中
  • removeGatingSequence:移除某个sequence
  • newBarrier:给定一串需要跟踪的sequence,创建SequenceBarrier。SequenceBarrier是用来给多消费者确定消费位置是否可以消费用的
  • getMinimumSequence:获取这个ringBuffer的gatingSequences中最小的一个sequence
  • getHighestPublishedSequence:获取最高可以读取的Sequence
  • newPoller:目前没用,不讲EventPoller相关的内容(没有用到)

之后,抽象类AbstractSequencer实现Sequencer这个接口:定义了5个域:

代码语言:javascript
复制
    private static final AtomicReferenceFieldUpdater SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    protected volatile Sequence[] gatingSequences = new Sequence[0];
  • SEQUENCE_UPDATER 是用来原子更新gatingSequences 的工具类
  • bufferSize记录生产目标RingBuffer的大小
  • waitStrategy表示这个生产者的等待策略(之后会讲)
  • cursor:生产定位,初始为-1
  • gatingSequences :前文已讲

构造方法增加了一些对于这个类的限制:

代码语言:javascript
复制
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
}

bufferSize不能小于1并且bufferSize必须是2的n次方。原因我的第一篇文章已经讲述。 对于getCursor和getBufferSize的实现,这里仅仅是简单的getter:

代码语言:javascript
复制
@Override
public final long getCursor()
{
    return cursor.get();
}

@Override
public final int getBufferSize()
{
    return bufferSize;
}

对于addGatingSequences和removeGatingSequence,则是原子更新:

代码语言:javascript
复制
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

public boolean removeGatingSequence(Sequence sequence)
{
    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}

原子更新工具类静态方法代码:

代码语言:javascript
复制
/**
 * 原子添加sequences
 * 
 * @param holder 原子更新的域所属的类对象
 * @param updater 原子更新的域对象
 * @param cursor 定位
 * @param sequencesToAdd 要添加的sequences
 * @param 
 */
static  void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
{
    long cursorSequence;
    Sequence[] updatedSequences;
    Sequence[] currentSequences;
    //在更新成功之前,一直重新读取currentSequences,扩充为添加所有sequence之后的updatedSequences
    do
    {
        currentSequences = updater.get(holder);
        updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
        cursorSequence = cursor.getCursor();

        int index = currentSequences.length;
        //将新的sequences的值设置为cursorSequence
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
            updatedSequences[index++] = sequence;
        }
    }
    while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

    cursorSequence = cursor.getCursor();
    for (Sequence sequence : sequencesToAdd)
    {
        sequence.set(cursorSequence);
    }
}

/**
 * 原子移除某个指定的sequence
 * 
 * @param holder 原子更新的域所属的类对象
 * @param sequenceUpdater 原子更新的域对象
 * @param sequence 要移除的sequence
 * @param 
 * @return
 */
static  boolean removeSequence(
        final T holder,
        final AtomicReferenceFieldUpdater sequenceUpdater,
        final Sequence sequence)
{
    int numToRemove;
    Sequence[] oldSequences;
    Sequence[] newSequences;

    do
    {
        oldSequences = sequenceUpdater.get(holder);

        numToRemove = countMatching(oldSequences, sequence);

        if (0 == numToRemove)
        {
            break;
        }

        final int oldSize = oldSequences.length;
        newSequences = new Sequence[oldSize - numToRemove];

        for (int i = 0, pos = 0; i < oldSize; i++)
        {
            final Sequence testSequence = oldSequences[i];
            if (sequence != testSequence)
            {
                newSequences[pos++] = testSequence;
            }
        }
    }
    while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));

    return numToRemove != 0;
}

private static  int countMatching(T[] values, final T toMatch)
{
    int numToRemove = 0;
    for (T value : values)
    {
        if (value == toMatch) // Specifically uses identity
        {
            numToRemove++;
        }
    }
    return numToRemove;
}

对于newBarrier,返回的是一个ProcessingSequenceBarrier: SequenceBarrier我们之后会详讲,这里我们可以理解为用来协调消费者消费的对象。例如消费者A依赖于消费者B,就是消费者A一定要后于消费者B消费,也就是A只能消费B消费过的,也就是A的sequence一定要小于B的。这个Sequence的协调,通过A和B设置在同一个SequenceBarrier上实现。同时,我们还要保证所有的消费者只能消费被Publish过的。这里我们先不深入

代码语言:javascript
复制
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}

之后到了我们这次的核心,SingleProducerSequencer,观察它的结构,他依然利用了long冗余避免CPU的false sharing,这次的field不只有一个,而是有两个,所以,前后放上7个long类型,这样在最坏的情况下也能避免false sharing(参考我的第一篇文章) 这两个field是:

代码语言:javascript
复制
protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;

初始值都为-1,这里强调下,由于这个类并没有实现任何的Barrier,所以在Disruptor框架中,这个类并不是线程安全的。不过由于从命名上看,就是单一生产者,所以在使用的时候也不会用多线程去调用里面的方法。 之后就是对AbstractSequencer抽象方法的实现: hasAvailableCapacity判断空间是否足够:

代码语言:javascript
复制
@Override
public boolean hasAvailableCapacity(int requiredCapacity) {
    //下一个生产Sequence位置
    long nextValue = this.nextValue;
    //下一位置加上所需容量减去整个bufferSize,如果为正数,那证明至少转了一圈,则需要检查gatingSequences(由消费者更新里面的Sequence值)以保证不覆盖还未被消费的
    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    //Disruptor经常用缓存,这里缓存之间所有gatingSequences最小的那个,这样不用每次都遍历一遍gatingSequences,影响效率
    long cachedGatingSequence = this.cachedValue;
    //只要wrapPoint大于缓存的所有gatingSequences最小的那个,就重新检查更新缓存
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        this.cachedValue = minSequence;
        //空间不足返回false
        if (wrapPoint > minSequence)
        {
            return false;
        }
    }
    //若wrapPoint小于缓存的所有gatingSequences最小的那个,证明可以放心生产
    return true;
}

对于next方法:申请下一个或者n个sequence(value)作为生产event的位置

代码语言:javascript
复制
@Override
public long next() {
    return next(1);
}

@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;
    //next方法和之前的hasAvailableCapacity同理,只不过这里是相当于阻塞的
    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        //只要wrapPoint大于最小的gatingSequences,那么不断唤醒消费者去消费,并利用LockSupport让出CPU,直到wrapPoint不大于最小的gatingSequences
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        //同理,缓存最小的gatingSequences
        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}

tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException。而这里的容量检查,就是通过之前的hasAvailableCapacity方法检查:

代码语言:javascript
复制
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }

        if (!hasAvailableCapacity(n)) {
            throw InsufficientCapacityException.INSTANCE;
        }

        long nextSequence = this.nextValue += n;

        return nextSequence;
    }

publish发布Event:

代码语言:javascript
复制
    @Override
    public void publish(long sequence) {
        //cursor代表可以消费的sequence
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void publish(long lo, long hi) {
        publish(hi);
    }

其他:

代码语言:javascript
复制
    @Override
    public void claim(long sequence) {
        nextValue = sequence;
    }

    @Override
    public boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    @Override
    public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
        return availableSequence;
    }

下面,我们针对SingleProducerSequencer画一个简单的工作流程: 假设有如下RingBuffer和SingleProducerSequencer,以及对应的消费者辅助类SequenceBarrier,这里不画消费者,假设有不断通过SequenceBarrier消费的消费者。SingleProducerSequencer的gatingSequences数组内保存这一个指向某个Sequence的引用,同时这个Sequence也会被SequenceBarrier更新以表示消费者消费到哪里了。这里生产的Sequence还有消费的Sequence都是从零开始不断增长的,即使大于BufferSize,也可以通过sequence的值对BufferSize取模定位到RingBuffer上。

这里写图片描述
这里写图片描述

假设SingleProducerSequencer这时生产两个Event,要放入RingBuffer。则假设先调用hasAvailableCapacity(2)判断下。代码流程是: wrapPoint = (nextValue + requiredCapacity) - bufferSize = (-1 + 2) - 4 = -3 -3 < cachedValue所以不用检查gateSequences直接返回true。假设返回true,就开始填充,之后调用publish更新cursor,这样消费者调用isAvailable根据Cursor就可以判断,sequence:0和sequence:1可以消费了。

这里写图片描述
这里写图片描述

假设这之后,消费者消费了一个Event,更新Sequence为0.

这里写图片描述
这里写图片描述

之后,生产者要生产四个Event,调用hasAvailableCapacity(4)检查。代码流程是: wrapPoint = (nextValue + requiredCapacity) - bufferSize = (1 + 4) - 4 = 1 1 > cachedValue所以要重新检查,这是最小的Sequence是0,但是1 > 仍然大于最小的Sequence,所以更新cachedValue,返回false。

这里写图片描述
这里写图片描述

至此,展示了一个简单的生产过程,SingleProducerSequencer也就讲完啦。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-07-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Sequence(续)
  • Producer
    • SingleProducerSequencer
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档