前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK 7 ConcurrentHashMap源码解读

JDK 7 ConcurrentHashMap源码解读

作者头像
用针戳左手中指指头
发布2021-01-29 11:09:57
3250
发布2021-01-29 11:09:57
举报
文章被收录于专栏:学习计划学习计划

原理分析

HashMap存在并发问题,jdk有提供HashTable,这个HashTable是对HashMap中的所以方法加锁以达到线程安全,但是,这种方式会使得性能下降,看下面的图,假如有两个线程分别要put kk3和kk4,第一个线程最快,它对kk3进行put操作,这时另一个线程要put kk4就要等待,问题是,这两个元素所要put的位置,互不相干,但还是需要等待,这造成了一种资源浪费,所以才会出现ConcurrentHashMap,它分段式加锁,就能很大程度上避免下面的情况。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1YhqJdMs-1594129846423)

在这里插入图片描述
在这里插入图片描述

下面的问题就是如何分段?

在ConcurrentHashMap中

它有这样一个结构:

ConcurrentHashMap

----Segment[]

--------HashEntry[]

在ConcurrentHashMap中有最上层的数组segment,segment里每一个元素里还有一个HashEntry,图示如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xaUVjUFw-1594129846425)

在这里插入图片描述
在这里插入图片描述

按结构来看,就像是segment将HashEntry进行了分段,每段都有相同个数的HashEntry。

源码

它有这些个属性:

代码语言:javascript
复制
 	// 默认初始大小
	static final int DEFAULT_INITIAL_CAPACITY = 16;
	// 默认加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
	// 默认的并发级别
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
	// 最大的容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
	// 每段的最小容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
	// 最大段数
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
	// 重试次数
    static final int RETRIES_BEFORE_LOCK = 2;
	// hash种子
    private transient final int hashSeed = randomHashSeed(this);

    final int segmentMask;

    final int segmentShift;

    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;
代码语言:javascript
复制
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 检查初始化值是否异常
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
     // 判断并发级别是否最大值
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 这里计算ssize和hashmap里的很像,就是找大于等于并发级别的2次方式数
    // 比如,16,那么这个ssize就是16,如果是17,就会是32
    
    // sshift下面再说
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 向上取整,计算,分段下的数组大小
    // 如果,initialCapacity=17,ssize=16,就是我们想让数组初始化大小为17时,c=1
   
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
     // (默认最小大小)MIN_SEGMENT_TABLE_CAPACITY=2
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 如果计算后的值大于最小值就左移(*2);当c=1,cap=2;c=3,cap=4;c=5,cap=8
    while (cap < c)
        cap <<= 1;
    // 创建segment数组,参数(加载因子,阈值,内部数组tab)
    // 原型参照对象
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

在构造器中为什么要创建这个s0呢??? 其实这类似一个原型的意思,打个比方,在外层segment数组下,需要一个HashEntry数组,那么每次去添加的时候,都要new HashEntry,那么是不是都需要再计算一次刚刚的步骤呢?没有必要,所以创建这个s0作为一个原型参照。

需要注意的是,创建好segment后,segment的大小就不会变了,变的是segment里的HashEntry

来看put方法

代码语言:javascript
复制
 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
     // 计算下标
        int j = (hash >>> segmentShift) & segmentMask;
     // 判断计算的下标处,segment元素是否null,并获取segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

计算下标,注意segmentShiftsegmentMask

代码语言:javascript
复制
   int j = (hash >>> segmentShift) & segmentMask;

segmentShiftsegmentMask这两个值的计算在构造方法中,这里截出一段。

假设concurrencyLevel = 16,走下面代码

代码语言:javascript
复制
	int sshift = 0;
    int ssize = 1;
    
    // 1 < 16 => ssize = 2,sshift = 1
	// 2 < 16 => ssize = 4,sshift = 2
	// 4 < 16 => ssize = 8,sshift = 3
	// 8 < 16 => ssize = 16,sshift = 4
	// 16 < 16 退出循环
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
	// 32 - 4 = 28
	// 不懂这里为什么要32 - 4(待补充)
    this.segmentShift = 32 - sshift;
	// 16 - 1 = 15 => 0000 0000 0000 0000 0000 0000 0000 1111
    this.segmentMask = ssize - 1;

那么hash >>> segmentShift就是右移28位,就会得到4位随机有效值:

代码语言:javascript
复制
hash =>

1010 1010 1010 1010 1010 1010 1010 1010

hash >>> 28

0000 0000 0000 0000 0000 0000 0000 1010

segmentMask =>

0000 0000 0000 0000 0000 0000 0000 1111

(hash >>> segmentShift) & segmentMask =>

0000 0000 0000 0000 0000 0000 0000 1010

这里的计算是为了使数组随机散列的更均匀。

然后是接下去的一句,意思是取segments数组的(j << SSHIFT) + SBASE个位置

代码语言:javascript
复制
UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)

查看SSHIFT发现它的值在静态代码快里定义好了:31 - Integer.numberOfLeadingZeros(ss),那么上面的表达式可以转为:

(j << 31 - Integer.numberOfLeadingZeros(ss)) + SBASE

代码语言:javascript
复制
 static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            // 获取数组基本偏移量
            TBASE = UNSAFE.arrayBaseOffset(tc);
            SBASE = UNSAFE.arrayBaseOffset(sc);
            // 获取元素间隔比例
            ts = UNSAFE.arrayIndexScale(tc);
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
            throw new Error("data type scale not a power of two");
     // 取高位前面的0的个数
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    }

UNSAFE可以定位到对象属性位置,也可以修改值,但是他需要偏移量。

那么Integer.numberOfLeadingZeros(ss)又表示什么?

它表示的是高位前面的0的个数,比如8的二进制是0000 0000 0000 0000 0000 0000 0000 1000

28 = Integer.numberOfLeadingZeros(8)

然而(j << 31 - Integer.numberOfLeadingZeros(ss)) + SBASE = j * ss和hashmap中计算下标一样。

补充证明过程;

如果,在计算的下标处,元素为null,就会进入ensureSegment方法。

因为会出现多个线程会在同一个位置申请空间,所以这里出现很多unsafe操作,确保只有一个线程能够进行赋值

代码语言:javascript
复制
 private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            // 获取原型元素
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
   
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            // cap初始化大小,看上面几行代码,可以知道,它是获取的是原型元素ss[0]的,没有去计算,计算就和上面说过的一样,是在构造函数中初始化好的,以ss[0]作为原型去创建。
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            // 再次判断还是不是空的,
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                // 如果是空的,就生成一个新的
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                // 通过cas操作,将生成的放入
                // 这里while是以为可能存在多个线程,第一个线程更新中,但第二个线程也同样操作,但第二个操作更快,那么cas操作失败,循环,再次判断是否==null,这种写法比较严谨
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

最后进入segment的put方法

代码语言:javascript
复制
 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                 // 这里和hashmap中的一样,
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                // 这里是获取hashEntry的头结点,不是第一个元素
                HashEntry<K,V> first = entryAt(tab, index);
                // 这里就有两种情况,实现的原理就和hashmap一样。
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            // key重复就覆盖,并返回旧值
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        // 没有重复的就下一个节点,再次循环
                        e = e.next;
                    }
                    else {
                        // 如果已经new出node,就直接设置节点
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            // 扩容
                            rehash(node);
                        else
                            // 利用UNSAFE在指定位置设置值
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

中间的for有两种情况

1.头尾null

2.头不为空

它继承了reetrantlock,下面这个方法是为了加锁的,最后他会返回new的node,然而在上一层方法中他也会进行判断和new node,注意这里的处理方式,如果一个while循环里什么都不干,就空转,那会使CPU占用率高,在循环里做一些操作,使它循环的没那么快(这一知识点可以百度看一下),提高性能,既然要做一些操作,那么就可以去new 一个Entry。

代码语言:javascript
复制
 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
     // 循环尝试加锁
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                // 这一串是目的是为了创建node,retries表示是否需要new这个对象
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                // 当达到一定次数就加锁
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                // 每一次重试都检查一次链表有没有发生变化,如果第一个节点发生变化,就重新遍历一下
                else if ((retries & 1) == 0 &&
                         // 获取当前数组的头结点位置
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

扩容方法

代码语言:javascript
复制
 private void rehash(HashEntry<K,V> node) {
 
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    // 新数组下标
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        // 如果下一个为空,就单个node,就放到新节点上
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        // 这一串:遍历hashEntry,判断下一个元素的下标是否相等,
                        // 不相等,则记录下一个节点的下标和对象,(补充画图)一直遍历
                        // 其结果就是记录hashEntry最后key相同的元素,
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            // 计算新的下标,
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // 把记录的最后的元素放到新数组
                        newTable[lastIdx] = lastRun;
                        // 头插法,遍历hashEntry,除刚刚记录的最后的元素外,全部通过头插法加入到新数组
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
     // 将我们新的添加对象放到新数组里
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-07-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 原理分析
  • 源码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档