7和8 的结构还是有些不一样;7里面是Segment、entry数组实现的,将entry数组分段加锁,而8里是对数组元素加锁,并发上增加了一个counterCells的数组记录并发时增加的值,然后通过cas方式进行操作,性能比起整段锁住的JDK 7 好了很多。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
// 链表长度统计
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();
// 这个判断是利用unsafe的cas操作,将新值加入到组中还没有值的地方
// tabAt 方法就是取出tab数组中i个位置的值,它底层使用的是unsafe
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// castabAt和tabAt相反,它是在第i个位置设置值,
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// cas成功就退出,失败,接着循环,下一次不一定能到进入这里
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 辅助数据迁移
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
// 对数组中的要插入的位置元素加锁后,再次判断是否改变
if (tabAt(tab, i) == f) {
// 红黑树的hash = -2 ,所以大于0的是链表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// hash相等且
if (e.hash == hash &&
// key相等
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 替换值
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
// 退出完成插入
break;
}
// 当前节点对象给pred
Node<K,V> pred = e;
// 下一个节点给e,遍历知道尾结点
if ((e = e.next) == null) {
// 将新节点插入到最后
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 判断f是否是一颗红黑树
// 其实在红黑树外面有一层壳TreeBin,使得树成为一个整体,然后加锁,
// 不然没有这层壳,树调整后,根节点可能就不是根节点了,而且也不不能保证插入时没问题的
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 将新值加入到红黑树
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 当链表长度大于指定值,就树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 修改计数
addCount(1L, binCount);
return null;
}
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
Class<?> kc = null;
boolean searched = false;
// 遍历
for (TreeNode<K,V> p = root;;) {
int dir, ph; K pk;
if (p == null) {
// 根节点是空的就new一个
first = root = new TreeNode<K,V>(h, k, v, null, null);
break;
}
// 判断插左还是右
else if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
// 找到hash值相同的,可能值也相同
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
// key相同
return p;
// hash值相同,且地址不同或者值又或者equals不等
else if ((kc == null &&
// 获取k的类型,赋值kc
(kc = comparableClassFor(k)) == null) ||
// 只有kc != null才会进入(k实现comparable)
// 判断kc的类型不相等才会返回0,不然返回k和pk的比较结果
(dir = compareComparables(kc, k, pk)) == 0) {
// 进入这里,表示,k实现comparable,或者k和pk相等
if (!searched) {
TreeNode<K,V> q, ch;
// 设置找到的标志
searched = true;
// 找到旧节点 并返回
if (((ch = p.left) != null &&
(q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.right) != null &&
(q = ch.findTreeNode(h, k, kc)) != null))
return q;
}
// 计算dir
dir = tieBreakOrder(k, pk);
}
TreeNode<K,V> xp = p;
// 插入节点
if ((p = (dir <= 0) ? p.left : p.right) == null) {
TreeNode<K,V> x, f = first;
first = x = new TreeNode<K,V>(h, k, v, f, xp);
if (f != null)
f.prev = x;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
if (!xp.red)
x.red = true;
else {
lockRoot();
try {
root = balanceInsertion(root, x);
} finally {
unlockRoot();
}
}
break;
}
}
assert checkInvariants(root);
return null;
}
初始化数组
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCt 《 0 表示资源被占用
if ((sc = sizeCtl) < 0)
// 让出线程资源,但不代表不会竞争,也有可能会再次进入
Thread.yield(); // lost initialization race; just spin
// 类似加锁,就将seizeCtrl 改为 -1 ,成功的话才进入
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再次判断这个数组是否是空的
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 右移2位(除2的2次)也就是四分之一
// 那么sc = 3/4 = 0.75
sc = n - (n >>> 2);
}
} finally {
// 最后赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
jdk8中没有size属性,它有baseCount记录map的大小
它会有一个数组countCell[] 用来记录并发时所记录的增加数量,什么意思呢?就是在并发时,同时修改baseCount,要么成功、要么等待(阻塞),它呢用countCell[] 数组来记录线程在修改baseCount失败时所加的数值,当线程A在修改baseCount,线程B来了,它发现baseCount它改不了,不行,不能等,后面还有长队呢,就自己加个值放到countCell数组里,之后在做统计。
ThreadLocalRandom.getProbe() 每个线程生成一个随机数
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 当counterCells 不为空,说明有线程进行了+1
if ((as = counterCells) != null ||
// 或者 修改baseCount,失败才会进入
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// counterCell数组为空
if (as == null || (m = as.length - 1) < 0 ||
// counterCell数组上指定位置为空
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// cas修改counterCell指定位置变量失败
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// x = 1, uncontended = false
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 这里使用while,可能出现扩容完,又需要扩容的情况
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
// sc < 0 表示有其他线程在扩容
// 判断扩容是否结束或者线程数已达最大值
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 辅助扩容,sc + 1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// rs << RESIZE_STAMP_SHIFT 得出的是一个负数,再 +2 还是一个负数,
// 这个值使一个初始标志,
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
他就是改baseCount,不行就改counterCells
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 判断线程生成的随机数是否等于0,但它不会随着你调用而改变
if ((h = ThreadLocalRandom.getProbe()) == 0) {
// 生成随机数
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 当counterCells 有值,长度大于0
// 取索引位置的counterCell
if ((a = as[(n - 1) & h]) == null) {
// cellsBusy表示没有counterCells数组有没有被使用,
// cellsBusy = 0 表示,没有线程在使用counterCells数组
if (cellsBusy == 0) { // Try to attach new Cell
// 没有线程在使用,就new一个新的
CounterCell r = new CounterCell(x); // Optimistic create
// 再次判断,是否有人在用,然后再修改cellBusy为1(使用中)
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 改完cellsBusy后,再判断counterCells索引位置是不是null,
// 如果索引位置是null,就修改值,不是null,就表示有其他线程修改了
// 就不会进入这个if里面去了
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
// 如果修改成功后就退出方法
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 当counterCells索引位置有值不为空,
// 如果wasUncontended是false,就改成true,在左后刷新随机数
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 修改counterCells索引位置的值,成功就退出
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// counterCels被其他线程修改了,或是长度大于等于CPU核心数
// 这个也是去限制counterCells扩容的
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
// 这个collide是控制下面的if分支是否执行
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 当counterCells != null ,counterCells索引位置不为空,wasUncontended = true,collide = true,cellsBusy = 0,并且修改cellsBusy成功进入
// 要进入这里,最少要两次循环,
try {
if (counterCells == as) {// Expand table unless stale
// 给countCell扩容
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 生成新的随机数
h = ThreadLocalRandom.advanceProbe(h);
}
// 当counterCells = null进入这个if
else if (cellsBusy == 0 && counterCells == as &&
// 习惯cellsBusy为1
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// cas成功,
boolean init = false;
try { // Initialize table
// 查看counterCells有没有改变,及查看是否有其他线程已经初始化了
if (counterCells == as) {
// 当counterCells没有改变,就初始化它
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
// 初始化成功后就退出循环,这个退出循环就是退出了这个方法了。
break;
}
// 上面情况都不满足
// cas操作对BaseCount + 1,成功退出,不成功接着循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 默认16,表示该线程转移元素时的长度
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 长度扩容为2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// 新数组大小
int nextn = nextTab.length;
// 数组对应位置的节点转移后,会放置一个fwd(可以理解为占位),这个的作用是,在另一个线程在put时发现,这个fwd对象,
// 就会知道数组正在转移,那么就会启动辅助转移元素的方法,在转移完后才能进行put
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance表示是否需要向前获取需要转移的元素
boolean advance = true;
// true表示当前线程的扩容任务完成
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 关于这个while的理解:
// 假设当stride(该线程转移元素的长度)= 2,那么它得出的i = 3,1,最后只会去转移索引为1,3的,
// 当stride = 3 ,那么得出的i = 3,2,1,最后转移的就会使索引为3,2,1的
// 所以这个while就相当于获取索引i的
// 假设有两个线程在走,TransferIndex值就会被改变,走到第三个if会有一个线程失败,
// 但它还是会继续循环获取到一个i
while (advance) {
int nextIndex, nextBound;
// 一开始这个条件就不符合
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 尝试获取下一个要迁移的元素长度
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
// 下一个索引
i = nextIndex - 1;
advance = false;
}
}
//这里的nextn是新数组大小
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 如果已经结束转移,就将table指向新数组
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 该线程做完后,sc - 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 它这个判断和这个一样:sc != resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
// 在进入第一个线程进入transfer之前,会修改sizeCtl = rs << RESIZE_STAMP_SHIFT) + 2 = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
// sc 是否等于进入transfer之前设定的初始值,相等表示没有其他线程在操作了,
// 因为每有一个线程操作,都会sc + 1
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果第i个位置是null,就替换为fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 转移完后,进入下一个区域的转移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
// 对要转移的索引位置对象枷锁后
// 下面的转移逻辑和hashMap一样
if (tabAt(tab, i) == f) {
// 省略
}
}
}
}
}
这个方法和addCount里的类似
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 这里使用while,可能出现扩容完,又需要扩容的情况
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// sc < 0 表示有其他线程在扩容
// 判断扩容是否结束或者线程数已达最大值
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 辅助扩容,sc + 1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}