(1)LongAddr的结构是怎样的? (2)当前线程应该访问Cell数组里面的哪一个Cell元素? (3)如何初始化Cell数组? (4)Cell数组如何扩容? (5)线程访问分配的Cell元素有冲突后如何处理? (6)如何保证线程操作被分配的Cell元素的原子性?
由LongAddr的类图可知:LongAddr继承自Striped64类,在Striped64内部维护了三个变量。 LongAddr的真实值=base的值+Cell数组所有Cell元素的value值。 base是个基础值,默认为0,cellsBusy用来实现自旋锁,状态只要0和1。
当创建Cell元素,扩容Cell数组或者是初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。
下面看下Cell的构造
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
可以看到,Cell的构造很简单,其内部维护一个被声明为Volatile的变量,保证了内存的可见性。
另外使用CAS算法操作,保证了当前线程更新时被分配的Cell元素中的value值的原子性。
另外,Cell类使用 @sun.misc.Contended修饰是为了避免伪共享。
伪共享解释:数组的元素是Cell
类,可以看到Cell类用Contended注解修饰,这里主要是解决false sharing(伪共享的问题),不过个人认为伪共享翻译的不是很好,或者应该是错误的共享,比如两个volatile变量被分配到了同一个缓存行,但是这两个的更新在高并发下会竞争,比如线程A去更新变量a,线程B去更新变量b,但是这两个变量被分配到了同一个缓存行,因此会造成每个线程都去争抢缓存行的所有权,例如A获取了所有权然后执行更新这时由于volatile的语义会造成其刷新到主存,但是由于变量b也被缓存到同一个缓存行,因此就会造成cache miss,这样就会造成极大的性能损失,因此有一些类库的作者,例如JUC下面的、Disruptor等都利用了插入dummy 变量的方式,使得缓存行被其独占,比如下面这种代码:
static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
但是这种方式毕竟不通用,例如32、64位操作系统的缓存行大小不一样,因此JAVA8中就增加了一个注@sun.misc.Contended
解用于解决这个问题,由JVM去插入这些变量,具体可以参考openjdk.java.net/jeps/142 ,但是通常来说对象是不规则的分配到内存中的,但是数组由于是连续的内存,因此可能会共享缓存行,因此这里加一个Contended注解以防cells数组发生伪共享的情况。
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { //——(1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || //——(2)
(a = as[getProbe() & m]) == null || //——(3)
!(uncontended = a.cas(v = a.value, v + x))) //——(4)
longAccumulate(x, null, uncontended); //——(5)
}
}
代码(1):判断Cell是否为null,如果为null,则当前在基础变量base上进行累加,类似AtomicLong的操作。 代码(2):如果Cells不为null或者线程执行代码(1)的CAS操作失败了,则会去执行代码(2). 代码(2)(3):决定了当前线程应该访问cells数组里面的哪一个元素。 代码(4):如果当前线程映射的元素存在则执行代码(4),使用CAS操作去更新分配的Cell元素的value值。 代码(5):如果当前线程映射的元素不存在或者存在但是CAS操作失败则执行代码(5)。 总结:当前线程应该访问那个Cell元素是通过getProbe() & m进行计算的。
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
*
* @param x the value
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
....省略无关代码....
//*******************************初始化Cell数组******************************************//
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//——(1)
Cell[] rs = new Cell[2];
//——(2)
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//——(3)
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
代码(1):初始化Cells数组元素个数为2, 代码(2):然后使用 h&1 计算当前线程应该访问cell数组的那一个位置。 代码(3):重置了cellsBusy标记。这里没用使用CAS算法,因为cellsBusy是volatile类型的,保证了变量内存的可见性。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
......
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as) //——(1)
collide = false; // At max size or stale
else if (!collide) //——(2)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) { //——(3)
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; //——(4)
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; //——(5)
}
collide = false; //——(6)
continue; // Retry with expanded table
}
h = advanceProbe(h); //——(7)
}
.....
}
代码(1)(2):当前cells元素个数小于当前机器的CPU个数并且当前多个线程访问cells中的同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行代码(3)的扩容操作。 代码(3)中的扩容操作也是通过CAS设置cellsBusy为1,然后才能进行扩容。 解释下cellsBusy:他是个标示,为0说明cells数组没用被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的Cell元素。 代码(4):假设CAS成功,则执行代码(4),将容量扩充为之前的两倍,并复制Cell元素到新扩容的数组。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
.......
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { //——(1)
if ((a = as[(n - 1) & h]) == null) { //——(2)
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
.....
h = advanceProbe(h); //——(3)
}
.....
}
代码(3)实现
/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
代码(1)(2):当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素个数计算要访问的Cell元素下标,然后如果发现对应下标的值为null,则新增一个Cell元素到cells数组,并且将其添加到cells数组之前要竞争设置cellsBusy为1. 代码(3):对CAS失败的线程重新计算当前线程的随机值threadLocalReadomProbe,以减少下次访问cells元素时的冲突机会。
java.util.concurrency.atomic.LongAdder
是Java8新增的一个类,提供了原子累计值的方法。根据文档的描述其性能要优于AtomicLong
,下图是一个简单的测试对比(平台:MBP):
这里测试时基于JDK1.8进行的,AtomicLong 从Java8开始针对x86平台进行了优化,使用XADD替换了CAS操作,我们知道JUC下面提供的原子类都是基于Unsafe类实现的,并由Unsafe来提供CAS的能力。CAS (compare-and-swap)本质上是由现代CPU在硬件级实现的原子指令,允许进行无阻塞,多线程的数据操作同时兼顾了安全性以及效率。大部分情况下,CAS都能够提供不错的性能,但是在高竞争的情况下开销可能会成倍增长
本章分析了JDK8新增的LongAddr原子性操作类,该类通过cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。 另外,数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也就避免了伪共享,提高了性能。 总的来说,LongAdder从性能上来说要远远好于AtomicLong,一般情况下是可以直接替代AtomicLong使用的,Netty也通过一个接口封装了这两个类,在Java8下直接采用LongAdder,但是AtomicLong的一系列方法不仅仅可以自增,还可以获取更新后的值,如果是例如获取一个全局唯一的ID还是采用AtomicLong会方便一点。