前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >disruptor源码分析三之producer流程

disruptor源码分析三之producer流程

作者头像
山行AI
发布2019-07-12 15:15:59
1.3K0
发布2019-07-12 15:15:59
举报
文章被收录于专栏:山行AI山行AI

关于disruptor其他核心组件的部分,在后面的章节中会进行介绍,最近的两个章节将先过一遍生产者和消费者的流程。

通过上一节,我们知道ringBuffer在disruptor中是数据缓冲区,不同线程之间传递数据的BUFFER。RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。

1. 定义producer

producer代码如下:

代码语言:javascript
复制
public class LongEventProducer {    private final RingBuffer<LongEvent> ringBuffer;    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {        this.ringBuffer = ringBuffer;    }    public void onData(ByteBuffer bb) {        long sequence = ringBuffer.next();  // Grab the next sequence        try {            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor            // for the sequence            event.set(bb.getLong(0));  // Fill with data        } finally {            ringBuffer.publish(sequence);        }    }}

接下来,我们一步步地分析。

1.1 ringBuffer.next():
代码语言:javascript
复制
 /**     * Increment and return the next sequence for the ring buffer.  Calls of this     * method should ensure that they always publish the sequence afterward.  E.g.     * <pre>     * long sequence = ringBuffer.next();     * try {     *     Event e = ringBuffer.get(sequence);     *     // Do some work with the event.     * } finally {     *     ringBuffer.publish(sequence);     * }     * </pre>     *     * @return The next sequence to publish to.     * @see RingBuffer#publish(long)     * @see RingBuffer#get(long)     */    @Override    public long next()    {        return sequencer.next();    }
  • 增加并返回ring buffer的下一个sequence
  • 调用这个方法必须要确保在调用之后sequence必须要被publish:
代码语言:javascript
复制
long sequence = ringBuffer.next(); try {     Event e = ringBuffer.get(sequence);     // Do some work with the event. } finally {      ringBuffer.publish(sequence); }
1.2 sequencer.next()

这里我们先主要针对ringbuffer默认使用的sequencer,com.lmax.disruptor.MultiProducerSequencer#next():

代码语言:javascript
复制
     /**      * @see Sequencer#next(int)      */     @Override     public long next(int n) {         if (n < 1 || n > bufferSize) {             throw new IllegalArgumentException("n must be > 0 and < bufferSize");         }         long current;         long next;         do {             //current从-1开始变化,由生产者来更新             current = cursor.get();             //要申请的空间,当n为1时,这个值为0             next = current + n;             //bufferSize默认值为1024,当next为0时wrapPoint值为-1024             long wrapPoint = next - bufferSize;             // 缓存的消费者们的进度最小值也就是最慢进度值,小于等于真实进度,是一个缓存值             long cachedGatingSequence = gatingSequenceCache.get();             //wrapPoint > cachedGatingSequence表示生产者生产的速度大于消费者消费的速度,产生了环路,buffer的空间不足             // cachedGatingSequence > current 表示消费者的进度大于当前生产者进度,current值更新不及时,为无效状态.当其它生产者竞争成功,发布的数据也被消费者消费了时可能产生。(如2生产者1个消费者),在其中一个生产者中会出现这种情况             if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {                 //获取最新的消费者进度sequence                 long gatingSequence = Util.getMinimumSequence(gatingSequences, current);                 // 消费者最新的进度仍然构成了环路,表示buffer空间不足,只能重试,减少不必要的缓存更新                 if (wrapPoint > gatingSequence) {                     // 停顿一下生产者线程,暂时让出cpu,减少竞争                     LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?                     continue;                 }                 // 检测到未构成环路,更新gatingSequence,然后进行重试                 // 这里存在竞态条件,多线程模式下,可能会被设置为一个更小的值,从而小于当前分配的值(current)                 //cachedGatingSequence > current满足时也会进入这里,会将最新的gatingSequence放入缓存中。下一次进来,如果cachedGatingSequence > current仍然满足,还是会这样处理,这属于消费大于生产的情况                 gatingSequenceCache.set(gatingSequence);                 // 这里看见有足够空间,这里如果尝试竞争空间会产生重复的代码,其实就是外层的代码,因此直接等待下一个循环             }             // 第二步:看见空间足够时尝试CAS竞争空间             else if (cursor.compareAndSet(current, next)) {                 // 第三步:成功竞争到了这片空间,返回,进行获取ringbuffer中指定sequence处的event的操作                 // 注意!这里更新了生产者进度,然而生产者并未真正发布数据。                 // 因此需要调用getHighestPublishedSequence()确认真正的可用空间                 break;             }             // 第三步:竞争失败则重试         }         while (true);         return next;     }

第一次请求进入时:

这一步返回的next为0,wrapPoint为-1024。

第二次生产者请求进入时:

这一步返回的next为1,wrapPoint为-1023。

第三次生产者请求进入时:

这一步返回的next为1,wrapPoint为-1022

  • 正常情况下进行CAS去获取sequence。
  • 当生产速度大于消费速度一个环(buffer)的距离时,需要生产者尝试重试申请空间(这里需要等待消费者释放空间),生产者等待重试。
  • 当消费者速度大于生产者的时候,需要更新gatingSequenceCache缓存然后进行重试。
1.3 Get the entry in the Disruptor

在上一步中申请到sequence之后通过LongEvent event = ringBuffer.get(sequence);获取ringBuffer中的event element。如果还记得RingBuffer的初始化过程会知道,ringBuffer初始化时需要传入一个EventFactory进行数组预填充,也即内存预分配的操作:

代码语言:javascript
复制
   RingBufferFields(        EventFactory<E> eventFactory,        Sequencer sequencer)    {        this.sequencer = sequencer;        this.bufferSize = sequencer.getBufferSize();        if (bufferSize < 1)        {            throw new IllegalArgumentException("bufferSize must not be less than 1");        }        if (Integer.bitCount(bufferSize) != 1)        {            throw new IllegalArgumentException("bufferSize must be a power of 2");        }        // 掩码        this.indexMask = bufferSize - 1;        // 额外创建 2个填充空间的大小,首尾填充,比较数组和其他对象加载到同一个缓存行。        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];        fill(eventFactory);    }    /**     * 数组元素的预填充     * @param eventFactory     */    private void fill(EventFactory<E> eventFactory)    {        for (int i = 0; i < bufferSize; i++)        {            entries[BUFFER_PAD + i] = eventFactory.newInstance();        }    }

这一步主要获取在ringBuffer初始化时通过事先用户自定义传入的EventFactory预填充的Event。

获取到event的引用之后,通过event.set(bb.getLong(0));往event中填充数据。

1.4 生产者发布数据(ringBuffer.publish(sequence))

在上面空间申请到之后,就需要使用该方法进行发布数据,因为这时消费者的cachedGatingSequence已经更新,消费者能感知到已经被生产者申请到的这个sequence的数据发布了。如果申请之后不发布数据,则会造成阻塞,引起死锁,所以代码中是放在finally中处理的。

代码语言:javascript
复制
 /**     * Publish the specified sequence.  This action marks this particular     * message as being available to be read.     *     * @param sequence the sequence to publish.     */    @Override    public void publish(long sequence)    {        sequencer.publish(sequence);    }
  • 接下来我们继续看默认的sequencer(com.lmax.disruptor.MultiProducerSequencer#publish(long))中的实现:
代码语言:javascript
复制
/**     * @see Sequencer#publish(long)     */    @Override    public void publish(final long sequence) {        setAvailable(sequence);        waitStrategy.signalAllWhenBlocking();    }
  • 先看setAvailable:
代码语言:javascript
复制
/**     * 设置目标插槽上的数据可用了,将对于插槽上的标记置位可用标记     * <p>     * The below methods work on the availableBuffer flag.     * <p>     * The prime reason is to avoid a shared sequence object between publisher threads.     * (Keeping single pointers tracking start and end would require coordination     * between the threads).     * <p>     * --  Firstly we have the constraint that the delta between the cursor and minimum     * gating sequence will never be larger than the buffer size (the code in     * next/tryNext in the Sequence takes care of that).     * -- Given that; take the sequence value and mask off the lower portion of the     * sequence as the index into the buffer (indexMask). (aka modulo operator)     * -- The upper portion of the sequence becomes the value to check for availability.     * ie: it tells us how many times around the ring buffer we've been (aka division)     * -- Because we can't wrap without the gating sequences moving forward (i.e. the     * minimum gating sequence is effectively our last available position in the     * buffer), when we have new data and successfully claimed a slot we can simply     * write over the top.     */    private void setAvailable(final long sequence) {        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));    }    private void setAvailableBufferValue(int index, int flag) {        long bufferAddress = (index * SCALE) + BASE;        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);    }    /**     * 计算sequence对应可用标记     *     * @param sequence     * @return     */    private int calculateAvailabilityFlag(final long sequence) {        return (int) (sequence >>> indexShift);    }    /**     * 计算sequence对应的下标(插槽位置)     *     * @param sequence     * @return     */    private int calculateIndex(final long sequence) {        return ((int) sequence) & indexMask;    }
  • 然后看看默认的wait strategy(BlockingWaitStrategy)中的实现:
代码语言:javascript
复制
 @Override    public void signalAllWhenBlocking()    {        synchronized (mutex)        {            // 唤醒屏障上所有等待的线程            mutex.notifyAll();        }    }
  • 它的mutex为一个private final Object mutex = new Object();也就是一个用来同步的对象锁,这里会唤醒在锁上等待的消费者线程。
  • 思考下,这里是否存在虚假唤醒呢,如果存在虚假唤醒,双将如何解决呢?关于虚假唤醒部分可以翻阅本公众号之前的文章。

2. 总结

这里只是根据api操作上生产者调用的流程对源码部分进行了一个流程性的介绍,下一篇会以相同的方式对消费者进行介绍,所有的流程性的东西介绍完了之后,相应的核心组件会再进行针对性的介绍。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于disruptor其他核心组件的部分,在后面的章节中会进行介绍,最近的两个章节将先过一遍生产者和消费者的流程。
    • 1. 定义producer
      • 1.1 ringBuffer.next():
      • 1.2 sequencer.next()
      • 1.3 Get the entry in the Disruptor
      • 1.4 生产者发布数据(ringBuffer.publish(sequence))
    • 2. 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档