概述:
ConcurrentHashMap这个类在java.lang.current包中,这个包中的类都是线程安全的。ConcurrentHashMap底层存储数据的结构与1.8的HashMap是一样的,都是数组+链表(或红黑树)的结构。在日常的开发中,我们最长用到的键值对存储结构的是HashMap,但是我们知道,这个类是非线程安全的,在高并发的场景下,在进行put操作的时候有可能进入死循环从而使服务器的cpu使用率达到100%;sun公司因此也给出了与之对应的线程安全的类。在jdk1.5以前,使用的是HashTable,这个类为了保证线程安全,在每个类中都添加了synchronized关键字,而想而知在高并发的情景下相率是非常低下的。为了解决HashTable效率低下的问题,官网在jdk1.5后推出了ConcurrentHashMap来替代饱受诟病的HashTable。jdk1.5后ConcurrentHashMap使用了分段锁的技术。在整个数组中被分为多个segment,每次get,put,remove操作时就锁住目标元素所在的segment中,因此segment与segment之前是可以并发操作的,上述就是jdk1.5后实现线程安全的大致思想。但是,从描述中可以看出一个问题,就是如果出现比较机端的情况,所有的数据都集中在一个segment中的话,在并发的情况下相当于锁住了全表,这种情况下其实是和HashTable的效率出不多的,但总体来说相较于HashTable,效率还是有了很大的提升。jdk1.8后,ConcurrentHashMap摒弃了segment的思想,转而使用cas+synchronized组合的方式来实现并发下的线程安全的,这种实现方式比1.5的效率又有了比较大的提升。
定义变量
1、ziseCtr:该变量主要是用来控制数组的初始化和扩容的,默认值为0,可以概括一下4种状态:
1.1、sizeCtr=0:默认值;
1.2、sizeCtr=-1:表示Map正在初始化中;
1.3、sizeCtr=-N:表示正在有N-1个线程进行扩容操作;
1.4、sizeCtr>0: 未初始化则表示初始化Map的大小,已初始化则表示下次进行扩容操作的阈值;
2、table:用于存储链表或红黑数的数组,初始值为null,在第一次进行put操作的时候进行初始化,默认值为16;
3、nextTable:在扩容时新生成的数组,其大小为当前table的2倍,用于存放table转移过来的值;
4、Node:该类存储数据的核心,以key-value形式来存储;
5、ForwardingNode:这是一个特殊Node节点,仅在进行扩容时用作占位符,表示当前位置已被移动或者为null,该node节点的hash值为-1;
源码部分
put 方法
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value不能为空,为空则直接返回异常
if (key == null || value == null) throw new NullPointerException();
////通过key来计算获得hash值
int hash = spread(key.hashCode());
//用于计算数组位置上存放的node的节点数量
//在put完成后会对这个参数判断是否需要转换成红黑树或链表
int binCount = 0;
//这个过程是非阻塞的,放入失败会一直循环尝试,直至成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
//第一次put操作,对数组进行初始化,实现懒加载
if (tab == null || (n = tab.length) == 0)
//初始化
tab = initTable();
//数组已初始化完成后
//使用cas来获取插入元素所在的数组的下标的位置,该位置为空的话就直接放进去
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//hash=-1,表明该位置正在进行扩容操作,让当前线程也帮助该位置上的扩容,并发扩容提高扩容的速度
else if ((fh = f.hash) == MOVED)
//帮助扩容
tab = helpTransfer(tab, f);
//插入到该位置已有数据的节点上,即用hash冲突
//在这里为保证线程安全,会对当前数组位置上的第一个节点进行加锁,因此其他位置上
//仍然可以进行插入,这里就是jdk1.8相较于之前版本使用segment作为锁性能要高效的地方
else if (onlyIfAbsent && fh == hash && // check first node
((fk = f.key) == key || fk != null && key.equals(fk)) &&
(fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
//再一次判断f节点是否为第一个节点,防止其他线程已修改f节点
if (tabAt(tab, i) == f) {
//链表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
//插入成功后判断插入数据所在位置上的节点数量,
//如果数量达到了转化红黑树的阈值,则进行转换
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//由链表转换成红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//使用cas统计数量增加1,同时判断是否满足扩容需求,进行扩容
addCount(1L, binCount);
return null;
}1、判断传进来的key和value是否为空,在ConcurrentHashMap中key和value都不允许为空,然而在HashMap中是可以为key和val都可以为空,这一点值得注意一下;
2、对key进行重hash计算,获得hash值;
3、如果当前的数组为空,说明这是第一插入数据,则会对table进行初始化;
4、插入数据,这里分为3中情况:
1)、插入位置为空,直接将数据放入table的第一个位置中;
2)、插入位置不为空,并且改为是一个ForwardingNode节点,说明该位置上的链表或红黑树正在进行扩容,然后让当前线程加进去并发扩容,提高效率;
3)、插入位置不为空,也不是ForwardingNode节点,若为链表则从第一节点开始组个往下遍历,如果有key的hashCode相等并且值也相等,那么就将该节点的数据替换掉,否则将数据加入到链表末段;若为红黑树,则按红黑树的规则放进相应的位置;
5、数据插入成功后,判断当前位置上的节点的数量,如果节点数据大于转换红黑树阈值(默认为8),则将链表转换成红黑树,提高get操作的速度;
6、数据量+1,并判断当前table是否需要扩容;
/**
* Spreads (XORs) higher bits of hash to lower and also forces top
* bit to 0. Because the table uses power-of-two masking, sets of
* hashes that vary only in bits above the current mask will
* always collide. (Among known examples are sets of Float keys
* holding consecutive whole numbers in small tables.) So we
* apply a transform that spreads the impact of higher bits
* downward. There is a tradeoff between speed, utility, and
* quality of bit-spreading. Because many common sets of hashes
* are already reasonably distributed (so don't benefit from
* spreading), and because we use trees to handle large sets of
* collisions in bins, we just XOR some shifted bits in the
* cheapest possible way to reduce systematic lossage, as well as
* to incorporate impact of the highest bits that would otherwise
* never be used in index calculations because of table bounds.
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}jdk1.8计算hash的方法是先获取到key的hashCode,然后对hashCode进行高16位和低16位异或运算,然后再与 0x7fffffff 进行与运算。高低位异或运算可以保证haahCode的每一位都可以参与运算,从而使运算的结果更加均匀的分布在不同的区域,在计算table位置时可以减少冲突,提高效率,我们知道Map在put操作时大部分性能都耗费在解决hash冲突上面。得出运算结果后再和 0x7fffffff 与运算,其目的是保证每次运算结果都是一个正数。对于java位运算不了解的同学,建议百度自行了解相关内容。
tabAt(Node<K,V>[] tab, int i): 这个方法使用了java提供的原子操作的类来操作的,sun.misc.Unsafe.getObjectVolatile 的方法来保证每次线程都能获取到最新的值;
casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v): 这个方法是通过cas的方式来获取i位置的元素;
如果新增节点之后,所在的链表的元素个数大于等于8,则会调用treeifyBin把链表转换为红黑树。在转换结构时,若tab的长度小于MIN_TREEIFY_CAPACITY,默认值为64;则会将数组长度扩大到原来的两倍,并触发transfer,重新调整节点位置。(只有当tab.length >= 64, ConcurrentHashMap才会使用红黑树。) 新增节点后,addCount统计tab中的节点个数大于阈值(sizeCtl),会触发transfer,重新调整节点位置。
addCount方法
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* //x代表新增的大小
* @param x the count to add
* //代表是否验证扩充数
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSetLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}get操作中没有使用到同步的操作,所以相对来说比较简单一点。通过key的hashCode计算获得相应的位置,然后在遍历该位置上的元素,找到需要的元素,然后返回,如果没有则返回null: