JCTools是服务虚拟机并发开发的工具,提供一些JDK没有的并发数据结构辅助开发。
聚合四种 SPSC/MPSC/SPMC/MPMC 数据变量的并发队列:
这里简要根据https://gitee.com/eric_ds/baseutil中的MPSCArrayQueue版本源码进行分析来理解这个高性能的的无锁队列的实现原理。
package com.jfireframework.baseutil.concurrent;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import com.jfireframework.baseutil.reflect.UNSAFE;
abstract class PadFor64Bit
{
// 64长度的缓存行,要进行填充,需要8个byte。
long p1, p2, p3, p4, p5, p6, p7;
public static long noHuop(PadFor64Bit instance)
{
return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
}
}
abstract class ProducerIndex extends PadFor64Bit
{
volatile long producerIndex;
private static final long OFFSET = UNSAFE.getFieldOffset("producerIndex", ProducerIndex.class);
boolean casProducerIndex(long index)
{
//对producerIndex加1,注意一点,和disruptor不同的是,producerIndex的值是一直增加的
return UNSAFE.compareAndSwapLong(this, OFFSET, index, index + 1);
}
}
abstract class Pad2 extends ProducerIndex
{
public long p1, p2, p3, p4, p5, p6, p7;
public static long noHuop(Pad2 instance)
{
return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
}
}
abstract class ComsumerIndex extends Pad2
{
volatile long consumerIndex;
private static final long OFFSET = UNSAFE.getFieldOffset("consumerIndex", ComsumerIndex.class);
void orderedSetComsumerIndex(long index)
{
UNSAFE.putOrderedLong(this, OFFSET, index);
}
}
abstract class Pad3 extends ComsumerIndex
{
long p1, p2, p3, p4, p5, p6, p7;//缓存行填充
public static long noHuop(Pad3 instance)
{
return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
}
}
abstract class ProducerIndexLimit extends Pad3
{
volatile long producerIndexLimit = 0;
private static final long OFFSET = UNSAFE.getFieldOffset("producerIndexLimit", ProducerIndexLimit.class);
void orderedSetProducerIndexLimit(long limit)
{
UNSAFE.putOrderedLong(this, OFFSET, limit);
}
}
abstract class Pad4 extends ProducerIndexLimit
{
long p1, p2, p3, p4, p5, p6, p7;
static final int availableBufferOffset = UNSAFE.arrayBaseOffset(new int[0].getClass());
static final int bufferOffset = UNSAFE.arrayBaseOffset(Object[].class);
static final int availableBufferScaleShift;
static final int bufferScaleShift;
static
{
int availableBufferScale = UNSAFE.arrayIndexScale(new int[0].getClass());
if (availableBufferScale == 4)
{
availableBufferScaleShift = 2;
}
else if (availableBufferScale == 8)
{
availableBufferScaleShift = 3;
}
else
{
throw new IllegalArgumentException();
}
int bufferScale = UNSAFE.arrayIndexScale(Object[].class);
if (bufferScale == 4)
{
bufferScaleShift = 2;
}
else if (bufferScale == 8)
{
bufferScaleShift = 3;
}
else
{
throw new IllegalArgumentException();
}
}
public static long noHuop(Pad4 instance)
{
return instance.p1 + instance.p2 + instance.p3 + instance.p4 + instance.p5 + instance.p6 + instance.p7;
}
}
public class MPSCArrayQueue<E> extends Pad4 implements Queue<E>
{
public MPSCArrayQueue(int capacity)
{
int size = 1;
int indexShift = 0;
//size的取值为2次方的值,目的是为了下面的index&mask的取值结果是数组索引下标,最终为大于或等于capacity的2次方幂
while (size < capacity && size > 0)
{
size <<= 1;
//2次幂的数量 2 2^2 2^3 以2为底,等比为2的等比数列的数量
indexShift++;
}
if (size > 0)
{
this.indexShift = indexShift;
mask = size - 1;
//存放数据的buffer
buffer = new Object[size];
//可用的buffer
availableBuffers = new int[size];
//给数组中的每个元素赋一个初始值为-1
Arrays.fill(availableBuffers, -1);
}
else
{
throw new IllegalArgumentException("capacity 无法计算得到其最小的2次方幂");
}
}
protected final Object[] buffer;
protected final int mask;
protected final int[] availableBuffers;
protected final int indexShift;
boolean isAvailable(long address, int flag, int[] availableBuffers)
{
return UNSAFE.getIntVolatile(availableBuffers, address) == flag;
}
void setAvailable(long index)
{
//mask = size -1
//index < size -1 的
// size - 1为2的n次方幂
// n 其实就是indexShift
//>>表示右移,如果该数为正,则高位补0,若为负数,则高位补1;
//>>>表示无符号右移,也叫逻辑右移,即若该数为正,则高位补0,而若该数为负数,则右移后高位同样补0
//那么index >>> indexShift表示将index向右移indexShift位
//看一下极端情况当size - 1 = 2 ^ 3时,indexShift的值为3,这时如果index=size-1即8二进制为1000时向右移3位时值为1 所有小于8的值进行这种移位得到的结果都是0
//这也就说明,当flag为1时表示已经满了
int flag = (int) (index >>> indexShift);
long address = ((index & mask) << availableBufferScaleShift) + availableBufferOffset;
UNSAFE.putOrderedInt(availableBuffers, address, flag);
}
/**
* 获取下一个可以使用的生产者下标
*
* @return
*/
long nextProducerIndex()
{
long pIndex = producerIndex;
long pLimit = producerIndexLimit;
if (pIndex < pLimit)
{
if (casProducerIndex(pIndex))
{
return pIndex;
}
}
do
{
pIndex = producerIndex;
if (pIndex < pLimit)
{
if (casProducerIndex(pIndex))
{
return pIndex;
}
}
else
{
//mask的值是size-1
//consumerIndex表示消费者的索引位置
//进入这个判断分支就代表pIndex>=pLimit
//consumerIndex的值为0时,pLimit的值为size(因为mask的值为size-1)
pLimit = producerIndexLimit = consumerIndex + mask + 1;
if (pIndex >= producerIndexLimit)
{
// 队列已满
return -1;
}
else
{
if (casProducerIndex(pIndex))
{
return pIndex;
}
}
}
} while (true);
}
Object get(long index)
{
long address = ((index & mask) << bufferScaleShift) + bufferOffset;
return UNSAFE.getObject(buffer, address);
}
void set(Object value, long index)
{
//计算数组下标
//index&mask是计算数组的下标
//bufferOffset=UNSAFE.arrayBaseOffset(Object[].class) 返回Object数组基地址
// int bufferScale = UNSAFE.arrayIndexScale(Object[].class);
// if (bufferScale == 4)
// {
// bufferScaleShift = 2;
// }
// else if (bufferScale == 8)
// {
// bufferScaleShift = 3;
// }
//bufferScale代表的是数组中每个元素的占用的大小
//<< 位移运算符表示左移,左移2位相当于乘4
//当bufferScale大小为8时表示一个元素占用的大小为8,那么第一个位置是(0 <<3)+bufferOffset结果为bufferOffset;
//第二个元素1<<3 + bufferOffset,相当于1*8 + bufferOffset
//第三个元素2<<3 + bufferOffset相当于2*8 + bufferOffset
//关于index & mask 当mask为2^2 - 1时,index从一到10对mask取&的值为
// 0
// 1
// 2
// 3
// 0
// 1
// 2
// 3
// 0
// 1
//可见,虽然producerIndex的值和consumerIndex的值是一直增加的,但是对buffer的索引address是没有影响的
//唯一有约束的地方是nextProducerIndex方法中当pIndex >= producerIndexLimit时会停止生产,也就是说消费赶不上生产时会有停止生产的情况出现
long address = ((index & mask) << bufferScaleShift) + bufferOffset;
UNSAFE.putObject(buffer, address, value);
}
Object getAndSetNull(long index)
{
long address = ((index & mask) << bufferScaleShift) + bufferOffset;
Object result = UNSAFE.getObject(buffer, address);
UNSAFE.putObject(buffer, address, null);
return result;
}
void waitUnitlAvailable(long index)
{
int flag = (int) (index >>> indexShift);
long address = ((index & mask) << availableBufferScaleShift) + availableBufferOffset;
int[] availableBuffers = this.availableBuffers;
if (isAvailable(address, flag, availableBuffers) == false)
{
while (isAvailable(address, flag, availableBuffers) == false)
{
Thread.yield();
}
}
}
@Override
public int size()
{
long consumerIndex = this.consumerIndex;
long producerIndex = this.producerIndex;
return (int) (producerIndex - consumerIndex);
}
.............
@Override
public void clear()
{
long pIndex = producerIndex;
long cIndex = this.consumerIndex;
if (pIndex == cIndex)
{
return;
}
for (long index = cIndex; index < pIndex; index++)
{
waitUnitlAvailable(index);
set(null, index);
}
this.consumerIndex = pIndex;
}
@Override
public boolean add(E e)
{
return offer(e);
}
@Override
public boolean offer(E e)
{
long index = nextProducerIndex();
if (index == -1)
{
return false;
}
set(e, index);
setAvailable(index);
return true;
}
@Override
public E remove()
{
return poll();
}
@SuppressWarnings("unchecked")
@Override
public E poll()
{
//以与生产者相同的方式来获取flag,如果消费者与生产者用相同的算法得到的flag相同的话,则表示生产者已经生产到这个地方,可以进行消费。
long cIndex = this.consumerIndex;
int flag = (int) (cIndex >>> indexShift);
long address = ((cIndex & mask) << availableBufferScaleShift) + availableBufferOffset;
final int[] availableBuffers = this.availableBuffers;
if (isAvailable(address, flag, availableBuffers) == false)
{
if (cIndex == producerIndex)
{
return null;
}
while (isAvailable(address, flag, availableBuffers) == false)
{
// assert cIndex < consumerLimit;
Thread.yield();
}
}
//消费之后,将对应位置上的值置为null
E e = (E) getAndSetNull(cIndex);
//移动消费者的指针位置,这种方式在当consumerIndex很大时将获取不到数据
//注意一点,和disruptor不同的是,consumerIndex的值是一直增加的
orderedSetComsumerIndex(cIndex + 1);
return e;
}
@Override
public E element()
{
return peek();
}
@SuppressWarnings("unchecked")
@Override
public E peek()
{
long pIndex = producerIndex;
long consumerIndex = this.consumerIndex;
if (pIndex == consumerIndex)
{
return null;
}
//这里的原理与上面一样
int flag = (int) (consumerIndex >>> indexShift);
long address = ((consumerIndex & mask) << availableBufferScaleShift) + availableBufferOffset;
int[] availableBuffers = this.availableBuffers;
if (isAvailable(address, flag, availableBuffers) == false)
{
while (isAvailable(address, flag, availableBuffers) == false)
{
Thread.yield();
}
}
E e = (E) get(consumerIndex);
return e;
}
}
这里主要的核心有以下几点: