前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发容器和队列

并发容器和队列

作者头像
胖虎
发布2020-12-22 14:29:02
3410
发布2020-12-22 14:29:02
举报
文章被收录于专栏:晏霖晏霖晏霖

点击上方“晏霖”,选择“置顶或者星标”

曾经有人关注了我

后来他有了女朋友

2.9.1 Java中的并发容器

在我们开发中,经常会使用到容器来存储对象或数据,容器的作用非常大,合理使用各个容器的特性和方法可以简化开发,提高系统性能和安全性。

我们所说的容器,在Java中可分为两大块,首先就是Collection接口的分支,有Set、List、Queue,另一块就是Map接口的分支,代表集合HashMap,他们派生出来的子类也很多,每个子类都有自己的特性。这部分属于Java基础,在面试中几乎常常被问到,熟练掌握Java集合特性,是面试官在试探一个Java程序员基础知识扎实程度重要手段之一。

早期,我们在Java多线程下操作一个容器除了借助synchronized关键字,就是使用同步容器。典型的同步容器有Vector和Hashtable,而且当时在设计Hashtable的时候还没有考虑到驼峰命名。我们列举出一些常见的线程安全的和非线程安全的容器,如下表2-14所示。

表2-14 早期Java中线程安全与非安全容器

线程安全

非线程安全

Vector

ArrayList、LinkedList

Properties

HashSet、TreeSet

HashTable

HashMap

……

……

在多线程中,无论是使用同步方法还是同步容器,使其对容器操作具有线程安全都是允许的,但是最大的问题就是效率,我们为了提高效率才使用的多线程,或者某些并发场景下,都是对效率问题不可忽视的,或者有些复杂的场景还会多线程交替对容器进行存取,可能会发生ArrayIndexOutOfBoundsException异常。又例如在多线程处理HashMap使,put操作会引起死循环,这种死循环会导致cpu接近100%。总结一下就是,同步容器对所有容器操作串行化,来实现他们的线程安全性,代价就是效率,因为串行化严重降低并发性和吞吐量。所以,在Java5.0提供来多种并发容器,不仅可以保证线程安全同时又能保证高效操作。增加来ConcurrentHashMap来代替基于散列的Map,以及CopyOnWriteArrayList在多线程操作下代替List,还有为提高阻塞队列性能的ConcurrentLinkedQueue。在我所接触的项目中,开发人员对ConcurrentHashMap使用率远远高于CopyOnWriteArrayList和ConcurrentLinkedQueue,原因可能第一是因为并发场景使用少,第二对于数据容器结构,Map使用起来较为广泛和方便。本小结有必要解释一下CopyOnWriteArrayList,其实对于我来说,更比较喜欢用List,只是个人习惯而已,在多线程和并发情况下,CopyOnWriteArrayList可以代替List,类似这样的还有CopyOnWriteArraySet代替同步Set。使用方法和普通的List和Set没有区别。可以看到他们的命名规则是在ArrayList和ArraySet加前缀CopyOnWrite,CopyOnWrite理解成“写时复制”,这也是他保证线程安全的特征,指的是,在每次修改或写入时,都会创建并重新生成一个新的容器副本。我们可以把这个过程形象的理解成项目的版本迭代开发,每个人每次迭代都会对前一个版本进行完整更新,在此基础上进行开发。每次要变更这个容器都会复制一份,在此基础上进行修改,这样保证了多个线程同时对这个容器进行迭代时,不会被彼此干扰。CopyOnWriteArrayList更像是一种读写分离的实现,他不能保证修改过程中对于读取数据的实时准确性,但是可以保证最终一致性。可见COW这一过程需要复制底层数组,这是有一定性能开销对,特别对于容器数据量较大时,影响比较大。CopyOnWriteArrayList还有一个应用场景,在循环中对普通的List进行remove操作时会出现ConcurrentModificationException异常,还可能会出现数组下标越界异常,对于这样的操作我们可以使用CopyOnWriteArrayList代替List,但是我推荐把集合转成Iterator进行循环对其remove操作。

2.9.2 ConcurrentHashMap原理

大家对ConcurrentHashMap应该是非常熟悉的,在并发和多线程中代替HashMap,保证线程安全,并且提高并发效率。对于使用者来说也是很简单的,其内部提供操作容器的方法与HashMap基本一致,而且在设计上也沿用了一些HashMap的概念。ConcurrentHashMap是在JDK5时引入的线程安全的哈希式集合,在JDK8之前一直采用分段锁的设计思想,分段锁是由Segment内部类实现的,他继承ReentrantLock,用来管理每个HashEntry,相当于把容器分成众多的Segment,对于修改数据时只需对他所在对Segment进行同步达到线程安全对。在JDK8之后,ConcurrentHashMap抛弃了分段锁的概念,直接使用Node内部类作为存储具体的键值对,把put流程的控制粒度更加细化,引入了CAS无锁操作和synchronized来保证并发安全。并且数据结构跟HashMap1.8的结构一样,数组+链表/红黑树。以上我只是简单说一些变更的内容和一些概念,这些概念可以引申出许多问题,很多同学可能在面试过程中已经领教过了。而且在JDK8之后的ConcurrentHashMap源码已经达到了6300多行,引入了很多内部方法,涉及到的知识点是非常多的。要了解每一行代码的含义是一件非常难的事,所以我们要归纳其重点,了解其精华即可。下面我们要结合源码来聊一聊ConcurrentHashMap原理的重要内容。还是老办法,深入一个容器或者对象的源码,首先要看内部定义的属性和类,请看示例代码2-37。

代码清单2-37 ConcurrentHashMap.java

//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
//array的大小,只用于toarray方法
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//根据这个数来计算segment的个数,segment的个数是仅小于这个数且是2的几次方的一个数(ssize)
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加载因子
private static final float LOAD_FACTOR = 0.75f;
//变成红黑树的链表节点数,超过8就由链表转换成红黑树
static final int TREEIFY_THRESHOLD = 8;
//当树节点小于6自动转换成链表
static final int UNTREEIFY_THRESHOLD = 6;
// 在转变成树之前,还会有一次判断,只有键值对数量大于 64 才会发生转换。这是为了避免在哈希表建立初期,多个键值对恰好被放入了同一个链表中而导致不必要的转化。
static final int MIN_TREEIFY_CAPACITY = 64;
//每次进行转移的最小值这个值作为一个下限来避免Rsisize遇到过多的内存争用
private static final int MIN_TRANSFER_STRIDE = 16;
// 生成sizeCtl所使用的bit位数
private static final int RESIZE_STAMP_BITS = 16;
// 进行扩容所允许的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 记录sizeCtl中的大小所需要进行的偏移位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 一系列的标识
//forwarding nodes 主要作用是表征一个节点已经被处理干净(resize的时候被转移到新表了)
static final int MOVED     = -1; // hash for forwarding nodes
//表示树的根节点
static final int TREEBIN   = -2; // hash for roots of trees
//表示transient
static final int RESERVED  = -3; // hash for transient reservations
//31个1用来计算普通的node的哈希码
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 获取可用的CPU个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 进行序列化的属性
private static final ObjectStreamField[] serialPersistentFields = {
    new ObjectStreamField("segments", Segment[].class),
    new ObjectStreamField("segmentMask", Integer.TYPE),
    new ObjectStreamField("segmentShift", Integer.TYPE),
};

//省略部分代码

//也是用节点数组来构成哈希表,在第一次插入的时候会懒初始化,size是2的整数次幂,直接通过iterators来访问
transient volatile Node<K,V>[] table;
//默认null,扩容时新生成的数组,是原数组的2倍
private transient volatile Node<K,V>[] nextTable;
//用来计算整个表的size,和longadder里面的base一致
private transient volatile long baseCount;
//用来控制表初始化,当为负数值。各种状态下这个int的值如下:
//初始化的:-1
//resize:-(1 + 被激活的参与resize的数量)
//表为空:表的initial size,如果没有这个值则为0
//初始化后:该表下次应该resize的值,等于当前表的size的0.75倍
private transient volatile int sizeCtl;
//当resizing的时候下一个tab下标索引值(当前值+1)
private transient volatile int transferIndex;
//当resize和创建counterCells的时候的自选锁,和longadder一致
private transient volatile int cellsBusy;
//和longadder的cells一致
private transient volatile CounterCell[] counterCells;

了解来其内部重要的属性和一些字段的含义,接下来我们要看看,这个容器最重要的组成结构Node,它是一个最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面,类似于一个HashMap,请看示例代码2-38。

代码清单2-38 ConcurrentHashMap.java

//Node节点定义,可以看出Node是一个键值对
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
//ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性。
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val) {
        this.hash = hash;
        this.key = key;
        this.val = val;
    }

    Node(int hash, K key, V val, Node<K,V> next) {
        this(hash, key, val);
        this.next = next;
    }

    public final K getKey()     { return key; }
    public final V getValue()   { return val; }
    public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
    public final String toString() {
        return Helpers.mapEntryToString(key, val);
    }
//不允许直接setValue
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    // 它增加了find方法辅助map.get()方法。
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

这个Node内部类与HashMap中定义的Node类很相似,但是有一些差别:

1. 它对value和next属性设置了volatile同步锁。

2. 它不允许调用setValue方法直接改变Node的value域。

3. 它增加了find方法辅助map.get()方法。

我们除了Node,还需要了解其他重要的内部类,由于篇幅原因,内部类的代码省略,大家在源码中自行查看。

代码清单2-39 ConcurrentHashMap.java

//当链表长度过长的时候,会转换为TreeNode,是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。在红黑树结构中实际存放数据的节点
static final class TreeNode<K,V> extends Node<K,V> {……}
//存储对红黑树节点的引用。他是一个对象,包装的很多TreeNode节点,它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。
static final class TreeBin<K,V> extends Node<K,V> {……}
//一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询下一个节点,而不是以自身为头节点进行查找下一个节点,可以比喻成两个车厢之间的链子。
static final class ForwardingNode<K,V> extends Node<K,V> {……}
//占位加锁节点,执行某个方法时,对其加锁。
static final class ReservationNode<K,V> extends Node<K,V> {……}

根据以上我们对ConcurrentHashMap内部重要对属性和内部类的初步认识,可以对ConcurrentHashMap整体的结构分析,整理出图片,如图2-23所示。

图2-23 ConcurrentHashMap数据结构

图2-23示例中,左侧一列数据我们叫他table,这也是源码中定义的,这个table的数据存储分2中情况,这和JDK8中HashMap存储结构是一样的,分别是链表和红黑树。当且仅当槽内元素个数增加到8个,并且table的容量已经扩容到大于等于64时,节点链表转为红黑树,当某个槽内元素个数减少到6个时,由红黑树转回链表。链表转为红黑树需要2个条件都满足才可以,如果容量没有达到64时,而链表大于8个元素,只会扩容不会转为红黑树,扩容后元素将会重新排列。

下面介绍ConcurrentHashMap核心函数分析。和HashMap一样,我们最常用的就是put()和get(),首先我们从put()讲起。向ConcurrentHashMap存入元素使用put()方法,put()内部调用putVal()函数,这里才是存储一个元素核心的内容,对于putVal函数的流程大体如下。

① 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤②

② 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③。

③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤④。

④ 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤

⑤ 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥

⑥ 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。

我们把上述步骤转换称流程图如2-24所示。

图 2-24 ConcurrentHashMap存储元素流程图

接下来我们来看一下get()函数,无论在HashMap还是ConcurrentHashMap中取元素的操作是非常简单的,下面我们结合源码分析get()函数,如示例代码2-40所示。

代码清单2-40 ConcurrentHashMap.java

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的hash值
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
// 表不为空并且表的长度大于0并且key所在的桶不为空
        if ((eh = e.hash) == h) {
// 表中的元素的hash值与key的hash值相等
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 键相等,返回值
                return e.val;
        }
        else if (eh < 0)// 结点hash值小于0
// 在桶(链表/红黑树)中查找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {// 对于结点hash值大于0的情况
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get函数根据key的hash值来计算在哪个桶中,再遍历桶,查找元素,若找到则返回该结点,否则,返回null。

最后我们来聊聊ConcurrentHashMap的size()、mappingCount()函数,这两个函数都是用来统计ConcurrentHashMap元素的数量,唯一的区别就是,mappingCount()返回值支持long类型,如果当前容器元素大于Integer的最大值,使用size()函数只会返回Integer.MAX_VALUE,如果使用mappingCount()就会返回真实的数量,mappingCount()函数也是JDK8之后推荐使用的。无论是size()函数还是mappingCount()函数,核心统计的方法都是sumCount()。ConcurrentHashMap 提供了 baseCount、counterCells 两个辅助变量和一个CounterCell 辅助内部类。sumCount()就是迭代 counterCells来统计sum的过程。put 操作时,肯定会影响size(),在put()方法最后会调用addCount()方法。在addCount()方法中,如果counterCells == null, 则对 baseCount 做 CAS 自增操作。如果并发导致 baseCount CAS 失败了使用 counterCells。如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功。ConcurrentHashMap在统计元素的数量大费周折,然而最后的结果还是一个估计值,多线程同时插入和移除时,统计数量并不是一定准确的,如果在统计容器元素数量的时候阻塞一切的插入和移除方法,返回的结果应该是准确的,当然ConcurrentHashMap就是一个支持并发高性能容器,如果出现这样的操作,牺牲的就是性能,所以在设计的时候也是权衡了利弊,不能保证鱼和熊掌兼得,那就牺牲微小的准确性来提高其性能。

2.9.3 Java中的队列

队列是一种数据结构,他有先进先出的性质,这点他和栈的性质正好相反。一般使用都是在队列尾部加入元素和从队列头部移除元素,我们经常把他使用在并发环境下。队列也可以理解成生产者-消费者模式,生产者将资源缓存进队列,消费者从缓存队列中取出资源。Java中提供2种队列,一种是BlockingQueue接口为代表的阻塞队列,应用较为广泛,一种是以ConcurrentLinkedQueue为代表的非阻塞队列,性能高于阻塞队列。

我们对两种队列进行应用层面上的分析,首先我们要了解什么是阻塞队列。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

我们看一下BlockingQueue提供的几种方法,如下表2-15所示。

表2-15 阻塞队列提供的几种方法

方法\处理方式

抛出异常

返回特殊值

一直阻塞

超时退出

插入方法

add(e)队列未满时,返回true;队列满则抛出。

offer(e)队列未满时,返回true;队列满时返回false。非阻塞立即返回。

put(e)队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。

offer(e,time,unit)设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。

移除方法

remove()队列不为空时,返回队首值并移除;队列为空时抛出。

poll()队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。

take()队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。

poll(time,unit)设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值。

检查方法

element()

peek()

不可用

不可用

注:异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

下面我们来了解在JDK7中一共提供了7个阻塞队列,分别是。

1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,在创建对象时必须制定容量大小,支持公平锁和非公平锁。

2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,此队列的默认长度为Integer.MAX_VALUE,按照先进先出的顺序进行排序。

3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。

4. DelayQueue:一个使用优先级队列实现的无界阻塞队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)。

5. SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁(Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收)。

6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列,队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

ArrayBlockingQueue提供了3中构造函数,如下表2-16所示。

表2-15 ArrayBlockingQueue的3种构造方法

构造方法

描述

public ArrayBlockingQueue(int capacity)

构造指定大小的有界队列

public ArrayBlockingQueue(int capacity, boolean fair)

构造指定大小的有界队列,指定为公平或非公平锁

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合

ArrayBlockingQueue类中的几个成员变量,如下代码清单2-41所示。

代码清单2-41 ArrayBlockingQueue.java

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /**
     * 序列化ID。这个类依赖于默认的序列化
     *甚至对于默认序列化的项数组,即使
     *它是空的。否则,它不能被声明为.,即
     *这里需要。
     */
    private static final long serialVersionUID = -817911632652898426L;
    /** 排队的物品 */
    final Object[] items;
    /** 用于下一次获取、轮询、查看或删除的项目索引 */
    int takeIndex;
    /** 下一个put、.或add的项目索引 */
    int putIndex;
    /** 队列中的元素数量 */
    int count;
    /*
     * 并发控制使用经典的两条件算法
     *发现在任何教科书。
     */
    /** 主锁保护所有通道 */
    final ReentrantLock lock;
    /** 等待获取的条件 */
    private final Condition notEmpty;
    /** 等待放入的条件 */
    private final Condition notFull;

可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。lock是一个可重入锁,notEmpty和notFull是等待条件。

下面我简单的使用ArrayBlockingQueue写一个案例,使用单线程进行存入,多线程取出,如下代码2-42所示

代码清单2-42 BlockingQueueTest.java

public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue abq = new ArrayBlockingQueue(10, true);
        for (int i = 1; i <= 10; i++) {
            put(abq, i);
            get(abq);
        }
    }

    private static void get(ArrayBlockingQueue abq) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Integer integer = null;
                    try {
                        integer = (Integer) abq.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (null == integer) {
                        System.out.println("队列是空,没有取到元素");
                    } else {
                        System.out.println("取到了一个元素是:" + integer + "---目前队列中元素个数是 :" + abq.size());
                    }
                }
            });
        executorService.shutdown();
    }

    /**
     * 添加元素
     *
     * @param abq
     * @param i
     */
    private static void put(ArrayBlockingQueue abq, int i) throws InterruptedException {
        abq.put(i);
        System.out.println("存入了一个元素是   " + i);
    }
}

插入和取出的方法最常用的是put和take,因为只有这两个是阻塞的,使用时都需要先获取锁,没有获取到锁的线程会被挡在第⼀道大门之外 自旋拿锁,直到获取到锁。就算拿到锁了之后,也不⼀定会顺利进⾏put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。其他的方法根据实际情况而定。

运行结果如下,可以看出多线程并发也没有发生线程安全问题,而且体现出队列是先入先出。

存入了一个元素是 1

存入了一个元素是 2

取到了一个元素是:1---目前队列中元素个数是 :1

存入了一个元素是 3

取到了一个元素是:2---目前队列中元素个数是 :1

存入了一个元素是 4

取到了一个元素是:3---目前队列中元素个数是 :1

存入了一个元素是 5

取到了一个元素是:4---目前队列中元素个数是 :1

存入了一个元素是 6

取到了一个元素是:5---目前队列中元素个数是 :1

存入了一个元素是 7

取到了一个元素是:6---目前队列中元素个数是 :1

存入了一个元素是 8

取到了一个元素是:7---目前队列中元素个数是 :1

存入了一个元素是 9

取到了一个元素是:8---目前队列中元素个数是 :0

存入了一个元素是 10

取到了一个元素是:9---目前队列中元素个数是 :1

取到了一个元素是:10---目前队列中元素个数是 :0

阻塞队列介绍完毕,阻塞队列使用put/take方法可以实现在队列已满或空的时候达到线程阻塞状态,阻塞这种方式在线程并发时固然安全,但是也会造成效率上的问题,所以说今天我们来讲一个非阻塞队列——ConcurrentLinkedQueue,他能保证并发安全,而且还可以提高效率。通常 ConcurrentLinkedQueue 的性能好于 BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,也就是插入的时候往尾部插,取出元素从头部取。该队列不允许null元素。

ConcurrentLinkedQueue内部是遵循CAS(比较并交换)的方式来实现。

下面介绍ConcurrentLinkedQueue几个重要方法。

首先重点说明add()和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别,个别资料说不推荐使用add方法,说队列满了插入会抛异常,下面贴出源码,add方法调用的就是offer,所以这两个没区别,并且官方文档也有如下说明)

public boolean add(E e) {

return offer(e);

}

其次是poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。

下面我对队列的入队和出队的方法进行讲解。

1、入队

元素插入需要做两件事,第一是将入队节点设置成当前队列的最后一个节点。第二是更新tail节点,如果原来的tail节点的next节点不为空,则将tail更新为刚入队的节点(即队尾结点),如果原来的tail节点为空,则tail节点不动,把元素插入到tail的next节点处。也就是说每次tail移动都要隔着一个节点。请看如下2-43源码。

代码清单2-43 ConcurrentLinkedQueue.java

public boolean offer(E e) {
        //首先入队的对象不允许为null
        checkNotNull(e);
        //入队前,创建一个入队节点,构造一个内部函数
        final Node<E> newNode = new Node<E>(e);
        //死循环,入队不成功反复入队。
        for (Node<E> t = tail, p = t;;) {
             //创建一个指向tail节点的引用
            Node<E> q = p.next;
            //如果q=null说明p是尾节点则插入
            if (q == null) {
                // cas插入
                if (p.casNext(null, newNode)) {
                    //cas成功说明新增节点已经被放入链表,然后设置当前尾节点
                    if (p != t) // 一次跳两个节点
                        casTail(t, newNode);  // 失败是可以的.
                    return true;
                }
                // 丢失的CAS与另一线程竞争;重新读取下一个
            }
            else if (p == q)
            //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
            //重新找新的head,因为新的head后面的节点才是激活的节点
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
 p = (p != t && t != (t = tail)) ? t : q;
        }
    }
private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

2、出队

不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,则弹出head的next结点并更新head结点为原来head的next结点的next结点。

poll操作是在链表头部获取并且移除一个元素,下面看看实现原理,如示例代码2-44所示。

代码清单2-44 ConcurrentLinkedQueue.java

public E poll() {
        restartFromHead:
        //死循环
        for (;;) {
            //死循环
            for (Node<E> h = head, p = h, q;;) {
                //保存当前节点值
                E item = p.item;
                //当前节点有值则cas变为null
                if (item != null && p.casItem(item, null)) {
                    //cas成功标志当前节点以及从链表中移除
                    if (p != h) // 类似tail间隔2设置一次头节点
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                //当前队列为空则返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                //自引用了,则重新找新的队列头节点
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
}

观察入队和出队的源码可以发现,无论入队还是出队,都是在死循环中进行的,也就是说,当一个线程调用了入队、出队操作时,会尝试获取链表的tail、head结点进行插入和删除操作,而插入和删除是通过CAS操作实现的,而CAS具有原子性。

故此,如果有其他任何一个线程成功执行了插入、删除都会改变tail/head结点,那么当前线程的插入和删除操作就会失败,则通过循环再次定位tail、head结点位置进行插入、删除,直到成功为止。

也就是说,ConcurrentLinkedQueue的线程安全是通过其插入、删除时采取CAS操作来保证的。不会出现同一个tail结点的next指针被多个同时插入的结点所抢夺的情况出现。

对于该队列来说size()函数是通过遍历整个集合,因此在队列容量较大时会有一定效率问题,如果只是查看队列是否为空可以使用isEmpty()函数来代替size()。

此外,如果在执行期间添加或删除元素。对于size()方法,返回的结果可能不准确。因此,此方法在并发时通常不太有用。

胖虎

热 爱 生 活 的 人

终 将 被 生 活 热 爱

我在这里等你哟!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ArrayBlockingQueue类中的几个成员变量,如下代码清单2-41所示。
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档