在面试的时候,ConcureentHashMap在JDK1.7的时候线程安全底层具体实现方式是什么?
CouncureentHashMap在JDK1.7的时候如下图:
ConcurrentHashMap由Segment数组组成,Segment继承了ReentrantLock可以提供锁的功能,也表示并发度,是最大并行访问的线程数量,每一个Segment内部包含一个HashEntry数组用于元素存储,
HashEntry则是一个K-V的存储单元,尾部可以挂HashEntry使用链地址法解决hash冲突
1.7中的ConcurrentHashMap源码大约1600行,去除大量注释外,我们只需要关注核心方法,
segment大小固定后,就不可变了默认是16个。也就是说,默认支持16个线程并发写。
16个segment就是16把锁(门牌号),那么在put的时候,是怎么定位到那获取哪个门牌号?数据是怎么put进去的?
ConcurrentHashMap定位一个元素需要两次Hahs,,操作,第一次Hash定位到Segement,第二次Hash定位到元素所在的链表的头部.这种结构下,Hash过程比普通的HashMap要久,但是写操作的时候,只对元素所在的Segement加锁即可,不会影响其他的Segement.在理想情况下,ConcurrentHashMap最高可以同时支持Segement数量大小的写操作.正因为这样,ConcurrentHashMap的并发能力得以提高.
我们来看看源码:
public V put(K key, V value) {
Segment<K,V> s;
//1.value不能为null
if (value == null) throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
//2.定位Segemnt,如果为null就先初始化,CAS操作
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//使用Segment的put操作
return s.put(key, hash, value, false);
}
在得到对应的segment之后,调用s.put方法
Segment
的结构和 HashMap
类似,是一种数组和链表结构,一个 Segment
包含一个 HashEntry
数组,每个 HashEntry
是一个链表结构的元素,每个 Segment
守护着一个 HashEntry
数组里的元素,当对 HashEntry
数组的数据进行修改时,必须首先获得对应的 Segment
的锁。也就是说,对同一 Segment
的并发写入会被阻塞,不同 Segment
的写入是可以并发执行的。
我们在来看看,调用Segment的put操作,操作需要加锁,如果tryLock失败成功就继续执行,如果tryLock失败,则进去scanAndLockForPut尝试一定次数的自旋,先看看tryLock成功的场景
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//1.tryLock成功,node为null
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
//3.entryAt通过hash定位并获取table中指定下标处的头元素(内部使用volatile保证线程安全)
HashEntry<K,V> first = entryAt(tab, index);
//4.从链表头结点开始遍历
for (HashEntry<K,V> e = first;;) {
//5.如果头结点不为空,那么往后遍历,一旦找到一样的key就替换并break,内部会处理onlyIfAbsent的情况
if (e != null) {
K k;
if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
//6.到这里有两种情况:第一种情况是first为空,说明定位到的HashEntry数组该位置没有元素,
//第二种情况是first不为空,但是链表遍历到最后了,链表中不存在key这个键,
//这两种情况都需要将新的key构造节点并插入
else {
//7.node不为null,说明在scanAndLockForPut里面自旋等待锁的时候,线程并没有傻等着,而是已经把节点构造好了,
//既然构造好了,那还等啥,直接头插法设置next即可,并且这个setNext是线程安全的,在前面的HashEntry已经提过
if (node != null)
node.setNext(first);
else
//8.到这里说明node还没构造好,可能是tryLock一下就成功了还没来得及构造节点,那就构造一下
node = new HashEntry<K,V>(hash, key, value, first);
//9.插入之后Segment持有的元素加一
int c = count + 1;
//10.判断是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//11.如果不扩容,因为是头插法,node成了新的头,自然要把node设置到HashEntry数组的指定位置,
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//12.解锁
unlock();
}
//13.返回旧值
return oldValue;
}
总结put方法:
1、在put方法中,首先要加锁,如果获取锁失败就会通过自旋的方式阻塞保证能拿到锁.通过key的hash值来确定具体的链表头.
2、遍历该链表,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖value
3、如果链表为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容.
4、最后会释放锁
来看看get方法:
get操作不需要加锁,先通过hash值定位到Segement,然后遍历HashEntry,代码就不贴了,核心在下面:
将要查找的key通过Hash定位到具体的segment,再通过一次Hash定位到具体的元素上,然后遍历链表元素,如果找到相同的key就返回对应的value.
总结
ConcurrentHashMap采用锁分段技术(1.7),内部为Segment数组来进行细分,而每个Segment又通过HashEntry数组来进行封装,当进行写操作的时候,只需要对这个key对应的segment进行加锁操作,这样就不会对其他的Segment造成影响.在默认情况下,每个ConcurrentHashMap包含了16个Segment,每个Segment包含16个HashEntry,对一个Segment进行加锁的时候,其他15个还能正常使用,因此性能比较高.