专栏首页后端技术学习ConcurrentHashMap源码学习

ConcurrentHashMap源码学习

ConcurrentHashMap源码学习

首先思考一下:

既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与HashMap、HashTable有什么区别?ConcurrentHashMap中的sizeCtl有几种值,这些值代表的是什么状态?ConcurrentHashMap使用了哪些锁?DEFAULT_CONCURRENCY_LEVEL表示什么,有什么用?

1.相关变量

//最大容量2^30
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量16
private static final int DEFAULT_CAPACITY = 16;
//最大数组大小 =>2^31-9
//@Native public static final int  MAX_VALUE = 0x7fffffff;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认并发级别 16
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加载因子0.75
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树的必要条件之一
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表
static final int UNTREEIFY_THRESHOLD = 6;
//链表转红黑树的条件之二
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容转移时的最小数组分组大小
private static final int MIN_TRANSFER_STRIDE = 16;
//sizrCtrl中用于生成标记的位数,对于32位数组至少6位
private static int RESIZE_STAMP_BITS = 16;
//扩容最大线程数 2^(31-16)-1=2^15-1
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//在sizeCtl中记录标记的位移大小,也即偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//用于转发节点的哈希,也即扩容操作
static final int MOVED     = -1; // hash for forwarding nodes
//hash为树的根,也即红黑树操作
static final int TREEBIN   = -2; // hash for roots of trees
//临时保留的hash值
static final int RESERVED  = -3; // hash for transient reservations
//2^31-1,用在计算hash时进行
//按位与计算消除负hash
//youy
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//数组,或者叫桶
transient volatile Node<K,V>[] table;
//下一个数组,或者叫桶
private transient volatile Node<K,V>[] nextTable;
//元素个数计数器,当无并发时使用CAS进行更新
private transient volatile long baseCount;
//这里和hashMap的阀值类似,但是比HashMap的功能要多
//sizeCtl=-1,表示在进行初始化
//sizeCtl=0,默认值,在初始化时采用默认容量16
//sizeCtl>0,传入的容量,保存扩容的0.75倍作为下一次的扩容阀值
private transient volatile int sizeCtl;
//扩容时下一个hash表索引 +1
private transient volatile int transferIndex;
//扩容和/或创建CounterCell时使用的自旋锁(通过CAS锁定)。
private transient volatile int cellsBusy;
//表计数器,如果不为空,则大小为2的次幂
//用于并发时,统计hash表中的数组的大小和采用cas扩容的大小:
//baseCount+countCells
private transient volatile CounterCell[] counterCells;
// keySet,key的集合
private transient KeySetView<K,V> keySet;
//value值
private transient ValuesView<K,V> values;
//entrySet:键值对,类似,类似一组一组的
private transient EntrySetView<K,V> entrySet;

2.构造函数

//空参构造
public ConcurrentHashMap() {
}

 //构造函数,带初始化容量
public ConcurrentHashMap(int initialCapacity) {
    //对初始化容量进行校验
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    //sizeCtl等于初始化容量的2的次幂
    this.sizeCtl = cap;
}

 //将传入的容量变成2的次幂
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

//构造函数,给定初始化容量和加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

//构造一个ConcurrentHasHMap给定初始容量、加载因子、并发级别
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //进行相关参数校验
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

//构造函数带map
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    //sizeCtl为默认容量16
    this.sizeCtl = DEFAULT_CAPACITY;
    //放入map
    putAll(m);
}

//放入map信息
public void putAll(Map<? extends K, ? extends V> m) {
    //对给定的容量进行处理,调成2的次幂
    tryPresize(m.size());
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}

//尝试扩容hash表的大小以容纳给定数量的元素。
private final void tryPresize(int size) {
    //如果大于最大容量,则默认为最大容量,否者进行2的次幂处理
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    tableSizeFor(size + (size >>> 1) + 1);
    //sc为capacity容量
    int sc;
    //如果sc=sizeCtrl=16
    while ((sc = sizeCtl) >= 0) {
        //tab=hash表
        Node<K,V>[] tab = table; int n;
        //如果hash表为空,长度为0,则
        if (tab == null || (n = tab.length) == 0) {
            //n为初始化容量,否者为c,而c是处理好的2的次幂的
            n = (sc > c) ? sc : c;
            //sun.misc.Unsafe U,进行CAS操作
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        //进行链表操作
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        //n为2的次幂,如果为16,
                        //则以16为例,则10000=>00100=4,
                        //也即变成原来的0.75,这也是阀值的情况
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //记下阀值
                    sizeCtl = sc;
                }
            }
        }
        //否则如果c为处理好的容量,而sc=16,如果小于16,
        //或者>=最大容量,不做处理
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            //修改扩容的标记大小
            int rs = resizeStamp(n);
            //sc<0时,则进行扩容操作
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                //进行扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

带有map的构造中sizeCtl,也即阀值为16,同时将map数据放入ConcurrentHashMap中。放入之前,会先进行初始化容量,如果容量不够,则进行容量扩容操作。

3.方法

put方法

//进行put操作
public V put(K key, V value) {
    return putVal(key, value, false);
}

//进行put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //进行高位运算,消除负号带来的影响
    int hash = spread(key.hashCode());
    int binCount = 0;
    //进行自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //table为null或空map时进行初始化
        if (tab == null || (n = tab.length) == 0)
            //初始化table,设置相关的容量、创建table表
            //写得很好,可以借鉴,锁自旋和cas的结合
            tab = initTable();
        //hash取模,如果为空,则说明首元素无值
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //根据所在索引为null值,进行值的添加
            //casTabAt主要用于CAS比较该索引处是否为null
            //防止其他线程已改变该值,
            //null则插入,break结束上面的自旋
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //首地址不为空,并且node的hash值为-1,则表示
        //ForwardingNode节点正在rehash扩容
        else if ((fh = f.hash) == MOVED)
            //进行扩容操作
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //这里采用了synchronized,
            //如果程序走到这里则说明产生了hash冲突
            synchronized (f) {
                //这里的volatile获取首节点与
                //else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
                //处获取的节点对比判断f还是不是首节点
                //前面这个条件执行时,这里的
                //首节点是完全存在被其它线程干掉或者空旋的情况
                if (tabAt(tab, i) == f) {
                    //如果fh的hash值,判断是为了等待扩容完成
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //fh<0的情况,-2表示红黑树节点,进行树节点操作
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //执行树节点操作
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //如果计算>+变成红黑树的阀值,则会
                //进一步判断数组长度,从而确定
                //是否需要进行红黑树操作
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //最后的插入数据后根据一定条件来进行扩容的方法
    addCount(1L, binCount);
    return null;
}

//拿到hash值 HASH_BITS=2^31-1
//与hashmap计算hash基本一样,但多了一步
//& HASH_BITS,HASH_BITS是0x7fffffff,
//该步是为了消除最高位上的负符号
//hash的负在ConcurrentHashMap中有特殊意义
//表示在扩容或者是树节点
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

 //进行扩容while配合cas构成自旋cas,保证线程安全
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //三个条件判断的是扩容是否结束,
    //ForwardingNode再创建时持有nextTable数组的引用
    //nextTable会在扩容结束后被置为null
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //如果上面的条件为true,则表明扩容未结束,
        //扩容标记进行修改,会变成一个负数
        //如果数组大小不变则rs不变
        int rs = resizeStamp(tab.length);
        //因为存在多线程扩容的情况,每次都需要
        // 重新判断扩容条件是否还满足
        //如果扩容已完成,
        // 那么table=nextTable, nextTable=null,
        // 并且sizeCtl变成下次扩容的阈值,
        // 下面的三项检查分别与这里的情况对应
        //sc = sizeCtl) < 0扩容sizeCtl是一定小于0
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //校验标记,resizeStamp的参数大小不变则相等
            //sc=rs+1说明最后一个扩容线程正在执行首位工作
            //sc==rs+MAX_RESIZERS说明扩容线程数超过最大值
            //transferIndex<=0,扩容中transferIndex
            // 表示最近一个被分配的stride区域的下边界
            // 说明数组被分配完了
            //在扩容前sizeCtl会被设置为
            // sc=(resizeStamp(n)<<RESIZE_STAMP_SHFIT)+2,
            // 这是一个负数
            //sc为阀值,而sc无符号右移16位
            //刚好得到resizeStamp(n),即rs的值
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //如果成功将sizeCtl的值加1,则进入transfer执行数据扩容操作
            //从这里可以看出,每当有新线程协助扩容时,会将sizeCtl的值+1
            //sc + 1表示增加一个扩容线程
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                //进行扩容操作
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

//初始化hash表
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //hash表为空,长度为空,则进行初始化
    while ((tab = table) == null || tab.length == 0) {
        //如果长度小于0,进行自旋,
        // -1代表正在初始化或者扩容,
        // 则本线程退让,依赖上面的自旋继续初始化
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //CAS判断SIZECTL和sc相同
        //SIZECTL赋值-1表示正在初始化,
        //只有一个线程进行初始化
        // 其它线程在上个if卡住
        //进行自旋,等待扩容操作执行完
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //第一个线程初始化之后,
                //第二个线程还会进来所以需要再次判断一次
                if ((tab = table) == null || tab.length == 0) {
                    //获取值,如果大于0,则说明给定了值,否者给定默认值
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    //创建基本Node数组结构
                    tabe = tab = nt;
                    //扩容阀值 0.75*n
                    sc = n - (n >>> 2);
                }
            } finally {
                //阀值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}


/**
 * Finds or adds a node.
 * @return null if added
 */
//进行树节点操作,包含hash值,key、value
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
    Class<?> kc = null;
    boolean searched = false;
    //进行自旋
    for (TreeNode<K,V> p = root;;) {
        int dir, ph; K pk;
        //根节点为空
        if (p == null) {
            //首节点等于根节点等于新创建的树节点
            first = root = new TreeNode<K,V>(h, k, v, null, null);
            break;
        }
        //hash值下小于父节点的hash值,则取左节点
        else if ((ph = p.hash) > h)
            dir = -1;
        //如果hash值大于父节点hash值,则取右节点
        else if (ph < h)
            dir = 1;
        //否者就是当前节点,直接返回
        else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
            return p;
        //如果hash值相等,但是key不相等
        //由于没有实现Comparable接口,
        // 因此无法判断,因此会产生hash碰撞
        else if ((kc == null &&
                  (kc = comparableClassFor(k)) == null) ||
                 (dir = compareComparables(kc, k, pk)) == 0) {
            //判断是否执行过查找方法
            if (!searched) {
                TreeNode<K,V> q, ch;
                searched = true;
                //如果执行过,则分别查找左右树,查看是否有相同的对象,
                //如果找不到,则说明确实没有找到相同对象存在
                if (((ch = p.left) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null) ||
                    ((ch = p.right) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null))
                    return q;
            }
            //hash相同,key不相等,并且key没有实现Comparable接口,
            //保持一致的处理规则,以便进行排序,从而进行遍历
            dir = tieBreakOrder(k, pk);
        }
        //备份当前节点
        TreeNode<K,V> xp = p;
        //如果遍历完之后还是没有找到相同的key的节点,
        //则进行新的插入操作
        if ((p = (dir <= 0) ? p.left : p.right) == null) {
            TreeNode<K,V> x, f = first;
            first = x = new TreeNode<K,V>(h, k, v, f, xp);
            //f不为空,则说明在前驱节点
            if (f != null)
                f.prev = x;
            //如果dir<0,则表明是左节点
            if (dir <= 0)
                xp.left = x;
            //否者>0,右节点
            else
                xp.right = x;
            //变成红色
            if (!xp.red)
                x.red = true;
            else {
                //进行锁定,等待重平衡
                lockRoot();
                try {
                    //实现红黑树重排实现重平衡
                    root = balanceInsertion(root, x);
                } finally {
                    unlockRoot();
                }
            }
            //最终break掉,跳出自旋
            break;
        }
    }
    assert checkInvariants(root);
    return null;
}

  //进行红黑树判断操作,如果是则转成红黑树,否者进行链表操作
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                //数组长度不满足64,进行链表操作
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                //加锁,进行红黑树操作
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
    

1.首先进行对key、value进行判断,如果为null,则抛异常。

2.进行高位运算,消除符号带来的影响,由于符号是进行扩容操作

3.进行自旋,如果table为空或者map为空,进行初始化

4.进行初始化hash表的过程中,initTable里面写得非常好,采用首先进行while死循环,进行自旋,同时采用Thread.yield(),因为此时可能会出现扩容的情况。采用CAS判断sizeCtl与sc相同,等待扩容操作完。

5.扩容完成之后,进行阀值的设置,这里用得很巧妙,采用位运算做的。保留阀值标记方便第二次扩容使用。

6.接着进行hash取模,如果为空,则表明首元素无值,则进行元素添加,采用cas的方式进行值的添加。

7.如果首地址为空,同时node的hash值为-1,则进行move操作,也即可能是链表或者红黑树的首元素,进行扩容操作。

8.如果走到了synchronized(f),则说明产生了hash冲突,则等待扩容操作完成。扩容完了之后进行放入操作。

9.最后的插入数据后根据一定条件来进行扩容的方法addCount(1L,binCount),完成操作

transfer方法

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
//进行扩容操作,也就是resize操作
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //根据cpu个数找出扩容时的数组跨度大小即最小分组16、32、64增长
    //假设n=32,则32/8得到cpu核数,如果得到的结果小于16,name就使用16
    //这里这样做的目的是为了让每个cpu处理的桶一样多,避免出现转移任务不均匀的现象
    //如果桶较少,默认一个cpu处理16个桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //新的table没有初始化,则进行初始化
    if (nextTab == null) {            // initiating
        try {
            //同时扩容两倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        //更新nextTable和转移下标更新,老的 tab 的 length
        nextTable = nextTab;
        transferIndex = n;
    }
    //新tab的长度
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    //设置开始时为true
    boolean advance = true;
    //结束时为false
    boolean finishing = false; // to ensure sweep before committing nextTab
    //进行自旋操作
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        //如果是true,开始自旋
        //整个过程计算i的值,i的值会从n-1依次递减
        //其中n是原来数组的大小,
        //i会从15开始一直减小到1这样转移元素
        while (advance) {
            int nextIndex, nextBound;
            //当前分组没有转移完||扩容完成 --i,完成数组逆序转移
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        //如果一次遍历完成了,也就是整个map所有桶中的元素都转移完成了
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            //如果全部转移完成,则替换旧table
            //并设置下一次扩容阀值sizeCtl
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            //当前线程扩容完成,把扩容线程数-1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //根据前面addCount的+2这里就有-2  判断是否是最后一个正在扩容的线程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                //扩容完成两边相等
                //把finishing设置为true
                //此时会走到上面的if条件
                finishing = advance = true;
                //提交前重新检查一次,看是否转移完成
                //第二次遍历会走到(fh = f.hash) == MOVED
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null
            //如果桶中无数据,则直接放入
            // ForwardingNode标记该tab转移完成
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            //已经处理
            advance = true; // already processed
        else {
            //锁定f转移元素
            synchronized (f) {
                //再次判断当前tab第一个元素是否有修改
                //也就是可能其他线程先一步迁移了元素
                if (tabAt(tab, i) == f) {
                    //把一个链表分化成两个链表
                    //规则是hash表中各元素的hash与hash表中的n进行与操作
                    //等于0的放到低位链到新has表迁移h表中的位置相对于旧桶不变
                    //高位链表迁移到新hash表中国的位置正好是其原来的位置+n
                    Node<K,V> ln, hn;
                    if (fh >= 0) { //不是树节点,链表操作
                        //第一个元素的hash值大于0
                        //说明该hash表中的元素是以链表的形式存放的
                        //与hashMap不同的是多了一步寻找lastRun
                        //这里的lastRun是提取出链表后面不用
                        //处理在特殊处理的子链表
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        //看最后的元素属于低位链表还是高位链表
                        if (runBit == 0) {
                            //低位链表
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            //高位链表
                            hn = lastRun;
                            ln = null;
                        }
                        //遍历链表,将hash取模为0的放在低位链表
                        //不为0,也即为1的放在高位链表上
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //低位链表的位置不变
                        setTabAt(nextTab, i, ln);
                        //高位的需要在原来位置的基础上+n
                        setTabAt(nextTab, i + n, hn);
                        //标记当前tab表完成迁移
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    //树节点操作
                    //如果第一个元素为树节点,分化成两棵树
                    //根据hash&n取模,将0的放在低位树上
                    //不为0的放在高位树上
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        //遍历树,根据hash&n取模是否为0分化成两棵树
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            //低位树
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //如果分化的树中元素的个数小于6,则红黑树转换成链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        //低位树位置不变
                        setTabAt(nextTab, i, ln);
                        //高位+n
                        setTabAt(nextTab, i + n, hn);
                        //完成转移
                        setTabAt(tab, i, fwd);
                        //advance为true,进行返回
                        advance = true;
                    }
                }
            }
        }
    }
}

1.首先进入到helpTransfer方法,进行自旋+cas构成自旋cas,保证线程安全。

2.如果扩容完成,则nextTable会在结束后会被置为null.

3.sizeCtl如果<0,则表示正在扩容。

4.如果sc+1,则表示增加一个扩容线程,此时执行扩容操作。

5.进入扩容操作transfer,首先进行cpu个数进行处理,使得cpu能够均匀处理桶,如果桶较少,则默认一个cpu处理16个桶。

6.扩容操操作:首先如果是null容量,则扩容为原来的两倍。如果不是,则会设置两个flag,开始时为advence=true,结束时为finishing=false,进行自旋。整个过程计算i的值,i的值会从15开始一直减小到1迁移元素。对advance进行自旋,逆序数组转移。

7.如果一次遍历完成,则整个map的元素迁移完成,此时会记下新的table和sizeCtl的值,方便下次扩容使用。

8.synchronized中锁定f,进行链表和红黑树的操作。链表和树都是分化成两个,进行低位元素和高位元素的判断。如果是低位,则相对于原来的位置不变,如果是高位则相对于原来的元素位置+n。

9.同时如果红黑树分化成两个,如果红黑树较少时会转换为链表。

remove方法

//进行删除操作
public V remove(Object key) {
    //进行替换节点操作
    return replaceNode(key, null, null);
}

 //进行替换节点操作
final V replaceNode(Object key, V value, Object cv) {
    //拿到hash值
    int hash = spread(key.hashCode());
    //进行自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //元素不存在,直接break掉
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //如果为fh = f.hash) == MOVED=-1,
        // 则进行扩容操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else { //进行链表操作
            V oldVal = null;
            //设置一个flag,删除处理未完成为false,
            //删除处理完成为true
            boolean validated = false;
            //锁首节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //表明是链表
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            //找到需要替换的元素
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                //传递替换的新值为null,或者新值和原来的值相等
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    //value不为空,则进行替换
                                    if (value != null)
                                        e.val = value;
                                    //由于前驱节点不为空,
                                    //则说明不是链表的首节点
                                    //将前节点的后一个节点和
                                    // 当前节点的有一个节点联系上
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    //首节点为红黑树,则进行红黑树删除操作
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        //找到树节点,进行值的获取,并进行值的替换
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                //如果删除节点数据后,
                                //树节点数据比较少,则转换成链表操作
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            //如果删除处理完成,则进行break操作
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

 //找到树节点元素,首先对当前节点进行判断,
// 左、右、当前节点,如果有,则直接返回。
//如果没有,则说明不是首元素,
//进行进一步获取直至拿到树节点数据
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
    if (k != null) {
        TreeNode<K,V> p = this;
        //进行do-while循环操作,找到树节点
        do  {
            int ph, dir; K pk; TreeNode<K,V> q;
            TreeNode<K,V> pl = p.left, pr = p.right;
            //左节点
            if ((ph = p.hash) > h)
                p = pl;
            //右节点
            else if (ph < h)
                p = pr;
            //当前节点
            else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                return p;
            //左节点为空,则为右节点
            else if (pl == null)
                p = pr;
            //右节点为空,则为左节点
            else if (pr == null)
                p = pl;
            //进行相关接口查看
            else if ((kc != null ||
                      (kc = comparableClassFor(k)) != null) &&
                     (dir = compareComparables(kc, k, pk)) != 0)
                //判断dir<0,取左节点,否者取右节点
                p = (dir < 0) ? pl : pr;
            //获取树节点
            else if ((q = pr.findTreeNode(h, k, kc)) != null)
                return q;
            else
                p = pl;
        } while (p != null);
    }
    return null;
 }
}

1.删除的过程是一个替换值的过程,设置为null.进行高位运算,消除负号带来的扩容影响

2.进行自旋,元素如果不存在,则直接break掉

3.如果哈希值为-1,则进行扩容操作

4.首节点为链表和红黑树。设置一个validated为标识,完成则break掉。上锁首节点synchronized(f)。

5.首先进行链表的替换操作,分为首节点和是否存在后继节点,如果有,首节点之间替换,如果有后继节点,则之间将后继节点域前驱节点对接上。

6.如果删除的节点为红黑树的首节点,则直接替换,如果不是,则找到需要删除的节点。删除的过程和put操作的过程是一样的,需要寻找高位元素和低位元素。同时删除之后考虑红黑树变成链表的情况。

addCount方法

//判断是否需要扩容
//由于每次添加元素,元素的数量都会+1,
// 因此就必须需要考虑是否达到扩容阀值
//如果达到了,则需要进行扩容操作
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //这里使用的思想和LongAdder类一摸一样
    //把数组的大小存储根据不同的线程
    //存储到不同的段上(分段锁)
    //并且有一个baseCount,优先更新baseCount,
    //如果失败了再更新不同线程对应的段
    //这样可以保证尽量小的减少冲突

    //先尝试把数量加到baseCount上,
    //如果失败则再加到分段的CounterCell上
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        //如果as为空或者长度为0或者当前线程所在的段为null
        //或者在当前线程的段上加数量失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //强制增加数量
            //不同线程对应不同的段都更新失败
            //则说明发生了hash冲突
            //则进行扩容操作,对counterCell的容量
            //来减小多个线程hash到同一个段的概率
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        //计算元素个数
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //如果元素个数达到阀值,则进行扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            //sc<0,为负数,则表明正在进行扩容中
            if (sc < 0) {
                //校验标记,resizeStamp的参数大小不变则相等
                //sc=rs+1说明最后一个扩容线程正在执行首位工作
                //sc==rs+MAX_RESIZERS说明扩容线程数超过最大值
                //transferIndex<=0,扩容中transferIndex
                // 表示最近一个被分配的stride区域的下边界
                // 说明数组被分配完了
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    //扩容完成会触发nextTable==null
                    break;
                //如果成功将sizeCtl的值加1,则进入transfer执行数据扩容操作
                //从这里可以看出,每当有新线程协助扩容时,会将sizeCtl的值+1
                //sc + 1表示增加一个扩容线程
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    //进行扩容操作
                    transfer(tab, nt);
            }
            //在扩容前sizeCtl会被设置为
            // sc=(resizeStamp(n)<<RESIZE_STAMP_SHFIT)+2,
            // 这是一个负数
            //sc为阀值,而sc无符号右移16位
            //刚好得到resizeStamp(n),即rs的值
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

get方法

//通过key拿到value
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //拿到hash值
    int h = spread(key.hashCode());
    //如果元素所在的table中存在且有元素
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果第一个元素是要找的元素,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //hash值小于0,在进行扩容操作
        //通过find找到元素
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //在链表中找到元素
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

ConcurrentHashMap的源码写得是非常好的,里面有很多东西值得学习!

虽然有了HashMap,但是在JDK7时,HashMap在进行扩容时多个线程同时进行put操作,会造成闭环的现象。而Jdk8修改了这个bug。同时要想并发高,此时就需要选用ConCurrentHashMap了,同时与HashTable不同的锁的粒度更细,不会因为分段锁中的一段而造成整个hash表阻塞。ConcurrentHashMap采用的是CAS+synchronized,同时采用CAS的时候会进行自旋。在没有产生哈希冲突时采用CAS,出现Hash冲突需要进行扩容时就会采用synchronized。同时结合分段锁segment进行操作。同时这里的sizeCtl和hashMap的阀值类似,但是比HashMap的功能要多:sizeCtl=-1,表示在进行初始化sizeCtl=0,默认值,在初始化时采用默认容量16sizeCtl>0,传入的容量,保存扩容的0.75倍作为下一次的扩容阀值

同时使用的锁:

CAS:compare and swap,乐观锁,进行cas操作,类似于执行操作系统底层的指令,而这种指令是不间断的,因此可以保存操作的安全性,同时数据不会出现不一致的情况。

Synchroinzed:同步锁,也属于可重入式锁,不可中断。通常作用于方法或者变量。同时又可分为偏向锁、轻量级锁、重量级锁。而偏向锁是如果一段同步代码一直被一个线程执行,那么这个线程会自动获取锁,降低获取锁的代价。轻量级锁是当锁是偏向锁时,被另一个线程锁访问,偏向锁会升级为重量级锁,这个线程会通过自旋的方式尝试获取锁不会阻塞,提高性能。重量级锁:当锁为轻量级锁时,当线程自旋一定次数之后,还没有获取锁,就会进行入阻塞状态,该锁升级为重量级锁使其他线程被阻塞,性能降低。

自旋锁:尝试获取锁的线程,在获取锁之前,会进行不断的循环尝试,这个过程称为自旋,这样的好处是减少上下文的切换,提高性能,但是消耗cpu。

分段锁segment:将锁进行更为细粒度的划分。在进行并发操作时,只是局部的锁定,而不影响其他线程的工作。

RenntrantLock:可重入式锁,是值一个线程在获取锁之后再次尝试获取锁时会自动获取锁,避免死锁。

DEFAULT_CONCURRENCY_LEVEL:默认并发级别,这个是16,和分度锁的初始化容量是一样的,同时和初始化数组的长度是一样的,这样做的目前是尽可能的是请求处理均匀分布在hash表中。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ学习5

    进行消息发送的过程首先会准备好路由信息,最终是由netty完成的,也即使用nettyRemotingClient来实现的。

    路行的亚洲
  • dubbo源码学习二

    昨天我们已经知道dubbo的入口可以通过自定义标签找到DubboNamespaceHandler找到解析自定义标签的相关类了。还有4个我们需要关注:Config...

    路行的亚洲
  • LinkedBlockingQueue源码学习

    采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:

    路行的亚洲
  • java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)

    在前文中介绍了如何使用ForkJoinPool和ForkJoin的一些基本原理。现在继续来分析ForkJoin,原本计划从源码开始分析。但是ForkJoinPo...

    冬天里的懒猫
  • 从0.5到1写个rpc框架 - 6:调用异常节点自动重试

    再加上其它服务也按照一定频率更新本地缓存,因此往往不会那么及时地发现曾经的小伙伴已经下线了。导致的后果就是,会向不再存在的节点发送请求,结果连接异常。

    acupt
  • ThreadPoolExecutor介绍

    综上,getAndIncrement() 方法并不是原子操作。 只是保证了他和其他函数对 value 值得更新都是有效的。 整个方法本身并不是线程安全的,但...

    提莫队长
  • View·从 InputEvent 到 dispatchTouchEvent 源码分析(二)

    延续上一篇文章(View·InputEvent事件投递源码分析(一))得出的结论,本文接着对 View、ViewGroup 的事件派发、拦截进行源码分析。

    幺鹿
  • 算法模板——Dinic网络最大流 1

    实现功能:同sap网络最大流 今天第一次学Dinic,感觉最大的特点就是——相当的白话,相当的容易懂,而且丝毫不影响复杂度,顶多也就是代码长个几行 主要原理就是...

    HansBug
  • React Fiber源码分析 第三篇(异步状态)

    调用setState时, 会调用classComponentUpdater的enqueueSetState方法, 同时将新的state作为payload参数传进

    菜的黑人牙膏
  • 100 Same Tree

    /** *题意:判断两棵二叉树 * Definition for a binary tree node. * function TreeNode(val...

    用户1624346

扫码关注云+社区

领取腾讯云代金券