前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发——ConcurrentHashMap

Java并发——ConcurrentHashMap

原创
作者头像
翰墨飘香
发布2024-04-17 00:17:20
1620
发布2024-04-17 00:17:20
举报
文章被收录于专栏:Java并发Java并发

一、ConcurrentHashMap概述

ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。相比于同样线程安全的HashTable来说,效率等各方面都有极大地提高。

二、为什么要使用ConcurrentHashMap

1、Hashmap线程不安全

(1)如果有多个线程同时调用 put() 方法的话,它很有可能会把 modCount 的值计算错

(2)多线程扩容期间取出的值不准确

(3)同时 put 碰撞导致数据丢失

(4)可见性问题无法保证

(5)扩容的时候死循环造成 CPU 100%

2、怎么解决线程安全问题?

方案

原理

是否推荐

Hashtable

Hashtable 是一个线程安全的类,HashTable容器使用synchronized来保证线程安全,在线程竞争激烈的情况下HashTable的效率非常地下。当一个线程访问同步方法时,其他线程会阻塞。不允许在迭代期间对内容进行修改,会抛出 ConcurrentModificationException 异常

Collections.synchronizedMap

使用对象锁来保证多线程场景下,操作安全,本质也是对 HashMap 进行全表锁,允许在迭代期间对内容进行修改

ConcurrentHashMap

采用分段锁,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素

三、ConcurrentHashMap数据结构

JDK1.7 的 ConcurrentHashMap 底层采用 分段的数组+链表 实现,使用segment分段锁的形式控制并发写入。

JDK1.8 采用的数据结构跟 HashMap1.8 的结构一样,数组+链表/红黑二叉树,使用CAS+synchronized关键字控制并发写入。

3.1 JDK1.7详细结构

在 ConcurrentHashMap 内部进行了 Segment 分段,Segment 继承了 ReentrantLock,可以理解为一把锁,各个 Segment 之间都是相互独立上锁的,互不影响。相比于之前的 Hashtable 每次操作都需要把整个对象锁住而言,大大提高了并发效率。因为它的锁与锁之间是独立的,而不是整个对象只有一把锁。

每个 Segment 的底层数据结构与 HashMap 类似,仍然是数组和链表组成的拉链法结构。默认有 0~15 共 16 个 Segment,所以最多可以同时支持 16 个线程并发操作(操作分别分布在不同的 Segment 上)。16 这个默认值可以在初始化的时候设置为其他值,但是一旦确认初始化以后,是不可以扩容的。

3.2 JDK1.8详细结构

在 Java 8 中,几乎完全重写了 ConcurrentHashMap,使用CAS+synchronized关键字控制并发且增加了红黑树结构

结构概述

图中的节点有三种类型。

第一种是最简单的,空着的位置代表当前还没有元素来填充。

第二种就是和 HashMap 非常类似的拉链法结构,在每一个槽中会首先填入第一个节点,但是后续如果计算出相同的 Hash 值,就用链表的形式往后进行延伸。

第三种结构就是红黑树结构,这是新增结构

当第二种情况的链表长度大于某一个阈值(默认为 8),且同时满足一定的容量要求的时候,ConcurrentHashMap 便会把这个链表从链表的形式转化为红黑树的形式,目的是进一步提高它的查找性能

后续如果由于删除或者其他原因调整了大小,当红黑树的节点小于或等于一个阈值( 6 )个以后,又会恢复为链表形态

红黑树结构

红黑树是一种自平衡的二叉搜索树,查找效率高,会自动平衡,防止极端不平衡从而影响查找效率的情况发。

由于自平衡的特点,即左右子树高度几乎一致,所以其查找性能近似于二分查找,时间复杂度是 O(log(n)) 级别

https://www.zhihu.com/question/483961397

为什么是转变红黑树?

Java 8 的 ConcurrentHashMap 引入红黑树的好处就是避免在极端的情况下冲突链表变得很长,在查询的时候,效率会非常慢。而红黑树具有自平衡的特点,所以,即便是极端情况下,也可以保证查询效率在 O(log(n))

为什么转红黑树阈值是8?

1、当链表长度大于或等于阈值(默认为 8)的时候,如果同时还满足容量大于或等于 MIN_TREEIFY_CAPACITY(默认为 64)的要求,就会把链表转换为红黑树。同样,后续如果由于删除或者其他原因调整了大小,当红黑树的节点小于或等于 6 个以后,又会恢复为链表形态

2、为什么转换?

单个 TreeNode 需要占用的空间大约是普通 Node 的两倍,所以只有当包含足够多的 Nodes 时才会转成 TreeNodes,而是否足够多就是由 TREEIFY_THRESHOLD 的值决定的。而当桶中节点数由于移除或者 resize 变少后,又会变回普通的链表的形式,以便节省空间

体现了时间和空间平衡的思想,最开始使用链表的时候,空间占用是比较少的,而且由于链表短,所以查询时间也没有太大的问题。可是当链表越来越长,需要用红黑树的形式来保证查询的效率

3、为什么阈值是8?

如果 hashCode 分布良好,也就是 hash 计算的结果离散好的话,那么红黑树这种形式是很少会被用到的,因为各个值都均匀分布,很少出现链表很长的情况。在理想情况下,链表长度符合泊松分布,各个长度的命中概率依次递减,当长度为 8 的时候,概率仅为 0.00000006。这是一个小于千万分之一的概率,通常我们的 Map 里面是不会存储这么多的数据的,所以通常情况下,并不会发生从链表向红黑树的转换

事实上,链表长度超过 8 就转为红黑树的设计,更多的是为了防止用户自己实现了不好的哈希算法时导致链表过长,从而导致查询效率低,而此时转为红黑树更多的是一种保底策略,用来保证极端情况下查询的效率。

通常如果 hash 算法正常的话,那么链表的长度也不会很长,那么红黑树也不会带来明显的查询时间上的优势,反而会增加空间负担。所以通常情况下,并没有必要转为红黑树,所以就选择了概率非常小,小于千万分之一概率,也就是长度为 8 的概率,把长度 8 作为转化的默认阈值。

所以如果平时开发中发现 HashMap 或是 ConcurrentHashMap 内部出现了红黑树的结构,这个时候往往就说明我们的哈希算法出了问题,需要留意是不是我们实现了效果不好的 hashCode 方法,并对此进行改进,以便减少冲突。

为什么

3.3 ConcurrentHashMap的Java7 和Java8对比

1、数据结构

Java7 Segment 分段锁 数组+链表

Java8 数组 + 链表 + 红黑树

2、并发度

Java 7 每个 Segment 独立加锁,最大并发个数就是 Segment 的个数,默认是 16。

Java 8 锁粒度更细,理想情况下 table 数组元素的个数(也就是数组长度)就是其支持并发的最大个数,并发度比之前有提高。

3、Hash碰撞处理

Java 7 在 Hash 冲突时,会使用拉链法,也就是链表的形式。

Java 8 先使用拉链法,在链表长度超过一定阈值时,将链表转换为红黑树,来提高查找效率

4、查询时间复杂度

Java 7 遍历链表的时间复杂度是 O(n),n 为链表长度。

Java 8 如果变成遍历红黑树,那么时间复杂度降低为 O(log(n)),n 为树的节点个数。

四、ConcurrentHashMap 源码分析(Java 8)

4.1 Node节点

Node 里面是 key-value 的形式,并且把 value 用 volatile 修饰,以便保证可见性,同时内部还有一个指向下一个节点的 next 指针,方便产生链表结构

代码语言:java
复制
 static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

       //...
  
    }

4.2 put方法

put方法中会逐步根据当前槽点是未初始化、空、扩容、链表、红黑树等不同情况做出不同的处理

代码语言:java
复制
public V put(K key, V value) {
        return putVal(key, value, false);
    }

/** Implementation for put and putIfAbsent */  
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //ConcurrentHashMap不允许null的key和value
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果数组为空,初始化数组
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //找到hash值对应的数组下标
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该位置是空的,就用 CAS 的方式放入新值
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //hash值等于 MOVED 代表在扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //该位置上是有值的情况
            else {
                V oldVal = null;
                //用 synchronized 锁住当前槽点,保证并发安全
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //如果是链表的形式
                        if (fh >= 0) {
                            binCount = 1;
                             //遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //如果发现该 key 已存在,就判断是否需要进行覆盖,然后返回
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                 //到了链表的尾部也没有发现该 key,说明之前不存在,就把新值添加到链表的最后
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果是红黑树的形式
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            //调用 putTreeVal 方法往红黑树里增加数据
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //检查是否满足条件并把链表转换为红黑树的形式,默认的 TREEIFY_THRESHOLD 阈值是 8
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                        //putVal 的返回是添加前的旧值,所以返回 oldVal
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

4.3 get方法

get 的过程:

① 计算 Hash 值,并由此值找到对应的槽点;

② 如果数组是空的或者该位置为 null,那么直接返回 null 就可以了;

③ 如果该位置处的节点刚好就是我们需要的,直接返回该节点的值;

④ 如果该位置节点是红黑树或者正在扩容,就用 find 方法继续查找;

⑤ 否则那就是链表,就进行遍历链表查找。

代码语言:java
复制
public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算 hash 值
        int h = spread(key.hashCode());
        //如果整个数组是空的,或者当前槽点的数据是空的,说明 key 对应的 value 不存在,直接返回 null
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //判断头结点是否就是我们需要的节点,如果是则直接返回
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //如果头结点 hash 值小于 0,说明是红黑树或者正在扩容,就用对应的 find 方法来查找
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //遍历链表来查找
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

4.4 remove&replace

remove和replace方法底层都是replaceNode方法

代码语言:java
复制
 public V remove(Object key) {
        return replaceNode(key, null, null);
    }

public V replace(K key, V value) {
        if (key == null || value == null)
            throw new NullPointerException();
        return replaceNode(key, value, null);
    }

/**
     * Implementation for the four public remove/replace methods:
     * Replaces node value with v, conditional upon match of cv if
     * non-null.  If resulting value is null, delete.
     */    
final V replaceNode(Object key, V value, Object cv) {
        //计算hash值
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //整个数组为空或者槽点钱为空,直接终止
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            //扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                //锁住当前槽点
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //链表形式
                        if (fh >= 0) {
                            validated = true;
                            //循环
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                //找到需要被处理的节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        //新值不为空,替换节点的值
                                        if (value != null)
                                            e.val = value;
                                        //新值为空,删除这个节点
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                         //红黑树形式
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                    //删除节点后 调用untreeify方法红黑树可能转为列表
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、ConcurrentHashMap概述
  • 二、为什么要使用ConcurrentHashMap
  • 三、ConcurrentHashMap数据结构
    • 3.1 JDK1.7详细结构
      • 3.2 JDK1.8详细结构
        • 结构概述
        • 红黑树结构
        • 为什么是转变红黑树?
      • 为什么转红黑树阈值是8?
        • 为什么
      • 3.3 ConcurrentHashMap的Java7 和Java8对比
        • 1、数据结构
        • 2、并发度
        • 3、Hash碰撞处理
        • 4、查询时间复杂度
    • 四、ConcurrentHashMap 源码分析(Java 8)
      • 4.1 Node节点
        • 4.2 put方法
          • 4.3 get方法
            • 4.4 remove&replace
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档