前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文吃透ConcurrentHashMap的前世与今生

一文吃透ConcurrentHashMap的前世与今生

作者头像
柏炎
发布2022-08-23 14:11:23
2600
发布2022-08-23 14:11:23
举报
文章被收录于专栏:深入浅出java后端

前言

hello,everyone。上篇博客中介绍了HashMap,本文给大家带来ConcurrentHashMap。

【 一文吃透HashMap的前世与今生】:https://cloud.tencent.com/developer/article/2079820

上文中提到了,HashMap是线程不安全的类,k-v类型数据操作在多线程下推荐使用ConcurrentHashMap。本文将会延续HashMap的解读思路,对ConcurrentHashMap从关键成员变量,核心方法与常见面试点出发,帮助大家深入浅出的理解ConcurrentHashMap这个在java中核心数据结构。

一.ConcurrentHashMap介绍

ConcurrentHashMap与HashMap一样,同样是对K-V类型数据进行操作的数据结构。HashMap中通过modCount来避免多线程下的冲突,但是是通过fast-fail机制解决。无法解决在多线程情况下,对同一个key值进行有序的赋值动作。ConcurrentHashMap在1.7版本中使用segment分段锁的形式控制并发写入,JDK1.8版本的时候使用CAS+synchronized关键字控制并发写入。本文将针对JDK1.8介绍ConcurrentHashMap的核心概念。

二.关键概念介绍

ConcurrentHashMap的关键数据结构概念与HashMap是一致的。

  • 数组
  • 线性链表
  • 二叉树
  • 哈希表
  • 哈希冲突

【一问吃透HashMap的前世与今生:https://juejin.cn/post/6959584804375363620】

三.关键成员变量

代码语言:javascript
复制
//hashamp的最大容量,2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;

//构造hashmap时,默认初始化容量为16
private static final int DEFAULT_CAPACITY = 16;

//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//默认扩容的扩展因子,当hashmap中的元素个数达到当前容量的75%时,触发扩容
private static final float LOAD_FACTOR = 0.75f;

//ConcurrentHashMap数组节点上链表转换为红黑的的阈值,链表节点达到8个时转换为红黑树【注意:这里不是绝对,切往后看】
static final int TREEIFY_THRESHOLD = 8;

//链表节点数小于6个时,从红黑树转换为链表
static final int UNTREEIFY_THRESHOLD = 6;

//链表转化为红黑的第二个要求,与TREEIFY_THRESHOLD对应,最小的链转树的数组大小
static final int MIN_TREEIFY_CAPACITY = 64;

//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

//记录目标的hash值
static final int MOVED     = -1; 

//目标的红黑树根节点hash值
static final int TREEBIN   = -2; 

//ReservationNode的hash值
static final int RESERVED  = -3;

//获取当前CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();

//存放node的数组
transient volatile Node<K,V>[] table;

/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;

//默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方
transient volatile Node<K,V>[] table;

//默认为null,扩容时新生成的数组,其大小为原数组的两倍
private transient volatile Node<K,V>[] nextTable;

四.关键方法解析

4.1.构造方法

ConcurrentHashMap拥有空参与有参的构造方法。

4.1.1.空参构造方法

代码语言:javascript
复制
/**
 * Creates a new, empty map with the default initial table size (16).
 */
public ConcurrentHashMap() {
}

从注释可以看出来,空参构造方法新建一个空的map,初始化大小为默认的16。空参构造的时候,当我们放入第一个元素的时候将会初始化map。我们来看一下put方法中putVal方法

initTable()方法解析

代码语言:javascript
复制
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0)
            Thread.yield();
        //如果cas成功,修改sizeCtl变量未-1,表示正在初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                    //table为空,设置一个默认大小的数组
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //设置一个默认扩容大小为12,根据扩容因子0.75得到。因为这里是空参构造方法,所有参数
                    为默认
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

从上的值,空构造方本身没有什么含义,只会初始化一些有默认值的变量,真正的结构数据初始化需要等到第一次put元素进去。

4.1.2.有参构造方法

代码语言:javascript
复制
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               //将当前容量设置为1.5倍+1
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

容量设置保证为2的倍数。这里一个有一个特别重要的地方,在上面备注中已经指出来,传入的容量在初始化的时候会设置为传入容量的1.5倍+1,这么做的原因是,如果你需要7个元素的map容量,你传入了7,按照常理会设置一个8容量给你,但是你已经知道容量是7,元素个数到了第6个时,将会发生扩容。这里为了避免初始化容量设置不合理,导致后续扩容的耗时操作,则根据0.75f的扩容因子进行了初始化容量的设置。

如果输入7,则容量为16

如果输入15,则容量为32

4.2.put方法

代码语言:javascript
复制
public V put(K key, V value) {
    return putVal(key, value, false);
}
代码语言:javascript
复制
//onlyIfAbsent参数为,当key值不存在的时候在进行设置
final V putVal(K key, V value, boolean onlyIfAbsent) {
        //这里与HashMap不同,key与value都不允许为空
    if (key == null || value == null) throw new NullPointerException();
    //计算key的hash
    int hash = spread(key.hashCode());
    int binCount = 0;
    //数组遍历
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果为空参构造方法,table参数还未被初始化,则先进行初始化动作
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
            //如果根据hash定位到的数组节点为空则使用cas进行新节点的设置
        else if ((f = tabAt(tab, i = (n - 1) &amp; hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果定位到的hash值为-1,则说明正在扩容,当前线程去帮忙进行扩容操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
                //最后一种情况,说明不是直接在数组上的节点,则遍历链表或者红黑树,遍历时使用synchronized加锁
            V oldVal = null;
            synchronized (f) {
                    //在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改
                if (tabAt(tab, i) == f) {
                        //根据在第二点的值,hash值大于0时为链表节点,则进行链表遍历
                    if (fh >= 0) {
                            //链表节点计数
                        binCount = 1;
                        //链表遍历
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &amp;&amp;
                                ((ek = e.key) == key ||
                                 (ek != null &amp;&amp; key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //这里判断节点是不是等于TreeBin,如果是,则进行红黑树遍历
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //红黑树设置值
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //计数值如果大于8
            if (binCount != 0) {
                    //计数大于8则链表转换红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                 //如果key值节点已经存在则进行返回
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //容量增加,决定是否扩容
    addCount(1L, binCount);
    return null;
}

put方法是整个ConcurrentHashMap的核心,使用cas+synchronized关键字的方式保证了各个节点的线程安全。在这里不知道小伙伴有没有疑问为什么要使用cas+synchronized加锁,替换了JDK1.7版本里面Segment+ReentrantLock的方式。这里的主要原因是因为锁被细化了,Segment+ReentrantLock是分段加锁,锁的冲突相对频率高、而cas+synchronized是针对数组节点或者数组上整条链而言的,大大减少了锁的竞争冲突。JDK1.6之后,对synchronized关键字又做了关键的性能提升,做了各种的锁粗化,升级,消除等等一系列优化。

因此在ConcurrentHashMap这么细粒度下加锁,性能是不会比ReentrantLock差的。

synchronized锁升级过程:https://www.jianshu.com/p/704eb56aa52a

4.3.get方法

代码语言:javascript
复制
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //hash值计算
    int h = spread(key.hashCode());
    if ((tab = table) != null &amp;&amp; (n = tab.length) > 0 &amp;&amp;
        (e = tabAt(tab, (n - 1) &amp; h)) != null) {
        //如果数组上的节点符合直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null &amp;&amp; key.equals(ek)))
                return e.val;
        }
        //小于0,则进行红黑树遍历,其中find方法点击进入,根据注释的值,将会调用子类TreeBin内部find方法,红黑树遍历查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //链表遍历
        while ((e = e.next) != null) {
            if (e.hash == h &amp;&amp;
                ((ek = e.key) == key || (ek != null &amp;&amp; key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法的逻辑比较简单,全程也没有加锁的动作出现。这里大家应该会有一点疑问,那我在扩容的过程中,节点重新hash了,还能正确遍历吗?这里就要看一下存储数据的Node<K,V>

代码语言:javascript
复制
static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
}

node节点的数据都是用volatile关键字进行修饰的,可以从主存中加载到最新的地址信息数据,也就不需要加锁来实现了。

4.4.remove方法

代码语言:javascript
复制
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) &amp; hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &amp;&amp;
                                ((ek = e.key) == key ||
                                 (ek != null &amp;&amp; key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null &amp;&amp; cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &amp;&amp;
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null &amp;&amp; cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

remove方法整体上与put方法类似,都是属于写操作,直接定位到数组头结点则直接删除,否则加synchronized关键字进行处理。

4.5.扩容

扩容发生的时机是当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,需要对 table 进行扩容。

触发的入口分析:

1.put方法,塞入元素个数达到扩容的阈值

2.触发了tryPresize。这个动作有两种可能触发,

第一种:但数组有一个单位的链表节点达到了8个,但是数组长度还是小于64的情况,会尝试去扩容。

第二种:则是调用方法,putAll时。

ok,知道了入口,我们逐层方法点击定位到扩容的方法为:transfer

代码语言:javascript
复制
private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 多线程扩容,每核处理的量小于16,则强制赋值16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    // nextTab 为空,先实例化一个新的数组
    if (nextTab == null) { 
        try {
            @SuppressWarnings("unchecked")
            // 新数组的大小是原来的两倍
            ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) { 
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 更新成员变量
        nextTable = nextTab;
        // 更新转移下标,就是 老的 tab 的 length
        transferIndex = n;
    }
    // bound :该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容
    // advance: 该参数
    int nextn = nextTab.length;
    // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
    ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
    // advance 变量指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
    boolean advance = true;
    // 完成状态,如果是 true,表示扩容结束
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
    for (int i = 0, bound = 0;;) {
        ConcurrentHashMap.Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
            // 这儿多判断一次,是否为了防止可能出现的remove()操作
                if (tabAt(tab, i) == f) {
                    // 旧链表上该节点的数据,会被分成低位和高位,低位就是在新链表上的位置跟旧链表上一样,
                    // 高位就是在新链表的位置是旧链表位置加上旧链表的长度
                    ConcurrentHashMap.Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh &amp; n;
                        ConcurrentHashMap.Node<K,V> lastRun = f;
                        for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash &amp; n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            // 该节点哈希值与旧链表长度与运算,结果为0,则在低位节点上,反之,在高位节点上
                            if ((ph &amp; n) == 0)
                                ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        // 在nextTable i + n 位置处插上链表
                        setTabAt(nextTab, i + n, hn);
                        // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof ConcurrentHashMap.TreeBin) {
                        // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                        // 红黑树的逻辑跟节点一模一样,最后也会分高位和低位
                        ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                        ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
                        ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h &amp; n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

五.总结

本文详细介绍了ConcurrentHashMap数据结构的核心逻辑与实现。现做以下总结

1.ConcurrentHashMap初始化容量为2的幂次,空参构造默认容量为16,有参构造容量为大于1.5倍+1参数的最小2的幂次。

2.put方法中使用数组cas自旋设值,链表或者红黑树synchronized关键字加锁设值,根据判断数组上的hash值判断当前节点状态,-1为正在发生扩容,大于0为hash值直接进行值的覆盖,节点类型为TreeBin则为红黑树,否则链表遍历赋值。

3.红黑树转换要求,链表节点数大于等于8,且数组长度大于等于64.

4.get方法不加锁,支持并发,通过volatile关键字保证内存可见性,拿到最新的节点数据。

六.参考

https://juejin.cn/post/6844904018729254926

https://www.jianshu.com/p/cf5e024d9432

https://segmentfault.com/a/1190000021237438

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-05-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一.ConcurrentHashMap介绍
  • 二.关键概念介绍
  • 三.关键成员变量
  • 四.关键方法解析
    • 4.1.构造方法
      • 4.1.1.空参构造方法
      • 4.1.2.有参构造方法
    • 4.2.put方法
      • 4.3.get方法
        • 4.4.remove方法
          • 4.5.扩容
          • 五.总结
          • 六.参考
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档