前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK8的ConcurrentHashMap源码学习笔记

JDK8的ConcurrentHashMap源码学习笔记

作者头像
itliusir
发布2018-05-21 16:58:07
7390
发布2018-05-21 16:58:07
举报
文章被收录于专栏:刘君君刘君君

正文:

目标

  • 首要目标:保持并发的可读性,同时最小化更新产生的竞争
  • 次要目标:保持与HashMap相同或更好的空间消耗,并支持许多线程在空表上的高初始插入率。

设计

  • 使用CAS代替之前版本的分段锁
  • 红黑树

putVal()方法

代码语言:javascript
复制
/**
 * sizeCtl:表初始化和调整控制。当负值时,表被初始化或调整大小:-1用于初始化,-(1 +主动调整大小的线程数)用于调整大小,默认为0。初始化完成后,保存下一个元素count值,以调整表的大小。
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //volatile读
    while ((tab = table) == null || tab.length == 0) {
	    //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        //volatile读
        if ((sc = sizeCtl) < 0) 
            Thread.yield(); // lost initialization race; just spin
        //第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,有且只有一个线程能够修改成功,其它线程通过Thread.yield()让出CPU时间片等待table初始化完成
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
/**
 * 使用Unsafe.getObjectVolatile()获取数组元素,原因是Java数组是无法表达元素是volatile、final的
 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
      Node<K,V> f; int n, i, fh;
      if (tab == null || (n = tab.length) == 0)
        //初始化
        tab = initTable();
      //为空,则设置为头节点(CAS)
      else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
          break;                   // no lock when adding to empty bin
      }
      else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
      else {
        //节点添加
        V oldVal = null;
        synchronized (f) {
          if (tabAt(tab, i) == f) {
            if (fh >= 0) {
              binCount = 1;
              for (Node<K,V> e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                  oldVal = e.val;
                  if (!onlyIfAbsent)
                    e.val = value;
                  break;
                }
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                  pred.next = new Node<K,V>(hash, key,
                                            value, null);
                  break;
                }
              }
            }
            else if (f instanceof TreeBin) {
              Node<K,V> p;
              binCount = 2;
              if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                    value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                  p.val = value;
              }
            }
          }
        }
        if (binCount != 0) {
          //达到阈值转为红黑树
          if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
          if (oldVal != null)
            return oldVal;
          break;
        }
      }
    }
    addCount(1L, binCount);
    return null;
}
/**
 * 如果当前table的个数未达到MIN_TREEIFY_CAPACITY(64)则先进行扩容
 */
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
      if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
        tryPresize(n << 1);
      else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
        synchronized (b) {
          if (tabAt(tab, index) == b) {
            TreeNode<K,V> hd = null, tl = null;
            for (Node<K,V> e = b; e != null; e = e.next) {
              TreeNode<K,V> p =
                new TreeNode<K,V>(e.hash, e.key, e.val,
                                  null, null);
              if ((p.prev = tl) == null)
                hd = p;
              else
                tl.next = p;
              tl = p;
            }
            setTabAt(tab, index, new TreeBin<K,V>(hd));
          }
        }
      }
    }
}
/**
 * 扩容(size = 2n)
 */
private final void tryPresize(int size) {
    //size如果大于max(Int)/2 则直接扩为max(Int),否则为大于3*size + 1的最小二次幂
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    //volatile读,sizeCtl大于0指没有进行初始化和扩容的操作
    while ((sc = sizeCtl) >= 0) {
      Node<K,V>[] tab = table; int n;
      if (tab == null || (n = tab.length) == 0) {
        n = (sc > c) ? sc : c;
        //执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,有且只有一个线程能够修改成功,其它线程通过Thread.yield()让出CPU时间片等待table初始化完成
        if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
          try {
            if (table == tab) {
              @SuppressWarnings("unchecked")
              //初始化
              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
              table = nt;
              sc = n - (n >>> 2);
            }
          } finally {
            sizeCtl = sc;
          }
        }
      }
      //最大了
      else if (c <= sc || n >= MAXIMUM_CAPACITY)
        break;
      else if (tab == table) {
        int rs = resizeStamp(n);
        if (sc < 0) {
          //竞争失败
          Node<K,V>[] nt;
		 //正在咨询大佬...
          if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
              sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
              transferIndex <= 0)
            break;
          if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
            transfer(tab, nt);
        }
        //竞争成功
        else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                     (rs << RESIZE_STAMP_SHIFT) + 2))
          transfer(tab, null);
      }
    }
}

transfer()方法

代码语言:javascript
复制
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //分片
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
      stride = MIN_TRANSFER_STRIDE; // subdivide range
    //nextTab初始化(CAS保证只有一个线程调用此方法)
    if (nextTab == null) {            // initiating
      try {
        //根据当前数组长度n,新建一个两倍长度的数组nextTable
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
      } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
      }
      nextTable = nextTab;
      transferIndex = n;
    }
    int nextn = nextTab.length;
    //初始化ForwardingNode(nextTab) 用于并发移动时,其它线程可以知道这个节点正在被移动,或已经被移动
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    //循环的关键变量,判断是否已经扩容完成,完成就return,退出循环
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
      Node<K,V> f; int fh;
      while (advance) {
        int nextIndex, nextBound;
        //i指当前处理的槽位序号,bound指需要处理的槽位边界
        if (--i >= bound || finishing)
          advance = false;
        else if ((nextIndex = transferIndex) <= 0) {
          i = -1;
          advance = false;
        }
        else if (U.compareAndSwapInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
          bound = nextBound;
          i = nextIndex - 1;
          advance = false;
        }
      }
      if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        if (finishing) {
          nextTable = null;
          table = nextTab;
          //sizeCtl设为新数组大小的3/4(2n-n/2)
          sizeCtl = (n << 1) - (n >>> 1);
          return;
        }
        //resizeStamp()没看懂
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
          if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
          finishing = advance = true;
          i = n; // recheck before commit
        }
      }
      else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
      else if ((fh = f.hash) == MOVED)
        advance = true; // already processed
      else {
        synchronized (f) {
          if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            if (fh >= 0) {
              //使用fn&n可以快速把链表中的元素分为两份
              int runBit = fh & n;
              Node<K,V> lastRun = f;
              for (Node<K,V> p = f.next; p != null; p = p.next) {
                int b = p.hash & n;
                if (b != runBit) {
                  runBit = b;
                  lastRun = p;
                }
              }
              if (runBit == 0) {
                ln = lastRun;
                hn = null;
              }
              else {
                hn = lastRun;
                ln = null;
              }
              for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ((ph & n) == 0)
                  ln = new Node<K,V>(ph, pk, pv, ln);
                else
                  hn = new Node<K,V>(ph, pk, pv, hn);
              }
              //放到新Tab
              setTabAt(nextTab, i, ln);
              setTabAt(nextTab, i + n, hn);
              setTabAt(tab, i, fwd);
              advance = true;
            }
            else if (f instanceof TreeBin) {
              TreeBin<K,V> t = (TreeBin<K,V>)f;
              TreeNode<K,V> lo = null, loTail = null;
              TreeNode<K,V> hi = null, hiTail = null;
              int lc = 0, hc = 0;
              for (Node<K,V> e = t.first; e != null; e = e.next) {
                int h = e.hash;
                TreeNode<K,V> p = new TreeNode<K,V>
                  (h, e.key, e.val, null, null);
                //红黑树同样是根据h&n分为两份
                if ((h & n) == 0) {
                  if ((p.prev = loTail) == null)
                    lo = p;
                  else
                    loTail.next = p;
                  loTail = p;
                  ++lc;
                }
                else {
                  if ((p.prev = hiTail) == null)
                    hi = p;
                  else
                    hiTail.next = p;
                  hiTail = p;
                  ++hc;
                }
              }
              //拆分之后如果长度小于默认阈值(6)则转为链表
              ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
              (hc != 0) ? new TreeBin<K,V>(lo) : t;
              hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
              (lc != 0) ? new TreeBin<K,V>(hi) : t;
              //设置新的Tab
              setTabAt(nextTab, i, ln);
              setTabAt(nextTab, i + n, hn);
              setTabAt(tab, i, fwd);
              advance = true;
            }
          }
        }
      }
    }
}

HashMap vs ConcurrentHashMap

  • ConcurrentHashMap是线程安全的,在并发环境下不需要额外同步
  • ConcurrentHashMap有很好的扩展性,在多线程环境下性能方面比做了同步的HashMap要好,但是在单线程环境下,HashMap会比ConcurrentHashMap好一点

参考链接

晚安~

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-03-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正文:
  • 目标
  • 设计
    • putVal()方法
      • transfer()方法
      • HashMap vs ConcurrentHashMap
      • 参考链接
      • 晚安~
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档