前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK 8 ConcurrentHashMap源码解读

JDK 8 ConcurrentHashMap源码解读

作者头像
用针戳左手中指指头
发布2021-01-29 10:47:56
2710
发布2021-01-29 10:47:56
举报
文章被收录于专栏:学习计划

7和8 的结构还是有些不一样;7里面是Segment、entry数组实现的,将entry数组分段加锁,而8里是对数组元素加锁,并发上增加了一个counterCells的数组记录并发时增加的值,然后通过cas方式进行操作,性能比起整段锁住的JDK 7 好了很多。

代码语言:javascript
复制
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        // 链表长度统计
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化数组
                tab = initTable();
            // 这个判断是利用unsafe的cas操作,将新值加入到组中还没有值的地方
            // tabAt 方法就是取出tab数组中i个位置的值,它底层使用的是unsafe
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // castabAt和tabAt相反,它是在第i个位置设置值,
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    // cas成功就退出,失败,接着循环,下一次不一定能到进入这里
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                // 辅助数据迁移
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    // 对数组中的要插入的位置元素加锁后,再次判断是否改变
                    if (tabAt(tab, i) == f) {
                        // 红黑树的hash = -2 ,所以大于0的是链表
                        if (fh >= 0) {
                      
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // hash相等且
                                if (e.hash == hash &&
                                    // key相等
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    // 替换值
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    // 退出完成插入
                                    break;
                                }
                                // 当前节点对象给pred
                                Node<K,V> pred = e;
                                // 下一个节点给e,遍历知道尾结点
                                if ((e = e.next) == null) {
                                     // 将新节点插入到最后
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 判断f是否是一颗红黑树
                        // 其实在红黑树外面有一层壳TreeBin,使得树成为一个整体,然后加锁,
                        // 不然没有这层壳,树调整后,根节点可能就不是根节点了,而且也不不能保证插入时没问题的
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            // 将新值加入到红黑树
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 当链表长度大于指定值,就树化
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
    // 修改计数
        addCount(1L, binCount);
        return null;
    }
代码语言:javascript
复制
 final TreeNode<K,V> putTreeVal(int h, K k, V v) {
            Class<?> kc = null;
            boolean searched = false;
     // 遍历
            for (TreeNode<K,V> p = root;;) {
                int dir, ph; K pk;
                if (p == null) {
                    // 根节点是空的就new一个
                    first = root = new TreeNode<K,V>(h, k, v, null, null);
                    break;
                }
                // 判断插左还是右
                else if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                // 找到hash值相同的,可能值也相同
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    // key相同
                    return p;
                // hash值相同,且地址不同或者值又或者equals不等
                else if ((kc == null &&
                          // 获取k的类型,赋值kc
                          (kc = comparableClassFor(k)) == null) ||
                         // 只有kc != null才会进入(k实现comparable)
                         // 判断kc的类型不相等才会返回0,不然返回k和pk的比较结果
                         (dir = compareComparables(kc, k, pk)) == 0) {
                    // 进入这里,表示,k实现comparable,或者k和pk相等
                    if (!searched) {
                        TreeNode<K,V> q, ch;
                        // 设置找到的标志
                        searched = true;
                        // 找到旧节点 并返回
                        if (((ch = p.left) != null &&
                             (q = ch.findTreeNode(h, k, kc)) != null) ||
                            ((ch = p.right) != null &&
                             (q = ch.findTreeNode(h, k, kc)) != null))
                            return q;
                    }
                    // 计算dir
                    dir = tieBreakOrder(k, pk);
                }

                TreeNode<K,V> xp = p;
                // 插入节点
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    TreeNode<K,V> x, f = first;
                    first = x = new TreeNode<K,V>(h, k, v, f, xp);
                    if (f != null)
                        f.prev = x;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    if (!xp.red)
                        x.red = true;
                    else {
                        lockRoot();
                        try {
                            root = balanceInsertion(root, x);
                        } finally {
                            unlockRoot();
                        }
                    }
                    break;
                }
            }
            assert checkInvariants(root);
            return null;
        }

初始化数组

代码语言:javascript
复制
   private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // sizeCt 《 0 表示资源被占用
            if ((sc = sizeCtl) < 0)
                // 让出线程资源,但不代表不会竞争,也有可能会再次进入
                Thread.yield(); // lost initialization race; just spin
            // 类似加锁,就将seizeCtrl 改为 -1 ,成功的话才进入
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 再次判断这个数组是否是空的
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 右移2位(除2的2次)也就是四分之一
                        // 那么sc = 3/4 = 0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 最后赋值给sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

jdk8中没有size属性,它有baseCount记录map的大小

它会有一个数组countCell[] 用来记录并发时所记录的增加数量,什么意思呢?就是在并发时,同时修改baseCount,要么成功、要么等待(阻塞),它呢用countCell[] 数组来记录线程在修改baseCount失败时所加的数值,当线程A在修改baseCount,线程B来了,它发现baseCount它改不了,不行,不能等,后面还有长队呢,就自己加个值放到countCell数组里,之后在做统计。

ThreadLocalRandom.getProbe() 每个线程生成一个随机数

代码语言:javascript
复制
private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
    // 当counterCells 不为空,说明有线程进行了+1
        if ((as = counterCells) != null ||
            // 或者 修改baseCount,失败才会进入 
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // counterCell数组为空
            if (as == null || (m = as.length - 1) < 0 ||
                // counterCell数组上指定位置为空
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                // cas修改counterCell指定位置变量失败
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // x = 1, uncontended = false
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // 这里使用while,可能出现扩容完,又需要扩容的情况
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    // sc < 0 表示有其他线程在扩容
                    // 判断扩容是否结束或者线程数已达最大值
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 辅助扩容,sc + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // rs << RESIZE_STAMP_SHIFT 得出的是一个负数,再 +2 还是一个负数,
                // 这个值使一个初始标志,
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

他就是改baseCount,不行就改counterCells

代码语言:javascript
复制
private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
    // 判断线程生成的随机数是否等于0,但它不会随着你调用而改变
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            // 生成随机数
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                // 当counterCells 有值,长度大于0
                // 取索引位置的counterCell
                if ((a = as[(n - 1) & h]) == null) {
                    // cellsBusy表示没有counterCells数组有没有被使用,
                    // cellsBusy = 0 表示,没有线程在使用counterCells数组
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        // 没有线程在使用,就new一个新的 
                        CounterCell r = new CounterCell(x); // Optimistic create
                        // 再次判断,是否有人在用,然后再修改cellBusy为1(使用中)
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                // 改完cellsBusy后,再判断counterCells索引位置是不是null,
                                // 如果索引位置是null,就修改值,不是null,就表示有其他线程修改了
                                // 就不会进入这个if里面去了
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            // 如果修改成功后就退出方法
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                // 当counterCells索引位置有值不为空,
                // 如果wasUncontended是false,就改成true,在左后刷新随机数
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 修改counterCells索引位置的值,成功就退出
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                // counterCels被其他线程修改了,或是长度大于等于CPU核心数
                // 这个也是去限制counterCells扩容的
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    // 这个collide是控制下面的if分支是否执行
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    // 当counterCells != null ,counterCells索引位置不为空,wasUncontended = true,collide = true,cellsBusy = 0,并且修改cellsBusy成功进入
                    // 要进入这里,最少要两次循环,
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            // 给countCell扩容
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 生成新的随机数
                h = ThreadLocalRandom.advanceProbe(h);
            }
             // 当counterCells = null进入这个if
            else if (cellsBusy == 0 && counterCells == as &&
                     // 习惯cellsBusy为1
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
               // cas成功,
                boolean init = false;
                try {                           // Initialize table
                    // 查看counterCells有没有改变,及查看是否有其他线程已经初始化了
                    if (counterCells == as) {
                        // 当counterCells没有改变,就初始化它
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    // 初始化成功后就退出循环,这个退出循环就是退出了这个方法了。
                    break;
            }
            // 上面情况都不满足
            // cas操作对BaseCount + 1,成功退出,不成功接着循环
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }
代码语言:javascript
复制
  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
      // 
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            // 默认16,表示该线程转移元素时的长度
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // 长度扩容为2倍
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
      // 新数组大小
        int nextn = nextTab.length;
      // 数组对应位置的节点转移后,会放置一个fwd(可以理解为占位),这个的作用是,在另一个线程在put时发现,这个fwd对象,
      // 就会知道数组正在转移,那么就会启动辅助转移元素的方法,在转移完后才能进行put
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
      // advance表示是否需要向前获取需要转移的元素
        boolean advance = true;
      // true表示当前线程的扩容任务完成
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 关于这个while的理解:
            // 假设当stride(该线程转移元素的长度)= 2,那么它得出的i = 3,1,最后只会去转移索引为1,3的,
            // 当stride = 3 ,那么得出的i = 3,2,1,最后转移的就会使索引为3,2,1的
            // 所以这个while就相当于获取索引i的
            
            // 假设有两个线程在走,TransferIndex值就会被改变,走到第三个if会有一个线程失败,
            // 但它还是会继续循环获取到一个i
            while (advance) {
                int nextIndex, nextBound;
                // 一开始这个条件就不符合
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 尝试获取下一个要迁移的元素长度
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    // 下一个索引
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //这里的nextn是新数组大小
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    // 如果已经结束转移,就将table指向新数组
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 该线程做完后,sc - 1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // 它这个判断和这个一样:sc != resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
                    // 在进入第一个线程进入transfer之前,会修改sizeCtl = rs << RESIZE_STAMP_SHIFT) + 2 = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
                    // sc 是否等于进入transfer之前设定的初始值,相等表示没有其他线程在操作了,
                    // 因为每有一个线程操作,都会sc + 1
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果第i个位置是null,就替换为fwd
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 转移完后,进入下一个区域的转移
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    // 对要转移的索引位置对象枷锁后
                    // 下面的转移逻辑和hashMap一样
                    if (tabAt(tab, i) == f) {
                       // 省略
                    }
                }
            }
        }
    }

这个方法和addCount里的类似

代码语言:javascript
复制
 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
             // 这里使用while,可能出现扩容完,又需要扩容的情况
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                 // sc < 0 表示有其他线程在扩容
                    // 判断扩容是否结束或者线程数已达最大值
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                 // 辅助扩容,sc + 1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档