首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【原创】Java并发编程系列27 | ConcurrentHashMap(下)

【原创】Java并发编程系列27 | ConcurrentHashMap(下)

作者头像
java进阶架构师
发布2020-07-02 16:22:06
4650
发布2020-07-02 16:22:06
举报
文章被收录于专栏:Java进阶架构师Java进阶架构师

正文

上一篇详细分析了HashMap源码,介绍了HashMap的数据结构以及并发编程中HashMap的问题,这篇就来看下ConcurrentHashMap。因为ConcurrentHashMapHashMap结构是一样的,本文将重点介绍ConcurrentHashMap在并发编程中如何保证线程安全:

  1. 关键属性
  2. put()方法
  3. 扩容
  4. 如何保证线程线程安全
  5. 使用误区

1. 关键属性

table:用来存放Node结点数据;

Node:结点,保存key-value的数据结构;

nextTable:扩容时新生成的数据,数组为table的两倍;

ForwardingNode:特殊的Node结点,hash值为-1,其中存储nextTable的引用。扩容时,作为一个占位符放在table中表示当前结点为null或则已经被移动。

sizeCtl:控制标识符

  • 负数代表正在进行初始化或扩容操作
  • -1 代表正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 正数 如果当前数组为null,表示table在初始化过程中,sizeCtl表示为需要新建数组的长度
  • 正数 若table已经初始化了,表示临界值,数组的长度n乘以加载因子loadFactor;

sun.misc.Unsafe U:使用CAS修改属性和做一些操作来保证线程安全性,例如:

/**
 * 利用CAS操作获取table数组中索引为i的Node
 */
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

/**
 * 利用CAS操作替换table数组中索引为i的元素
 */
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

/**
 * 利用CAS操作设置table数组中索引为i的元素
 */
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

2. put()方法

由于数据结构一样,put()、get()、remove()方法大致步骤也一致,我们通过put()方法的源码来看下ConcurrentHashMap是如何保证线程安全的。get()、remove()这些数据操作保证线程安全的方式跟put()方法是一样的,理解了put方法,其他的也就明白了。

put()方法大致步骤:

  1. 数组下标没有对应hash值,直接newNode()添加
  2. 数组下标有对应hash值,添加到链表最后
  3. 链表超过最大长度(8),将链表改为红黑树再添加元素

结点在table数组中的位置计算:table[(length - 1) & hash]

put()方法源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());// 得到 hash 值
    int binCount = 0;// 用于记录相应链表的长度
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果数组空,进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,利用 CAS 操作将这个新值放入其中即可
            // 如果 CAS 失败,那就是有并发操作,进到下一个循环再put
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        
        // hash 等于 MOVED(-1),数组正在扩容,帮助数据迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);// 帮助数据迁移

        // 到这里就是说,f 是该位置的头结点,而且不为空
        else {
            V oldVal = null;
            // 获取数组该位置的头结点的监视器锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 头结点的 hash 值大于 0,说明是链表
                    if (fh >= 0) {
                        binCount = 1;// 用于累加,记录链表的长度
                        // 遍历链表添加node
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            // 判断是否要将链表转换为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

put()方法是如何保证线程安全的?

  1. table用volatile修饰,保证数组数据修改的可见性
  2. 获取结点使用tabAt(),设置结点使用casTabAt(),利用CAS进行数据操作,使用乐观锁,如果因为其他线程修改数据导致当前线程操作失败,自旋重试直到成功。

举例说明:

HashMap中,线程A线程B可以同时检查得到tab[1]==null,然后线程A设置了tab[1]=Node(A),马上线程B又设置tab[1]=Node(B),那么线程A设置的数据就丢失了;

ConcurrentHashMap中,当线程A设置了tab[1]=Node(A)后,线程B又设置tab[1]=Node(B)就会失败,然后自旋进下一次循环,设置Node(A).next=Node(B)

  1. 操作链表上的数据和红黑树上的数据时加synchronized锁,将第一个结点作为监视器锁。同样避免了因为多线程操作对数据结构的破坏和数据的丢失。
  2. 不再使用++size这种操作记录结点数量,而是用volatile变量baseCount和volatile数组CounterCell[]来记录。CAS操作成功用baseCount记录,CAS失败用CounterCell[]记录,最终将两个变量结合计算出总结点数量。

3. 扩容

ConcurrentHashMap单线程扩容过程与HashMap类似,大致过程如下:

  1. newTab = new Node[2*length],创建一个两倍于原来数组oldTab的新数组newTab,遍历oldTable,将oldTab中的结点转移到newTab中。
  2. 如果桶中oldTab[i]只有一个元素node,直接将node放入newTab[node.hash & (newCap - 1)]中。
  3. 如果桶中oldTab[i]是链表,分成两个链表分别放入newTab[i]和newTab[i+oldTab.length]。
  4. 如果桶中oldTab[i]是树,树打散成两颗树插入到新桶中去。

ConcurrentHashMap支持多线程扩容:

  1. 当一个线程发现数组结点到达阈值时,调用transfer(tab, null)进行扩容并迁移数据,会创建一个2倍长度的新数组nextTable。
  2. 当另一个线程要操作数据时发现table数组正在扩容,就会调用transfer(tab, nextTable)帮忙迁移数据。
  3. 多个线程同时迁移数据怎么实现呢?设置一个步长stride,每个线程负责一个步长的数据迁移。例:table.length==64,步长stride=16,每个线程每次负责迁移16个桶,如果当前线程16个桶迁移结束再去申请16个桶迁移。

扩容如何保证线程安全呢?

  1. table和nextTable的修改都是通过CAS操作,失败后自旋重试,不会造成数据丢失和错误。
  2. 链表和红黑树的操作都是将第一个结点作为监视器锁加synchronized锁,同步处理,保证线程安全。

扩容源码(源码较长,还是建议看下,实在没时间阅读源码的话可以直接看节的总结):

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;

    // stride步长,在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果 nextTab 为 null,先进行一次初始化
    // 第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
    // 之后参与迁移的线程调用此方法时,nextTab 为 nextTable
    if (nextTab == null) {
        try {
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab; // nextTable 是 ConcurrentHashMap 中的属性
        transferIndex = n;// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
    }

    int nextn = nextTab.length;

    // ForwardingNode表示已经迁移过的结点,hash值为MOED(-1)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    
    boolean advance = true;// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
    boolean finishing = false; // to ensure sweep before committing nextTab


    for (int i = 0, bound = 0;;) {// i 是位置索引,bound 是边界,
        Node<K,V> f; int fh;

        // i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;

            // 将 transferIndex 值赋给 nextIndex
            // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;// nextBound 是这次迁移任务的边界
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有的迁移操作已经完成
            if (finishing) {
                nextTable = null;
                table = nextTab;// 将新的 nextTab 赋值给 table 属性,完成迁移
                sizeCtl = (n << 1) - (n >>> 1);// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
                return;
            }

            // 修改sizeCtl
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任务结束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 头结点的 hash 大于 0,说明是链表的 Node 节点
                    if (fh >= 0) {
                        /*
                         * 将链表一分为二
                         * 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
                         * astRun 之前的节点需要进行克隆,然后分到两个链表中
                         */
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一个链表放在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一个链表放在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树的迁移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;

                        // 将 ln 放置在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 将 hn 放置在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                }
            }
        }
    }
}

4. 总结:如何保证线程线程安全

  1. CAS操作数据:table数组的取值/设置值、链表的数值操作、sizeCtl修改都利用CAS来完成,当因为其他线程修改了数据导致操作失败后,会自旋重试直到成功,保证了在多线程环境下的数据安全。
  2. synchronized互斥锁:操作链表/树种的元素时,使用synchronized锁,将第一个结点作为监视器锁,保证线程安全。
  3. volatile修饰变量:table、baseCount、CounterCell[]、sizeCtl等变量用volatile修饰,保证了多线程环境下数据读写的可见性。

5. 使用注意

线程安全的容器只能保证自身的数据不被破坏,但无法保证业务的行为是否正确。

ConcurrentHashMap只是保证put进容器的数据正确保存,get时可以正确获取,但并发容器并不是锁,并不能保证业务上的线程安全。

举例说明:启动100个线程统计字符串"a"出现的次数,正确结果应该是100,但是每次执行完结果都小于100。

ConcurrentHashMap能保证的情况:

  1. put("a1", 1)和put("a2", 2),如果"a1"和"a2"的hash值相等,但并不equals,在并发环境中出问题。
  2. get("a")在多线程环境中可以取到正确的值。

ConcurrentHashMap不能解决的情况:两个线程同时get("a")=1,然后又同时put("a", 2)。这个问题不是ConcurrentHashMap本身的问题,因为get得到的是最新值没问题,put的值也已经保存了,而是业务代码的线程安全问题,对一个共享的集合操作时没有同步处理,需要加锁解决。

public class ConcurrentHashMapTest {
    private static final Map<String, Long> wordCounts = new ConcurrentHashMap<String, Long>();
    public static void main(String[] args) {
        for (int i = 0; i <= 99; i++) {
            new Thread(){
                public void run() {
                    Long oldValue = wordCounts.get("a");
                    Long newValue = (oldValue == null) ? 1L : oldValue + 1;
                    wordCounts.put("a", newValue);
                };
            }.start();
        }
        System.out.println(wordCounts);
    }
}

解决:synchronized加锁

public class ConcurrentHashMapTest {
    private static final Map<String, Long> wordCounts = new ConcurrentHashMap<String, Long>();
    public static void main(String[] args) {
        for (int i = 0; i <= 99; i++) {
            new Thread(){
                public void run() {
                    synchronized (wordCounts) {
                        Long oldValue = wordCounts.get("a");
                        Long newValue = (oldValue == null) ? 1L : oldValue + 1;
                        wordCounts.put("a", newValue);
                    }
                };
            }.start();
        }
        System.out.println(wordCounts);
    }
}

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java进阶架构师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 关键属性
  • 2. put()方法
  • 3. 扩容
  • 4. 总结:如何保证线程线程安全
  • 5. 使用注意
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档