前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高并发数据结构Disruptor解析(4)

高并发数据结构Disruptor解析(4)

作者头像
干货满满张哈希
发布2021-04-12 16:21:08
6510
发布2021-04-12 16:21:08
举报

RingBuffer

RingBuffer类是Disruptor核心的数据结构类。它是一个环状的Buffer,上面的槽(slot)可以保存一个个Event。下面是Disruptor中RingBuffer类继承关系:

这里写图片描述
这里写图片描述

除了实现之前提到过的Sequenced和Cursored接口,这里还涉及到了DataProvider这个接口。

代码语言:javascript
复制
public interface DataProvider
{
    T get(long sequence);
}

它只有一个方法get,这个方法就是获取某个sequence对应的对象,对象类型在这里是抽象的(T)。这个方法对于RingBuffer会在两个地方调用,第一个是在生产时,这个Event对象需要被生产者获取往里面填充数据。第二个是在消费时,获取这个Event对象用于消费。 EventSequencer接口没有自己的方法,只是为了将Sequencer和DataProvider合起来。 EventSink代表RingBuffer是一个以Event槽为基础的数据结构。同时实现EventSequencer和EventSink代表RingBuffer是一个以Event槽为基础元素保存的数据结构。 EventSink接口的主要方法都是发布Event,发布一个Event的流程是:申请下一个Sequence->申请成功则获取对应槽的Event->初始化并填充对应槽的Event->发布Event。这里,初始化,填充Event是通过实现EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,EventTranslatorThreeArg,EventTranslatorVararg这些EventTranslator来做的。我们看下EventTranslator,EventTranslatorOneArg和EventTranslatorVararg的源码:

代码语言:javascript
复制
public interface EventTranslator<T>
{
    /**
     * Translate a data representation into fields set in given event
     *
     * @param event    into which the data should be translated.
     * @param sequence that is assigned to event.
     */
    void translateTo(final T event, long sequence);
}

public interface EventTranslatorOneArg<T, A>
{
    /**
     * Translate a data representation into fields set in given event
     *
     * @param event    into which the data should be translated.
     * @param sequence that is assigned to event.
     * @param arg0     The first user specified argument to the translator
     */
    void translateTo(final T event, long sequence, final A arg0);
}

public interface EventTranslatorVararg<T>
{
    /**
     * Translate a data representation into fields set in given event
     *
     * @param event    into which the data should be translated.
     * @param sequence that is assigned to event.
     * @param args     The array of user arguments.
     */
    void translateTo(final T event, long sequence, final Object... args);
}

他们由生产者用户实现,将Event初始化并填充。在发布一条Event的时候,这些Translator的translate方法会被调用。在translate方法初始化并填充Event。对于EventTranslator,translate方法只接受Event和Sequence作为参数,对于其他的,都还会接受一个或多个参数用来初始化并填充Event。 EventSink接口是用来发布Event的,在发布的同时,调用绑定的Translator来初始化并填充Event。EventSink接口的大部分方法接受不同的Translator来处理Event:

代码语言:javascript
复制
public interface EventSink<E> {
    /**
     * 申请下一个Sequence->申请成功则获取对应槽的Event->利用translator初始化并填充对应槽的Event->发布Event
     * @param translator translator用户实现,用于初始化Event,这里是不带参数Translator
     */
     void publishEvent(EventTranslator translator);

    /**
     * 尝试申请下一个Sequence->申请成功则获取对应槽的Event->利用translator初始化并填充对应槽的Event->发布Event
     * 若空间不足,则立即失败返回
     * @param translator translator用户实现,用于初始化Event,这里是不带参数Translator
     * @return 成功true,失败false
     */
     boolean tryPublishEvent(EventTranslator translator);

      void publishEvent(EventTranslatorOneArg translator, A arg0);

      boolean tryPublishEvent(EventTranslatorOneArg translator, A arg0);

      void publishEvent(EventTranslatorTwoArg translator, A arg0, B arg1);

      boolean tryPublishEvent(EventTranslatorTwoArg translator, A arg0, B arg1);

      void publishEvent(EventTranslatorThreeArg translator, A arg0, B arg1, C arg2);

      boolean tryPublishEvent(EventTranslatorThreeArg translator, A arg0, B arg1, C arg2);

     void publishEvent(EventTranslatorVararg translator, Object... args);

     boolean tryPublishEvent(EventTranslatorVararg translator, Object... args);

    /**
     * 包括申请多个Sequence->申请成功则获取对应槽的Event->利用每个translator初始化并填充每个对应槽的Event->发布Event
     * @param translators
     */
     void publishEvents(EventTranslator[] translators);

     void publishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize);

     boolean tryPublishEvents(EventTranslator[] translators);

     boolean tryPublishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize);

      void publishEvents(EventTranslatorOneArg translator, A[] arg0);

      void publishEvents(EventTranslatorOneArg translator, int batchStartsAt, int batchSize, A[] arg0);

      boolean tryPublishEvents(EventTranslatorOneArg translator, A[] arg0);

      boolean tryPublishEvents(EventTranslatorOneArg translator, int batchStartsAt, int batchSize, A[] arg0);

      void publishEvents(EventTranslatorTwoArg translator, A[] arg0, B[] arg1);

      void publishEvents(
            EventTranslatorTwoArg translator, int batchStartsAt, int batchSize, A[] arg0,
            B[] arg1);

      boolean tryPublishEvents(EventTranslatorTwoArg translator, A[] arg0, B[] arg1);

      boolean tryPublishEvents(
            EventTranslatorTwoArg translator, int batchStartsAt, int batchSize,
            A[] arg0, B[] arg1);

      void publishEvents(EventTranslatorThreeArg translator, A[] arg0, B[] arg1, C[] arg2);

      void publishEvents(
            EventTranslatorThreeArg translator, int batchStartsAt, int batchSize,
            A[] arg0, B[] arg1, C[] arg2);

      boolean tryPublishEvents(EventTranslatorThreeArg translator, A[] arg0, B[] arg1, C[] arg2);

      boolean tryPublishEvents(
            EventTranslatorThreeArg translator, int batchStartsAt,
            int batchSize, A[] arg0, B[] arg1, C[] arg2);

     void publishEvents(EventTranslatorVararg translator, Object[]... args);

     void publishEvents(EventTranslatorVararg translator, int batchStartsAt, int batchSize, Object[]... args);

     boolean tryPublishEvents(EventTranslatorVararg translator, Object[]... args);

     boolean tryPublishEvents(EventTranslatorVararg translator, int batchStartsAt, int batchSize, Object[]... args);
}

接下来到我们的主要环节,RingBuffer类。与之前相似,RingBuffer也是做了缓冲行填充。 RingBuffer类中保存了整个RingBuffer每个槽(entry或者slot)的Event对象,对应的field是private final Object[] entries;,这些对象只在RingBuffer初始化时被建立,之后就是修改这些对象(初始化Event和填充Event),并不会重新建立新的对象。RingBuffer可以有多生产者和消费者,所以这个entries会被多线程访问频繁的,但不会修改(因为不会重新建立新的对象,这个数组保存的是对对象的具体引用,所以不会变)。但是我们要避免他们和被修改的对象读取到同一个缓存行,避免缓存行失效重新读取。 我们看源代码:

代码语言:javascript
复制
abstract class RingBufferPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields extends RingBufferPad
{
    //Buffer数组填充
    private static final int BUFFER_PAD;
    //Buffer数组起始基址
    private static final long REF_ARRAY_BASE;
    //2^n=每个数组对象引用所占空间,这个n就是REF_ELEMENT_SHIFT
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();

    static
    {
        //Object数组引用长度,32位为4字节,64位为8字节
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
        else
        {
            throw new IllegalStateException("Unknown pointer size");
        }
        //需要填充128字节,缓存行长度一般是128字节
        BUFFER_PAD = 128 / scale;
        // Including the buffer pad in the array base offset
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }

    private final long indexMask;
    private final Object[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;

    RingBufferFields(
        EventFactory eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        //保证buffer大小不小于1
        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //保证buffer大小为2的n次方
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        //m % 2^n  <=>  m & (2^n - 1)
        this.indexMask = bufferSize - 1;
        /**
         * 结构:缓存行填充,避免频繁访问的任一entry与另一被修改的无关变量写入同一缓存行
         * --------------
         * *   数组头   * BASE
         * *   Padding  * 128字节
         * * reference1 * SCALE
         * * reference2 * SCALE
         * * reference3 * SCALE
         * ..........
         * *   Padding  * 128字节
         * --------------
         */
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //利用eventFactory初始化RingBuffer的每个槽
        fill(eventFactory);
    }

    private void fill(EventFactory eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence)
    {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}

注释中提到对于entries数组的缓存行填充,申请的数组大小为实际需要大小加上2 * BUFFER_PAD,所占空间就是2*128字节。由于数组中的元素经常访问,所以将数组中的实际元素两边各加上128字节的padding防止false sharing。 所以,初始化RingBuffer内所有对象时,从下标BUFFER_PAD开始,到BUFFER_PAD+bufferSize-1为止。取出某一sequence的对象,也是BUFFER_PAD开始算0,这里的:return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); 代表取出entries对象,地址为REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)的对象。这里是个对象引用,地址是以REF_ARRAY_BASE 为基址(数组基址+数组头+引用偏移),每个引用占用2^REF_ELEMENT_SHIFT个字节,sequence 对bufferSize取模乘以2^REF_ELEMENT_SHIFT。 接下来看可以供用户调用的具体的构造方法,RingBuffer在Disruptor包外部不能直接调用其构造方法,用户只能用静态方法创建:

代码语言:javascript
复制
    /**
     * Construct a RingBuffer with the full option set.
     *
     * @param eventFactory to newInstance entries for filling the RingBuffer
     * @param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     */
    RingBuffer(
        EventFactory eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }

    /**
     * Create a new multiple producer RingBuffer with the specified wait strategy.
     *
     * @param factory      used to create the events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static  RingBuffer createMultiProducer(
        EventFactory factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer(factory, sequencer);
    }

    /**
     * Create a new multiple producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
     *
     * @param factory    used to create the events within the ring buffer.
     * @param bufferSize number of elements to create within the ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static  RingBuffer createMultiProducer(EventFactory factory, int bufferSize)
    {
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    /**
     * Create a new single producer RingBuffer with the specified wait strategy.
     *
     * @param factory      used to create the events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see SingleProducerSequencer
     */
    public static  RingBuffer createSingleProducer(
        EventFactory factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer(factory, sequencer);
    }

    /**
     * Create a new single producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
     *
     * @param factory    used to create the events within the ring buffer.
     * @param bufferSize number of elements to create within the ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static  RingBuffer createSingleProducer(EventFactory factory, int bufferSize)
    {
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    /**
     * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
     *
     * @param producerType producer type to use {@link ProducerType}.
     * @param factory      used to create events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     */
    public static  RingBuffer create(
        ProducerType producerType,
        EventFactory factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

用户组装一个RingBuffer需要如下元素:实现EventFactory的Event的工厂,实现Sequencer的生产者,等待策略waitStrategy还有bufferSize。 接下来里面方法的实现都比较简单,这里不再赘述

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-07-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RingBuffer
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档