下面是多生产者核心类MultiProducerSequencer的类继承关系,与之前讲的SingleProducerSequencer相似:
MultiProducerSequencer是多生产者类,线程安全,与单一生产者不同的是,这里的cursor不再是可以消费的标记,而是多线程生产者抢占的标记。可以消费的sequence由availableBuffer来判断标识。 这个类没有缓存行填充,因为主要的四个域:
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;
可变的域有两个:gatingSequenceCache 和availableBuffer;gatingSequenceCache 本身为Sequence,做了缓存行填充。availableBuffer是一个很大的数组,其中的每个元素都会改变,但是同一时刻只会有一个线程读取访问修改其中的元素的值,所以,没必要做缓冲行填充。 gatingSequenceCache是gatingSequence的缓存,和之前的单一生产者的类似(之前对于两个long类型做了缓存行填充,这里用Sequence类相当于只给一个long做了缓存行填充)。 indexMask:利用对2^n取模 = 对2^n -1 取与运算原理,indexMask=bufferSize - 1 indexShift:就是上面的n,用来定位某个sequence到底转了多少圈,用来标识已被发布的sequence。 availableBuffer:每个槽存过几个Event,就是sequence到底转了多少圈,存在这个数组里,下标就是每个槽。为什么不直接将sequence存入availableBuffer,因为这样sequence值会过大,很容易溢出
构造方法:
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
首先,availableBuffer数组初始化每个值都为-1:
private void initialiseAvailableBuffer() {
for (int i = availableBuffer.length - 1; i != 0; i--) {
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
这里是通过Unsafe类来更新数组每个值的: 数组结构是:
--------------
* 数组头 * BASE
* reference1 * SCALE
* reference2 * SCALE
* reference3 * SCALE
--------------
所以定位数组每个值的地址就是Base + Scale*下标
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
hasAvailableCapacity与单一生产者验证原理类似:
@Override
public boolean hasAvailableCapacity(final int requiredCapacity) {
return hasAvailableCapacity(gatingSequences, requiredCapacity, cursor.get());
}
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
//下一位置加上所需容量减去整个bufferSize,如果为正数,那证明至少转了一圈,则需要检查gatingSequences(由消费者更新里面的Sequence值)以保证不覆盖还未被消费的
//由于最多只能生产不大于整个bufferSize的Events。所以减去一个bufferSize与最小sequence相比较即可
long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
//缓存
long cachedGatingSequence = gatingSequenceCache.get();
//缓存失效条件
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
gatingSequenceCache.set(minSequence);
//空间不足
if (wrapPoint > minSequence)
{
return false;
}
}
return true;
}
next用于多个生产者抢占n个RingBuffer槽用于生产Event:
@Override
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do {
//首先通过缓存判断空间是否足够
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
//如果缓存不满足
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
//重新获取最小的
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
//如果空间不足,则唤醒消费者消费,并让出CPU
if (wrapPoint > gatingSequence) {
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
//重新设置缓存
gatingSequenceCache.set(gatingSequence);
} //如果空间足够,尝试CAS更新cursor,更新cursor成功代表成功获取n个槽,退出死循环
else if (cursor.compareAndSet(current, next)) {
break;
}
}
while (true);
//返回最新的cursor值
return next;
}
tryNext多个生产者尝试抢占:
public long tryNext(int n) throws InsufficientCapacityException {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
//尝试获取一次,若不成功,则抛InsufficientCapacityException
do {
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current)) {
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));
return next;
}
配置好Event,之后发布Publish:
@Override
public void publish(long sequence) {
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
@Override
public void publish(long lo, long hi) {
for (long l = lo; l <= hi; l++) {
setAvailable(l);
}
waitStrategy.signalAllWhenBlocking();
}
/**
* 发布某个sequence之前的都可以被消费了需要将availableBuffer上对应sequence下标的值设置为第几次用到这个槽
* @param sequence
*/
private void setAvailable(final long sequence) {
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
/**
* 某个sequence右移indexShift,代表这个Sequence是第几次用到这个ringBuffer的某个槽,也就是这个sequence转了多少圈
* @param sequence
* @return
*/
private int calculateAvailabilityFlag(final long sequence) {
return (int) (sequence >>> indexShift);
}
/**
* 定位ringBuffer上某个槽用于生产event,对2^n取模 = 对2^n -1
* @param sequence
* @return
*/
private int calculateIndex(final long sequence) {
return ((int) sequence) & indexMask;
}
下面我们还是举一个简单实例: 假设我们有两个生产线程同时调用一个MultiProducerSequencer来生产Event,RingBuffer的大小为4,对应的消费者辅助类SequenceBarrier,这里不画消费者,假设有不断通过SequenceBarrier消费的消费者。SingleProducerSequencer的gatingSequences数组内保存这一个指向某个Sequence的引用,同时这个Sequence也会被SequenceBarrier更新以表示消费者消费到哪里了。
假设Thread1和Thread2同时分别要生产2个和3个Event,他们同时分别调用next(2)和next(3)来抢占槽。假设同时运行到了(如果空间足够,尝试CAS更新cursor,更新cursor成功代表成功获取n个槽,退出死循环)这里,他们尝试CAS更新cursor,分别是:cursor.compareAndSet(-1, 1)还有cursor.compareAndSet(-1, 2). 假设Thread1成功,那么接下来,Thread2由于更新失败而且空间不足,一致唤醒消费者消费,并让出CPU。Thread1,生产2个Event,并发布时,将availableBuffer中的下标为(0&3=)0,(1&3=)1的值设置为(0>>>2=)0,(1>>>2)0:
这时,消费者调用isAvailable(0)检查0这个sequence是否可以被消费,isAvailable检查(0&3=)0下标的availableBuffer值为0,等于(0>>>2=),所以,可以消费。
至此,多生产者也简述完毕