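From the previous section we know that the RingBuffer is the Disruptor's data buffer: the place where messages are stored and through which different threads hand data to each other. A Sequence object named cursor marks the head of the queue. It coordinates multiple producers adding messages to the RingBuffer, and consumers use it to decide whether the RingBuffer is empty. The clever part is that the Sequence marking the tail of the queue is not kept in the RingBuffer; each consumer maintains its own. This lets multiple consumers process messages more flexibly: unicast, multicast, pipelines, and combinations of these can all be built on a single RingBuffer. To track the relevant consumer Sequences, the RingBuffer keeps an array of Sequence objects named gatingSequences (in the source, this bookkeeping is delegated to the RingBuffer's Sequencer).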
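To make the role of the gating sequences concrete, here is a minimal sketch of how a producer can find the position of the slowest consumer. It is a simplified model, not Disruptor source: `AtomicLong` stands in for `Sequence`, and the names `GatingSketch` and `minimumSequence` are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified model, not Disruptor source: AtomicLong stands in for Sequence.
final class GatingSketch {

    // Returns the smallest value among the consumer sequences, or `fallback`
    // when it is smaller than all of them or when there are no consumers.
    static long minimumSequence(AtomicLong[] gatingSequences, long fallback) {
        long minimum = fallback;
        for (AtomicLong sequence : gatingSequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }

    public static void main(String[] args) {
        AtomicLong[] consumers = {
            new AtomicLong(5), new AtomicLong(3), new AtomicLong(9)
        };
        long cursor = 10; // hypothetical producer position
        System.out.println(minimumSequence(consumers, cursor));         // 3
        System.out.println(minimumSequence(new AtomicLong[0], cursor)); // 10
    }
}
```

With multicast, every consumer advances its own sequence, and a producer may only reuse slots that lie behind the slowest of them.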
The producer code is as follows:
    import com.lmax.disruptor.RingBuffer;
    import java.nio.ByteBuffer;

    public class LongEventProducer
    {
        private final RingBuffer<LongEvent> ringBuffer;

        public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
        {
            this.ringBuffer = ringBuffer;
        }

        public void onData(ByteBuffer bb)
        {
            long sequence = ringBuffer.next();  // Grab the next sequence
            try
            {
                // Get the entry in the Disruptor for the sequence
                LongEvent event = ringBuffer.get(sequence);
                event.set(bb.getLong(0));  // Fill with data
            }
            finally
            {
                // Always publish, even if filling the event throws
                ringBuffer.publish(sequence);
            }
        }
    }
Next, let's walk through it step by step, starting with long sequence = ringBuffer.next();.
    /**
     * Increment and return the next sequence for the ring buffer.  Calls of this
     * method should ensure that they always publish the sequence afterward.  E.g.
     * <pre>
     * long sequence = ringBuffer.next();
     * try {
     *     Event e = ringBuffer.get(sequence);
     *     // Do some work with the event.
     * } finally {
     *     ringBuffer.publish(sequence);
     * }
     * </pre>
     *
     * @return The next sequence to publish to.
     * @see RingBuffer#publish(long)
     * @see RingBuffer#get(long)
     */
    @Override
    public long next()
    {
        return sequencer.next();
    }
Here we focus on the sequencer the RingBuffer uses by default, com.lmax.disruptor.MultiProducerSequencer. Its next() simply calls next(1), so the real logic lives in next(int n):
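    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1 || n > bufferSize)
        {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }

        long current;
        long next;

        do
        {
            // The cursor starts at -1 and is advanced by the producers.
            current = cursor.get();
            // Highest sequence being claimed; with current = -1 and n = 1 this is 0.
            next = current + n;

            // With bufferSize = 1024 (the size used in this example), next = 0 gives wrapPoint = -1024.
            long wrapPoint = next - bufferSize;
            // Cached minimum of the consumers' progress, i.e. the slowest consumer.
            // Being a cache, it is less than or equal to the real progress.
            long cachedGatingSequence = gatingSequenceCache.get();

            // Step 1: check whether enough space is available.
            // wrapPoint > cachedGatingSequence: the producers have outrun the consumers and the
            // claim would wrap around onto unconsumed slots, so the buffer (as far as the cache
            // knows) has no room.
            // cachedGatingSequence > current: the cached consumer progress is ahead of the cursor
            // value this producer read, so current is stale. This can happen when another producer
            // wins the race and its published data has already been consumed (for example, with
            // two producers and one consumer).
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                // Read the consumers' latest progress.
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                // Even the latest progress would be wrapped: the buffer really is full.
                // Retry without touching the cache, which avoids needless cache updates.
                if (wrapPoint > gatingSequence)
                {
                    // Park the producer thread briefly to yield the CPU and reduce contention.
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                // No wrap detected: refresh the cache, then retry.
                // There is a race here: with several producer threads, another thread may
                // overwrite the cache with a smaller (older) value.
                // This branch is also taken when cachedGatingSequence > current; the latest
                // gatingSequence is stored, and if the condition still holds on the next pass the
                // same thing happens again. That is the case where consumption outpaces production.
                gatingSequenceCache.set(gatingSequence);
                // Enough space is visible at this point. Claiming it here would duplicate the
                // outer code, so simply fall through to the next loop iteration.
            }
            // Step 2: space looks sufficient, so try to claim it with a CAS.
            else if (cursor.compareAndSet(current, next))
            {
                // Step 3: the claim succeeded. Return so the caller can fetch the event at this
                // sequence from the ring buffer.
                // Note: the producer cursor has been advanced, but the data has not been published
                // yet. Consumers therefore have to call getHighestPublishedSequence() to find out
                // which sequences are really available.
                break;
            }
            // Otherwise the CAS lost the race: loop and retry.
        }
        while (true);

        return next;
    }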
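The heart of this loop is "check for wrap-around, then CAS the cursor". The sketch below isolates that idea under simplifying assumptions: a single consumer tracked by one `AtomicLong` (instead of the `gatingSequences` array and its cache), one slot per claim, and a non-blocking `tryClaim` that returns -1 when the ring is full rather than parking and retrying. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the claim step, not the Disruptor implementation.
final class ClaimSketch {
    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1);   // last claimed sequence
    private final AtomicLong consumed = new AtomicLong(-1); // single consumer's progress

    ClaimSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Returns the claimed sequence, or -1 if claiming would overwrite unread data.
    long tryClaim() {
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            long wrapPoint = next - bufferSize;
            if (wrapPoint > consumed.get()) {
                return -1; // ring is full
            }
            if (cursor.compareAndSet(current, next)) {
                return next; // this thread owns the slot for `next`
            }
            // Another producer won the CAS: re-read the cursor and retry.
        }
    }

    void markConsumed(long sequence) {
        consumed.set(sequence);
    }

    public static void main(String[] args) {
        ClaimSketch ring = new ClaimSketch(4);
        for (int i = 0; i < 5; i++) {
            System.out.println(ring.tryClaim()); // 0, 1, 2, 3, then -1 (full)
        }
        ring.markConsumed(0);                    // consumer frees the first slot
        System.out.println(ring.tryClaim());     // 4
    }
}
```

Because the CAS is the only write to the cursor, two producers can never be handed the same sequence; the loser of the race simply recomputes `next` from the new cursor value.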
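On the first producer request (with bufferSize = 1024 and n = 1):
This step returns next = 0, with wrapPoint = -1024.
On the second producer request:
This step returns next = 1, with wrapPoint = -1023.
On the third producer request:
This step returns next = 2, with wrapPoint = -1022.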
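The arithmetic behind these requests can be reproduced with a few lines of code. This is a worked calculation only, assuming bufferSize = 1024, one slot per claim, and that every CAS succeeds; `WrapPointTrace` is a hypothetical helper, not part of Disruptor.

```java
// Worked arithmetic only; assumes each claim asks for one slot and succeeds.
final class WrapPointTrace {

    // Returns {next, wrapPoint} for each of `claims` consecutive claims.
    static long[][] trace(int bufferSize, int claims) {
        long cursor = -1; // initial cursor value
        long[][] rows = new long[claims][];
        for (int i = 0; i < claims; i++) {
            long next = cursor + 1;
            long wrapPoint = next - bufferSize;
            rows[i] = new long[] {next, wrapPoint};
            cursor = next; // a successful CAS moves the cursor to next
        }
        return rows;
    }

    public static void main(String[] args) {
        for (long[] row : trace(1024, 3)) {
            System.out.println("next=" + row[0] + ", wrapPoint=" + row[1]);
        }
        // next=0, wrapPoint=-1024
        // next=1, wrapPoint=-1023
        // next=2, wrapPoint=-1022
    }
}
```

wrapPoint stays negative until the producers have claimed a full lap of the ring, so the wrap-around check cannot trigger during the first 1024 claims.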
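Once the sequence has been claimed, LongEvent event = ringBuffer.get(sequence); fetches the event element stored in the ringBuffer. If you recall how the RingBuffer is initialized, you will know that it requires an EventFactory, which it uses to pre-fill the array, in other words to preallocate the memory: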
    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        // Index mask
        this.indexMask = bufferSize - 1;
        // Allocate two extra padding regions, one before and one after the elements,
        // to keep the array elements from sharing a cache line with other objects.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

    /**
     * Pre-fills the array elements.
     *
     * @param eventFactory factory used to create each event
     */
    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
So this step simply retrieves an Event that was pre-filled at RingBuffer initialization by the user-supplied EventFactory.
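A minimal sketch of the preallocation idea follows. It deliberately leaves out the cache-line padding (`BUFFER_PAD`) and uses `java.util.function.Supplier` in place of `EventFactory`; `PreallocatedRing` is an illustrative name, not a Disruptor class.

```java
import java.util.function.Supplier;

// Simplified model: no cache-line padding, Supplier instead of EventFactory.
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int indexMask;

    PreallocatedRing(int bufferSize, Supplier<E> factory) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get(); // allocate every event once, up front
        }
    }

    // Maps an ever-growing sequence onto a slot; the mask acts as a cheap modulo.
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }

    public static void main(String[] args) {
        PreallocatedRing<long[]> ring = new PreallocatedRing<>(8, () -> new long[1]);
        System.out.println(ring.get(0) == ring.get(8)); // true: same slot one lap later
        System.out.println(ring.get(0) == ring.get(1)); // false: different slots
    }
}
```

Since the same event objects are reused lap after lap, producers overwrite fields instead of allocating, which keeps garbage-collection pressure low.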
With the reference to the event in hand, event.set(bb.getLong(0)); fills the event with data.
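Once the slot has been claimed, ringBuffer.publish(sequence) must be called to publish the data. By this point next() has already advanced the cursor through its CAS, but consumers only treat a slot as readable after publish has marked it available. If a claimed sequence is never published, consumers stall at that sequence and, once the ring fills up, producers block as well, which effectively deadlocks the pipeline. That is why the call is placed in a finally block.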
    /**
     * Publish the specified sequence.  This action marks this particular
     * message as being available to be read.
     *
     * @param sequence the sequence to publish.
     */
    @Override
    public void publish(long sequence)
    {
        sequencer.publish(sequence);
    }
    /**
     * @see Sequencer#publish(long)
     */
    @Override
    public void publish(final long sequence)
    {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
    /**
     * Marks the data in the target slot as available by writing the
     * availability flag for that slot.
     * <p>
     * The below methods work on the availableBuffer flag.
     * <p>
     * The prime reason is to avoid a shared sequence object between publisher threads.
     * (Keeping single pointers tracking start and end would require coordination
     * between the threads).
     * <p>
     * --  Firstly we have the constraint that the delta between the cursor and minimum
     * gating sequence will never be larger than the buffer size (the code in
     * next/tryNext in the Sequence takes care of that).
     * -- Given that; take the sequence value and mask off the lower portion of the
     * sequence as the index into the buffer (indexMask). (aka modulo operator)
     * -- The upper portion of the sequence becomes the value to check for availability.
     * ie: it tells us how many times around the ring buffer we've been (aka division)
     * -- Because we can't wrap without the gating sequences moving forward (i.e. the
     * minimum gating sequence is effectively our last available position in the
     * buffer), when we have new data and successfully claimed a slot we can simply
     * write over the top.
     */
    private void setAvailable(final long sequence)
    {
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    private void setAvailableBufferValue(int index, int flag)
    {
        long bufferAddress = (index * SCALE) + BASE;
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

    /**
     * Computes the availability flag for a sequence, i.e. how many laps
     * around the ring the sequence represents.
     *
     * @param sequence the sequence being published or checked
     * @return the availability flag for the sequence
     */
    private int calculateAvailabilityFlag(final long sequence)
    {
        return (int) (sequence >>> indexShift);
    }

    /**
     * Computes the array index (slot position) for a sequence.
     *
     * @param sequence the sequence being published or checked
     * @return the slot index for the sequence
     */
    private int calculateIndex(final long sequence)
    {
        return ((int) sequence) & indexMask;
    }
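The index/flag split is easier to see in a small model. The sketch below is an approximation under stated assumptions: it uses `AtomicIntegerArray.lazySet` where the source uses `UNSAFE.putOrderedInt`, derives `indexShift` with `Integer.numberOfTrailingZeros` (valid for power-of-two sizes), and initialises every flag to -1 so that no slot looks published at start-up. `AvailabilitySketch` and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Simplified model of the availability flags, not the Disruptor implementation.
final class AvailabilitySketch {
    private final AtomicIntegerArray availableBuffer;
    private final int indexMask;
    private final int indexShift;

    // bufferSize must be a power of two.
    AvailabilitySketch(int bufferSize) {
        availableBuffer = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2(bufferSize)
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // nothing published yet
        }
    }

    // Lower bits of the sequence: which slot it occupies.
    int index(long sequence) {
        return ((int) sequence) & indexMask;
    }

    // Upper bits of the sequence: which lap around the ring it belongs to.
    int flag(long sequence) {
        return (int) (sequence >>> indexShift);
    }

    void publish(long sequence) {
        availableBuffer.lazySet(index(sequence), flag(sequence));
    }

    // A slot is readable only if it holds the lap number of this exact sequence.
    boolean isAvailable(long sequence) {
        return availableBuffer.get(index(sequence)) == flag(sequence);
    }

    public static void main(String[] args) {
        AvailabilitySketch flags = new AvailabilitySketch(8);
        System.out.println(flags.index(10) + " " + flags.flag(10)); // 2 1
        flags.publish(2);
        System.out.println(flags.isAvailable(2));  // true
        System.out.println(flags.isAvailable(10)); // false: same slot, next lap
    }
}
```

Storing the lap number rather than a boolean means a slot never has to be reset: publishing sequence 10 automatically makes sequence 2 read as "not this lap".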
    @Override
    public void signalAllWhenBlocking()
    {
        synchronized (mutex)
        {
            // Wake up every thread waiting on the barrier
            mutex.notifyAll();
        }
    }
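To show why the producer has to signal at all, here is a rough sketch of the other half of the handshake: a consumer that blocks on the same monitor until the cursor reaches the sequence it wants. This is a simplified model, not Disruptor's wait strategy: the cursor is an `AtomicLong` advanced at publish time, there is one producer and one consumer, and `BlockingWaitSketch`, `waitFor`, and `demo` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified wait/notify handshake, not the Disruptor implementation.
final class BlockingWaitSketch {
    private final Object mutex = new Object();
    private final AtomicLong cursor = new AtomicLong(-1);

    // Consumer side: block until the cursor has reached `sequence`.
    long waitFor(long sequence) throws InterruptedException {
        synchronized (mutex) {
            while (cursor.get() < sequence) {
                mutex.wait(); // releases the monitor while waiting
            }
        }
        return cursor.get();
    }

    // Producer side: make the sequence visible, then wake all waiting consumers.
    void publish(long sequence) {
        cursor.set(sequence);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    // Runs one consumer and one producer; returns the cursor value the consumer
    // observed, or Long.MIN_VALUE if it was never woken within the timeout.
    static long demo() {
        BlockingWaitSketch sketch = new BlockingWaitSketch();
        AtomicLong seen = new AtomicLong(Long.MIN_VALUE);
        Thread consumer = new Thread(() -> {
            try {
                seen.set(sketch.waitFor(0));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        sketch.publish(0);
        try {
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 0
    }
}
```

The consumer re-checks the cursor inside the synchronized block before waiting, so a publish that lands between the check and the wait cannot be missed: the producer cannot enter the monitor until the consumer has released it by calling `wait()`.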
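This post has only traced the source code along the path a producer takes through the API. The next post will do the same for the consumer, and once these end-to-end flows have been covered, the core components will each get a dedicated write-up.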