前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ConcurrentHashMap源码学习

ConcurrentHashMap源码学习

作者头像
路行的亚洲
发布2020-07-17 16:12:23
4880
发布2020-07-17 16:12:23
举报

ConcurrentHashMap源码学习

首先思考一下:

既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与HashMap、HashTable有什么区别?ConcurrentHashMap中的sizeCtl有几种值,这些值代表的是什么状态?ConcurrentHashMap使用了哪些锁?DEFAULT_CONCURRENCY_LEVEL表示什么,有什么用?

1.相关变量

//最大容量2^30
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量16
private static final int DEFAULT_CAPACITY = 16;
//最大数组大小 =>2^31-9
//@Native public static final int  MAX_VALUE = 0x7fffffff;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认并发级别 16
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加载因子0.75
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树的必要条件之一
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表
static final int UNTREEIFY_THRESHOLD = 6;
//链表转红黑树的条件之二
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容转移时的最小数组分组大小
private static final int MIN_TRANSFER_STRIDE = 16;
//sizrCtrl中用于生成标记的位数,对于32位数组至少6位
private static int RESIZE_STAMP_BITS = 16;
//扩容最大线程数 2^(31-16)-1=2^15-1
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//在sizeCtl中记录标记的位移大小,也即偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//用于转发节点的哈希,也即扩容操作
static final int MOVED     = -1; // hash for forwarding nodes
//hash为树的根,也即红黑树操作
static final int TREEBIN   = -2; // hash for roots of trees
//临时保留的hash值
static final int RESERVED  = -3; // hash for transient reservations
//2^31-1,用在计算hash时进行
//按位与计算消除负hash
//youy
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//数组,或者叫桶
transient volatile Node<K,V>[] table;
//下一个数组,或者叫桶
private transient volatile Node<K,V>[] nextTable;
//元素个数计数器,当无并发时使用CAS进行更新
private transient volatile long baseCount;
//这里和hashMap的阀值类似,但是比HashMap的功能要多
//sizeCtl=-1,表示在进行初始化
//sizeCtl=0,默认值,在初始化时采用默认容量16
//sizeCtl>0,传入的容量,保存扩容的0.75倍作为下一次的扩容阀值
private transient volatile int sizeCtl;
//扩容时下一个hash表索引 +1
private transient volatile int transferIndex;
//扩容和/或创建CounterCell时使用的自旋锁(通过CAS锁定)。
private transient volatile int cellsBusy;
//表计数器,如果不为空,则大小为2的次幂
//用于并发时,统计hash表中的数组的大小和采用cas扩容的大小:
//baseCount+countCells
private transient volatile CounterCell[] counterCells;
// keySet,key的集合
private transient KeySetView<K,V> keySet;
//value值
private transient ValuesView<K,V> values;
//entrySet:键值对,类似,类似一组一组的
private transient EntrySetView<K,V> entrySet;

2.构造函数

//空参构造
public ConcurrentHashMap() {
}

 //构造函数,带初始化容量
public ConcurrentHashMap(int initialCapacity) {
    //对初始化容量进行校验
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    //sizeCtl等于初始化容量的2的次幂
    this.sizeCtl = cap;
}

 //将传入的容量变成2的次幂
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

//构造函数,给定初始化容量和加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

//构造一个ConcurrentHasHMap给定初始容量、加载因子、并发级别
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //进行相关参数校验
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

//构造函数带map
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    //sizeCtl为默认容量16
    this.sizeCtl = DEFAULT_CAPACITY;
    //放入map
    putAll(m);
}

//放入map信息
public void putAll(Map<? extends K, ? extends V> m) {
    //对给定的容量进行处理,调成2的次幂
    tryPresize(m.size());
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}

//尝试扩容hash表的大小以容纳给定数量的元素。
private final void tryPresize(int size) {
    //如果大于最大容量,则默认为最大容量,否者进行2的次幂处理
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    tableSizeFor(size + (size >>> 1) + 1);
    //sc为capacity容量
    int sc;
    //如果sc=sizeCtrl=16
    while ((sc = sizeCtl) >= 0) {
        //tab=hash表
        Node<K,V>[] tab = table; int n;
        //如果hash表为空,长度为0,则
        if (tab == null || (n = tab.length) == 0) {
            //n为初始化容量,否者为c,而c是处理好的2的次幂的
            n = (sc > c) ? sc : c;
            //sun.misc.Unsafe U,进行CAS操作
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        //进行链表操作
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        //n为2的次幂,如果为16,
                        //则以16为例,则10000=>00100=4,
                        //也即变成原来的0.75,这也是阀值的情况
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //记下阀值
                    sizeCtl = sc;
                }
            }
        }
        //否则如果c为处理好的容量,而sc=16,如果小于16,
        //或者>=最大容量,不做处理
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            //修改扩容的标记大小
            int rs = resizeStamp(n);
            //sc<0时,则进行扩容操作
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                //进行扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

带有map的构造中sizeCtl,也即阀值为16,同时将map数据放入ConcurrentHashMap中。放入之前,会先进行初始化容量,如果容量不够,则进行容量扩容操作。

3.方法

put方法

//进行put操作
public V put(K key, V value) {
    return putVal(key, value, false);
}

//进行put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //进行高位运算,消除负号带来的影响
    int hash = spread(key.hashCode());
    int binCount = 0;
    //进行自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //table为null或空map时进行初始化
        if (tab == null || (n = tab.length) == 0)
            //初始化table,设置相关的容量、创建table表
            //写得很好,可以借鉴,锁自旋和cas的结合
            tab = initTable();
        //hash取模,如果为空,则说明首元素无值
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //根据所在索引为null值,进行值的添加
            //casTabAt主要用于CAS比较该索引处是否为null
            //防止其他线程已改变该值,
            //null则插入,break结束上面的自旋
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //首地址不为空,并且node的hash值为-1,则表示
        //ForwardingNode节点正在rehash扩容
        else if ((fh = f.hash) == MOVED)
            //进行扩容操作
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //这里采用了synchronized,
            //如果程序走到这里则说明产生了hash冲突
            synchronized (f) {
                //这里的volatile获取首节点与
                //else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
                //处获取的节点对比判断f还是不是首节点
                //前面这个条件执行时,这里的
                //首节点是完全存在被其它线程干掉或者空旋的情况
                if (tabAt(tab, i) == f) {
                    //如果fh的hash值,判断是为了等待扩容完成
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //fh<0的情况,-2表示红黑树节点,进行树节点操作
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //执行树节点操作
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //如果计算>+变成红黑树的阀值,则会
                //进一步判断数组长度,从而确定
                //是否需要进行红黑树操作
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //最后的插入数据后根据一定条件来进行扩容的方法
    addCount(1L, binCount);
    return null;
}

//拿到hash值 HASH_BITS=2^31-1
//与hashmap计算hash基本一样,但多了一步
//& HASH_BITS,HASH_BITS是0x7fffffff,
//该步是为了消除最高位上的负符号
//hash的负在ConcurrentHashMap中有特殊意义
//表示在扩容或者是树节点
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

 //进行扩容while配合cas构成自旋cas,保证线程安全
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //三个条件判断的是扩容是否结束,
    //ForwardingNode再创建时持有nextTable数组的引用
    //nextTable会在扩容结束后被置为null
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //如果上面的条件为true,则表明扩容未结束,
        //扩容标记进行修改,会变成一个负数
        //如果数组大小不变则rs不变
        int rs = resizeStamp(tab.length);
        //因为存在多线程扩容的情况,每次都需要
        // 重新判断扩容条件是否还满足
        //如果扩容已完成,
        // 那么table=nextTable, nextTable=null,
        // 并且sizeCtl变成下次扩容的阈值,
        // 下面的三项检查分别与这里的情况对应
        //sc = sizeCtl) < 0扩容sizeCtl是一定小于0
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //校验标记,resizeStamp的参数大小不变则相等
            //sc=rs+1说明最后一个扩容线程正在执行首位工作
            //sc==rs+MAX_RESIZERS说明扩容线程数超过最大值
            //transferIndex<=0,扩容中transferIndex
            // 表示最近一个被分配的stride区域的下边界
            // 说明数组被分配完了
            //在扩容前sizeCtl会被设置为
            // sc=(resizeStamp(n)<<RESIZE_STAMP_SHFIT)+2,
            // 这是一个负数
            //sc为阀值,而sc无符号右移16位
            //刚好得到resizeStamp(n),即rs的值
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //如果成功将sizeCtl的值加1,则进入transfer执行数据扩容操作
            //从这里可以看出,每当有新线程协助扩容时,会将sizeCtl的值+1
            //sc + 1表示增加一个扩容线程
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                //进行扩容操作
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

//初始化hash表
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //hash表为空,长度为空,则进行初始化
    while ((tab = table) == null || tab.length == 0) {
        //如果长度小于0,进行自旋,
        // -1代表正在初始化或者扩容,
        // 则本线程退让,依赖上面的自旋继续初始化
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //CAS判断SIZECTL和sc相同
        //SIZECTL赋值-1表示正在初始化,
        //只有一个线程进行初始化
        // 其它线程在上个if卡住
        //进行自旋,等待扩容操作执行完
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //第一个线程初始化之后,
                //第二个线程还会进来所以需要再次判断一次
                if ((tab = table) == null || tab.length == 0) {
                    //获取值,如果大于0,则说明给定了值,否者给定默认值
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    //创建基本Node数组结构
                    tabe = tab = nt;
                    //扩容阀值 0.75*n
                    sc = n - (n >>> 2);
                }
            } finally {
                //阀值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}


/**
 * Finds or adds a node.
 * @return null if added
 */
//进行树节点操作,包含hash值,key、value
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
    Class<?> kc = null;
    boolean searched = false;
    //进行自旋
    for (TreeNode<K,V> p = root;;) {
        int dir, ph; K pk;
        //根节点为空
        if (p == null) {
            //首节点等于根节点等于新创建的树节点
            first = root = new TreeNode<K,V>(h, k, v, null, null);
            break;
        }
        //hash值下小于父节点的hash值,则取左节点
        else if ((ph = p.hash) > h)
            dir = -1;
        //如果hash值大于父节点hash值,则取右节点
        else if (ph < h)
            dir = 1;
        //否者就是当前节点,直接返回
        else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
            return p;
        //如果hash值相等,但是key不相等
        //由于没有实现Comparable接口,
        // 因此无法判断,因此会产生hash碰撞
        else if ((kc == null &&
                  (kc = comparableClassFor(k)) == null) ||
                 (dir = compareComparables(kc, k, pk)) == 0) {
            //判断是否执行过查找方法
            if (!searched) {
                TreeNode<K,V> q, ch;
                searched = true;
                //如果执行过,则分别查找左右树,查看是否有相同的对象,
                //如果找不到,则说明确实没有找到相同对象存在
                if (((ch = p.left) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null) ||
                    ((ch = p.right) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null))
                    return q;
            }
            //hash相同,key不相等,并且key没有实现Comparable接口,
            //保持一致的处理规则,以便进行排序,从而进行遍历
            dir = tieBreakOrder(k, pk);
        }
        //备份当前节点
        TreeNode<K,V> xp = p;
        //如果遍历完之后还是没有找到相同的key的节点,
        //则进行新的插入操作
        if ((p = (dir <= 0) ? p.left : p.right) == null) {
            TreeNode<K,V> x, f = first;
            first = x = new TreeNode<K,V>(h, k, v, f, xp);
            //f不为空,则说明在前驱节点
            if (f != null)
                f.prev = x;
            //如果dir<0,则表明是左节点
            if (dir <= 0)
                xp.left = x;
            //否者>0,右节点
            else
                xp.right = x;
            //变成红色
            if (!xp.red)
                x.red = true;
            else {
                //进行锁定,等待重平衡
                lockRoot();
                try {
                    //实现红黑树重排实现重平衡
                    root = balanceInsertion(root, x);
                } finally {
                    unlockRoot();
                }
            }
            //最终break掉,跳出自旋
            break;
        }
    }
    assert checkInvariants(root);
    return null;
}

  //进行红黑树判断操作,如果是则转成红黑树,否者进行链表操作
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                //数组长度不满足64,进行链表操作
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                //加锁,进行红黑树操作
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
    

1.首先进行对key、value进行判断,如果为null,则抛异常。

2.进行高位运算,消除符号带来的影响,由于符号是进行扩容操作

3.进行自旋,如果table为空或者map为空,进行初始化

4.进行初始化hash表的过程中,initTable里面写得非常好,采用首先进行while死循环,进行自旋,同时采用Thread.yield(),因为此时可能会出现扩容的情况。采用CAS判断sizeCtl与sc相同,等待扩容操作完。

5.扩容完成之后,进行阀值的设置,这里用得很巧妙,采用位运算做的。保留阀值标记方便第二次扩容使用。

6.接着进行hash取模,如果为空,则表明首元素无值,则进行元素添加,采用cas的方式进行值的添加。

7.如果首地址为空,同时node的hash值为-1,则进行move操作,也即可能是链表或者红黑树的首元素,进行扩容操作。

8.如果走到了synchronized(f),则说明产生了hash冲突,则等待扩容操作完成。扩容完了之后进行放入操作。

9.最后的插入数据后根据一定条件来进行扩容的方法addCount(1L,binCount),完成操作

transfer方法

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
//进行扩容操作,也就是resize操作
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //根据cpu个数找出扩容时的数组跨度大小即最小分组16、32、64增长
    //假设n=32,则32/8得到cpu核数,如果得到的结果小于16,name就使用16
    //这里这样做的目的是为了让每个cpu处理的桶一样多,避免出现转移任务不均匀的现象
    //如果桶较少,默认一个cpu处理16个桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //新的table没有初始化,则进行初始化
    if (nextTab == null) {            // initiating
        try {
            //同时扩容两倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        //更新nextTable和转移下标更新,老的 tab 的 length
        nextTable = nextTab;
        transferIndex = n;
    }
    //新tab的长度
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    //设置开始时为true
    boolean advance = true;
    //结束时为false
    boolean finishing = false; // to ensure sweep before committing nextTab
    //进行自旋操作
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        //如果是true,开始自旋
        //整个过程计算i的值,i的值会从n-1依次递减
        //其中n是原来数组的大小,
        //i会从15开始一直减小到1这样转移元素
        while (advance) {
            int nextIndex, nextBound;
            //当前分组没有转移完||扩容完成 --i,完成数组逆序转移
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        //如果一次遍历完成了,也就是整个map所有桶中的元素都转移完成了
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            //如果全部转移完成,则替换旧table
            //并设置下一次扩容阀值sizeCtl
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            //当前线程扩容完成,把扩容线程数-1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //根据前面addCount的+2这里就有-2  判断是否是最后一个正在扩容的线程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                //扩容完成两边相等
                //把finishing设置为true
                //此时会走到上面的if条件
                finishing = advance = true;
                //提交前重新检查一次,看是否转移完成
                //第二次遍历会走到(fh = f.hash) == MOVED
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null
            //如果桶中无数据,则直接放入
            // ForwardingNode标记该tab转移完成
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            //已经处理
            advance = true; // already processed
        else {
            //锁定f转移元素
            synchronized (f) {
                //再次判断当前tab第一个元素是否有修改
                //也就是可能其他线程先一步迁移了元素
                if (tabAt(tab, i) == f) {
                    //把一个链表分化成两个链表
                    //规则是hash表中各元素的hash与hash表中的n进行与操作
                    //等于0的放到低位链到新has表迁移h表中的位置相对于旧桶不变
                    //高位链表迁移到新hash表中国的位置正好是其原来的位置+n
                    Node<K,V> ln, hn;
                    if (fh >= 0) { //不是树节点,链表操作
                        //第一个元素的hash值大于0
                        //说明该hash表中的元素是以链表的形式存放的
                        //与hashMap不同的是多了一步寻找lastRun
                        //这里的lastRun是提取出链表后面不用
                        //处理在特殊处理的子链表
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        //看最后的元素属于低位链表还是高位链表
                        if (runBit == 0) {
                            //低位链表
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            //高位链表
                            hn = lastRun;
                            ln = null;
                        }
                        //遍历链表,将hash取模为0的放在低位链表
                        //不为0,也即为1的放在高位链表上
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //低位链表的位置不变
                        setTabAt(nextTab, i, ln);
                        //高位的需要在原来位置的基础上+n
                        setTabAt(nextTab, i + n, hn);
                        //标记当前tab表完成迁移
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    //树节点操作
                    //如果第一个元素为树节点,分化成两棵树
                    //根据hash&n取模,将0的放在低位树上
                    //不为0的放在高位树上
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        //遍历树,根据hash&n取模是否为0分化成两棵树
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            //低位树
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //如果分化的树中元素的个数小于6,则红黑树转换成链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        //低位树位置不变
                        setTabAt(nextTab, i, ln);
                        //高位+n
                        setTabAt(nextTab, i + n, hn);
                        //完成转移
                        setTabAt(tab, i, fwd);
                        //advance为true,进行返回
                        advance = true;
                    }
                }
            }
        }
    }
}

1.首先进入到helpTransfer方法,进行自旋+cas构成自旋cas,保证线程安全。

2.如果扩容完成,则nextTable会在结束后会被置为null.

3.sizeCtl如果<0,则表示正在扩容。

4.如果sc+1,则表示增加一个扩容线程,此时执行扩容操作。

5.进入扩容操作transfer,首先进行cpu个数进行处理,使得cpu能够均匀处理桶,如果桶较少,则默认一个cpu处理16个桶。

6.扩容操操作:首先如果是null容量,则扩容为原来的两倍。如果不是,则会设置两个flag,开始时为advence=true,结束时为finishing=false,进行自旋。整个过程计算i的值,i的值会从15开始一直减小到1迁移元素。对advance进行自旋,逆序数组转移。

7.如果一次遍历完成,则整个map的元素迁移完成,此时会记下新的table和sizeCtl的值,方便下次扩容使用。

8.synchronized中锁定f,进行链表和红黑树的操作。链表和树都是分化成两个,进行低位元素和高位元素的判断。如果是低位,则相对于原来的位置不变,如果是高位则相对于原来的元素位置+n。

9.同时如果红黑树分化成两个,如果红黑树较少时会转换为链表。

remove方法
//进行删除操作
public V remove(Object key) {
    //进行替换节点操作
    return replaceNode(key, null, null);
}

 //进行替换节点操作
final V replaceNode(Object key, V value, Object cv) {
    //拿到hash值
    int hash = spread(key.hashCode());
    //进行自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //元素不存在,直接break掉
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //如果为fh = f.hash) == MOVED=-1,
        // 则进行扩容操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else { //进行链表操作
            V oldVal = null;
            //设置一个flag,删除处理未完成为false,
            //删除处理完成为true
            boolean validated = false;
            //锁首节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //表明是链表
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            //找到需要替换的元素
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                //传递替换的新值为null,或者新值和原来的值相等
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    //value不为空,则进行替换
                                    if (value != null)
                                        e.val = value;
                                    //由于前驱节点不为空,
                                    //则说明不是链表的首节点
                                    //将前节点的后一个节点和
                                    // 当前节点的有一个节点联系上
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    //首节点为红黑树,则进行红黑树删除操作
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        //找到树节点,进行值的获取,并进行值的替换
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                //如果删除节点数据后,
                                //树节点数据比较少,则转换成链表操作
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            //如果删除处理完成,则进行break操作
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

 //找到树节点元素,首先对当前节点进行判断,
// 左、右、当前节点,如果有,则直接返回。
//如果没有,则说明不是首元素,
//进行进一步获取直至拿到树节点数据
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
    if (k != null) {
        TreeNode<K,V> p = this;
        //进行do-while循环操作,找到树节点
        do  {
            int ph, dir; K pk; TreeNode<K,V> q;
            TreeNode<K,V> pl = p.left, pr = p.right;
            //左节点
            if ((ph = p.hash) > h)
                p = pl;
            //右节点
            else if (ph < h)
                p = pr;
            //当前节点
            else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                return p;
            //左节点为空,则为右节点
            else if (pl == null)
                p = pr;
            //右节点为空,则为左节点
            else if (pr == null)
                p = pl;
            //进行相关接口查看
            else if ((kc != null ||
                      (kc = comparableClassFor(k)) != null) &&
                     (dir = compareComparables(kc, k, pk)) != 0)
                //判断dir<0,取左节点,否者取右节点
                p = (dir < 0) ? pl : pr;
            //获取树节点
            else if ((q = pr.findTreeNode(h, k, kc)) != null)
                return q;
            else
                p = pl;
        } while (p != null);
    }
    return null;
 }
}

1.删除的过程是一个替换值的过程,设置为null.进行高位运算,消除负号带来的扩容影响

2.进行自旋,元素如果不存在,则直接break掉

3.如果哈希值为-1,则进行扩容操作

4.首节点为链表和红黑树。设置一个validated为标识,完成则break掉。上锁首节点synchronized(f)。

5.首先进行链表的替换操作,分为首节点和是否存在后继节点,如果有,首节点之间替换,如果有后继节点,则之间将后继节点域前驱节点对接上。

6.如果删除的节点为红黑树的首节点,则直接替换,如果不是,则找到需要删除的节点。删除的过程和put操作的过程是一样的,需要寻找高位元素和低位元素。同时删除之后考虑红黑树变成链表的情况。

addCount方法

//判断是否需要扩容
//由于每次添加元素,元素的数量都会+1,
// 因此就必须需要考虑是否达到扩容阀值
//如果达到了,则需要进行扩容操作
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //这里使用的思想和LongAdder类一摸一样
    //把数组的大小存储根据不同的线程
    //存储到不同的段上(分段锁)
    //并且有一个baseCount,优先更新baseCount,
    //如果失败了再更新不同线程对应的段
    //这样可以保证尽量小的减少冲突

    //先尝试把数量加到baseCount上,
    //如果失败则再加到分段的CounterCell上
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        //如果as为空或者长度为0或者当前线程所在的段为null
        //或者在当前线程的段上加数量失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //强制增加数量
            //不同线程对应不同的段都更新失败
            //则说明发生了hash冲突
            //则进行扩容操作,对counterCell的容量
            //来减小多个线程hash到同一个段的概率
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        //计算元素个数
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //如果元素个数达到阀值,则进行扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            //sc<0,为负数,则表明正在进行扩容中
            if (sc < 0) {
                //校验标记,resizeStamp的参数大小不变则相等
                //sc=rs+1说明最后一个扩容线程正在执行首位工作
                //sc==rs+MAX_RESIZERS说明扩容线程数超过最大值
                //transferIndex<=0,扩容中transferIndex
                // 表示最近一个被分配的stride区域的下边界
                // 说明数组被分配完了
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    //扩容完成会触发nextTable==null
                    break;
                //如果成功将sizeCtl的值加1,则进入transfer执行数据扩容操作
                //从这里可以看出,每当有新线程协助扩容时,会将sizeCtl的值+1
                //sc + 1表示增加一个扩容线程
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    //进行扩容操作
                    transfer(tab, nt);
            }
            //在扩容前sizeCtl会被设置为
            // sc=(resizeStamp(n)<<RESIZE_STAMP_SHFIT)+2,
            // 这是一个负数
            //sc为阀值,而sc无符号右移16位
            //刚好得到resizeStamp(n),即rs的值
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

get方法

//通过key拿到value
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //拿到hash值
    int h = spread(key.hashCode());
    //如果元素所在的table中存在且有元素
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果第一个元素是要找的元素,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //hash值小于0,在进行扩容操作
        //通过find找到元素
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //在链表中找到元素
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

ConcurrentHashMap的源码写得是非常好的,里面有很多东西值得学习!

虽然有了HashMap,但是在JDK7时,HashMap在进行扩容时多个线程同时进行put操作,会造成闭环的现象。而Jdk8修改了这个bug。同时要想并发高,此时就需要选用ConCurrentHashMap了,同时与HashTable不同的锁的粒度更细,不会因为分段锁中的一段而造成整个hash表阻塞。ConcurrentHashMap采用的是CAS+synchronized,同时采用CAS的时候会进行自旋。在没有产生哈希冲突时采用CAS,出现Hash冲突需要进行扩容时就会采用synchronized。同时结合分段锁segment进行操作。同时这里的sizeCtl和hashMap的阀值类似,但是比HashMap的功能要多:sizeCtl=-1,表示在进行初始化sizeCtl=0,默认值,在初始化时采用默认容量16sizeCtl>0,传入的容量,保存扩容的0.75倍作为下一次的扩容阀值

同时使用的锁:

CAS:compare and swap,乐观锁,进行cas操作,类似于执行操作系统底层的指令,而这种指令是不间断的,因此可以保存操作的安全性,同时数据不会出现不一致的情况。

Synchroinzed:同步锁,也属于可重入式锁,不可中断。通常作用于方法或者变量。同时又可分为偏向锁、轻量级锁、重量级锁。而偏向锁是如果一段同步代码一直被一个线程执行,那么这个线程会自动获取锁,降低获取锁的代价。轻量级锁是当锁是偏向锁时,被另一个线程锁访问,偏向锁会升级为重量级锁,这个线程会通过自旋的方式尝试获取锁不会阻塞,提高性能。重量级锁:当锁为轻量级锁时,当线程自旋一定次数之后,还没有获取锁,就会进行入阻塞状态,该锁升级为重量级锁使其他线程被阻塞,性能降低。

自旋锁:尝试获取锁的线程,在获取锁之前,会进行不断的循环尝试,这个过程称为自旋,这样的好处是减少上下文的切换,提高性能,但是消耗cpu。

分段锁segment:将锁进行更为细粒度的划分。在进行并发操作时,只是局部的锁定,而不影响其他线程的工作。

RenntrantLock:可重入式锁,是值一个线程在获取锁之后再次尝试获取锁时会自动获取锁,避免死锁。

DEFAULT_CONCURRENCY_LEVEL:默认并发级别,这个是16,和分度锁的初始化容量是一样的,同时和初始化数组的长度是一样的,这样做的目前是尽可能的是请求处理均匀分布在hash表中。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ConcurrentHashMap源码学习
    • 1.相关变量
      • 3.方法
        • remove方法
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档