ConcurrentHashMap是线程安全的集合实现。在高并发下,ConcurrentHashMap能保证内部的每个方法都是原子性的!(可以保证单个方法是原子性的,但多个原子操作结合到一起使用并不能保证原子性!!)
它在HashMap的基础上,提供了更好的并发性能。通常被当作是HashMap在高并发情况下的替代集合。今天,我就带领大家一起阅读源码,看看它的存储结构和实现原理是怎么样的。
注:本片文章所涉及的源码版本来自 JDK1.8
Java 8 几乎完全重写了 ConcurrentHashMap,代码量从原来 Java 7 中的 1000 多行,变成了现在的 6000 多行
ConcurrentHashMap 取消了 Segment 分段锁,采用 Node 数组 + CAS + synchronized 来保证并发安全。数据结构跟 HashMap 1.8 的结构类似,数组+链表/红黑二叉树。Java 8 的实现中当链表长度超过一定阈值(8)并且数组长度大于等于 64 时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(log(N)))

Java 8 中,锁粒度更细,synchronized 只锁定当前链表或红黑树的首节点,这样只要 Hash 不冲突,就不会产生并发,就不会影响其他 Node 的读写,效率大幅提升
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // 父节点
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // 前驱节点
boolean red; // 维护颜色 红/黑
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}// 表数组,懒惰初始化。第一次插入时初始化,大小总是为 2^n
transient volatile Node<K,V>[] table;
// 默认初始化容量,默认为 16
private static final int DEFAULT_CAPACITY = 16;
// 控制表初始化和扩容操作,不同的值有不同的含义
// -1 表示正在初始化集合,其他线程需要自旋等待
// -(1 + nThreads) 表示有 nThreads 个线程正在进行扩容操作
// 正数表示下一次需要扩容的阈值
private transient volatile int sizeCtl; // 被 volatile 修饰保证了可见性当我们调用无参构造时
ConcurrentHashMap map = new ConcurrentHashMap();默认会创建大小为 16 的空集合
当我们调用构造方法并手动设置集合大小时
ConcurrentHashMap map = new ConcurrentHashMap(12);源码如下
/**
* 创建一个新的空集合,其初始大小可容纳指定数量的元素,而无需动态调整大小
*
* @param initialCapacity 集合内部应适应这么多的元素
* @throws IllegalArgumentException 如果集合的初始容量为负数
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException();
}
/*
MAXIMUM_CAPACITY: 最大容量
MAXIMUM_CAPACITY >>> 1:最大容量的一半
*/
// 如果设置的初始值 >= 最大容量的一半,则将初始容量设置为最大容量,否则调用 tableSizeFor() 计算
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// initialCapacity >>> 1:将 initialCapacity 整除 2
// 假设传的初始值为12,则传入 tableSizeFor 方法中的值为 12 + 6 + 1 = 19
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 将最终的初始容量赋值给 sizeCtl
this.sizeCtl = cap;
}/**
* 内部根据给定容量转换成 2^n 值
*/
private static final int tableSizeFor(int c) { // initialCapacity=12时, c=19
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 如果结果小于 0,则返回 1;如果大于等于最大容量,则返回最大容量;否则返回结果加 1
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}/**
* 将指定键映射到此表中的指定值。键和值都不能为 null
*
*
* @param key 与指定值关联的键
* @param value 与指定键关联的值
* @return 与键关联的前一个值,如果没有键的映射,则为 null
* @throws NullPointerException 如果指定的键或值为 null
*/
public V put(K key, V value) {
// 我们调用 put 方法时,其内部调用了 putVal() 方法
return putVal(key, value, false);
}
/**
* onlyIfAbsent 参数用于控制在调用 putVal 方法时的行为:
* 1.如果 onlyIfAbsent 为 true,则只有当键 key 在映射中不存在时才会插入新的键值对。
* 如果键已经存在,则不会更新其对应的值。
* <p>
* 2.如果 onlyIfAbsent 为 false,则无论键是否存在都会插入或更新键值对。
* 如果键已经存在,则会替换其对应的旧值。
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 不能为空
if (key == null || value == null) throw new NullPointerException();
// spread 方法能确保返回结果是正数,真正用到的 hash 码
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) {
// 如果 table == null,则初始化数组表(体现懒惰初始化) 自旋+CAS
tab = initTable();
}
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 桶存在但还没有元素,CAS 放入,不加锁,成功了就直接 break 跳出
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
// 帮忙扩容
else if ((fh = f.hash) == MOVED)
// 帮忙之后, 进入下一轮循环
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 锁住链表头节点
synchronized (f) {
// 再次确认链表头节点没有被移动
if (tabAt(tab, i) == f) {
// 链表
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同的 key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 更新
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 已经是最后的节点了, 新增 Node, 追加至链表尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
// 释放链表头节点的锁
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
// 当然内部还要判断是否哈希表长度大于64,如果不大于,则会先不扩容,先将哈希表变大
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加 size 计数
addCount(1L, binCount);
return null;
}
// check 是之前 binCount 的个数
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if (
// 已经有了 counterCells, 向 cell 累加
(as = counterCells) != null ||
// 还没有, 向 baseCount 累加
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (
// 还没有 counterCells
as == null || (m = as.length - 1) < 0 ||
// 还没有 cell
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// cell cas 增加计数失败
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
) {
// 创建累加单元数组和cell, 累加重试
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 获取元素个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// newtable 已经创建了,帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 需要扩容,这时 newtable 未创建
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}hashcode == MOVED == -1则需要进行扩容TREEIFY_THRESHOLD则要执行树化方法,在treeifyBin中会首先判断当前数组长度 ≥64 时才会将链表转换为红黑树扩展:
ConcurrentHashMap 为什么 key 和 value 不能为 null?
ConcurrentHashMap 的 key 和 value 不能为 null 主要是为了避免二义性
null 是一个特殊的值,表示没有对象或没有引用。如果你用 null 作为键,那么你就无法区分这个键是否存在于 ConcurrentHashMap 中,还是根本没有这个键。同样,如果你用 null 作为值,那么你就无法区分这个值是否是真正存储在 ConcurrentHashMap 中的,还是因为找不到对应的键而返回的
拿 get 方法取值来说,返回的结果为 null 存在两种情况:
这也就是二义性的由来
多线程环境下,存在一个线程操作该ConcurrentHashMap时,其他的线程将该ConcurrentHashMap修改的情况,所以无法通过containsKey(key)来判断否存在这个键值对,也就没办法解决二义性问题
与此形成对比的是,HashMap可以存储 null 的 key 和 value,但 null 作为键只能有一个,null 作为值可以有多个。如果传入 null 作为参数,就会返回 hash 值为 0 的位置的值。单线程环境下,不存在一个线程操作该 HashMap 时,其他的线程将该HashMap修改的情况,所以可以通过contains(key)来做判断是否存在这个键值对,从而做相应的处理,也就不存在二义性问题
也就是说,多线程下无法正确判定键值对是否存在(存在其他线程修改的情况),单线程是可以的(不存在其他线程修改的情况)
如果你确实需要在 ConcurrentHashMap 中使用 null 的话,可以使用一个特殊的静态空对象来代替 null
public static final Object NULL = new Object();/**
* 使用 sizeCtl 中的大小来初始化表数组
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果 sizeCtl < 0,说明有其他线程 CAS 成功,正在执行初始化操作
// 此处为负数只可能为 -1。因为 whlie 判断已经保证不可能存在扩容操作(还没表数组)
if ((sc = sizeCtl) < 0) {
Thread.yield(); // 让出 CPU 使用权,线程状态从 Running 运行状态进入 Runnable 就绪状态
// 调用 Unsafe 类的 compareAndSwapInt 方法来 CAS 修改 SIZECTL 的值为 -1,表示正在初始化
} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 下一次扩容时的阈值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}/**
* 返回指定键映射到的值,如果此映射不包含键的映射,则返回null
*
* @throws NullPointerException 如果指定的键为null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// key 所对应的 hash 值。保证得到的 hash 码都是正数
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
// (n - 1) & h):等价于 %
// TabAt(表, 下标):根据表下标去找对应的链表
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果指定位置元素存在,头结点hash值相同
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek))) {
// key hash 值相等,key值相同,直接返回元素 value
return e.val;
}
}
else if (eh < 0)
// 头结点 hash 值小于0,说明正在扩容或者是红黑树,通过 find() 查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 是链表,遍历查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}size() 计算实际发生在 put(),remove() 改变集合元素的操作之中
当然,size() 得到的结果可能不是一个特别准确的值。因为在多线程环境下访问,可能增加可能删除
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 将 baseCount 计数与所有 cell 计数累加
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}Java7 中ConcurrentHashMap使用的分段锁,也就是每一个 Segment 上同时只有一个线程可以操作,每一个Segment 都是一个类似HashMap 数组的结构,它可以扩容,它的冲突会转化为链表。但是Segment的个数一但初始化就不能改变
Java8 中的ConcurrentHashMap使用的Synchronized 锁加 CAS 的机制。结构也由 Java7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表
有些同学可能对Synchronized的性能存在疑问,其实Synchronized锁自从引入锁升级策略后,性能不再是问题。有兴趣的话我可以分享一篇我关于锁升级的笔记给大家~