首页
学习
活动
专区
圈层
工具
发布

企业常用的并发编程Queue的源码分析

老顾导读

1、前言

2、阻塞队列

3、ArrayBlockingQueue源码分析

4、LinkedBlockingQueue源码分析

5、非阻塞队列

6、ConcurrentLinkedQueue源码详解

7、总结

一、前言

我们在开发过程中,经常会听到把这个对象放到队列中,然后再拿出来;还有什么发送消息,消费消息,消息中间件kafka、rocketmq等技术术语。

这些需求本质就是生产者,消费者模式,今天老顾就来分享一下java中常用的几个Queue,以及他们的不同之处,以及如何保证线程安全的?

二、阻塞队列

顾名思义就是这些队列是阻塞的,即BlockingQueue;为什么是阻塞?因为它是基于ReentrantLock加锁的机制,保证了线程安全。

看看有哪些常用BlockingQueue实现,如下图:

上图说明了BlockingQueue----->Queue-->Collection

队列的特点是:先进先出(FIFO)

BlockingQueue的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

不同的行为方式解释:

1、(抛出异常)

add添加元素,如果队列已满,导致无法添加,会直接抛一个异常

remove移除元素,如果队列为空,导致无法执行,会直接抛一个异常

2、(有返回值,不抛出异常)

如果试图的操作无法立即执行,返回一个true或false的值

3、(阻塞,一直等待)

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

4、(阻塞,等待超时)

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

不能向BlockingQueue插入一个空对象,否则会抛出NullPointerException,相应的实现类校验代码。

private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException();}

BlockingQueue特性

1、BlockingQueue :不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

2、BlockingQueue: 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

3、BlockingQueue :实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

4、BlockingQueue :实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。

案例代码

我们下面来介绍一下常用的几个queue

三、ArrayBlockingQueue源码分析

ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。

关键的成员变量

里面非常重要的就 ReentrantLock lock;只有一把锁,我们在来看看添加元素的方法

put方法总结:

1. ArrayBlockingQueue不允许元素为null

2. 获取锁,加锁操作

3. ArrayBlockingQueue在队列已满时将会调用notFull的await()方法释放锁并处于阻塞状态

4. 一旦ArrayBlockingQueue不为满的状态,就将元素入队

5.释放锁

再来看看移除元素代码

take方法总结:

获取锁 加锁

如果队列为空,那么将阻塞

否则调用dequeue()出队一个元素。

释放锁

小结

ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的,ArrayBlockingQueue内部只有一把锁,意味着同一时刻只有一个线程能进行入队或者出队的操作。

四、LinkedBlockingQueue 源码分析

LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。

LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。

底层数据结构

LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下:

源码分析

核心成员变量

LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象

添加元素源码

可以看出同一时刻,只能有一个线程执行入队操作,因为putLock在将元素插入到队列尾部时加锁了

移除元素源码

获取了takeLock锁,当队列为空时,就加入到notEmpty(的条件等待队列中,当队列不为空时就取走一个元素,当取完发现还有元素可取时,再通知一下自己的伙伴(等待在条件队列中的线程);最后,如果队列从满到非满,通知一下put线程。

我们再来看看remove()方法

再remove方法使用的时候,就直接加了2把锁;即不让加元素,也不让消费元素

LinkedBlockingQueue总结

LinkedBlockingQueue是允许两个线程同时在两端进行入队或出队的操作的,但一端同时只能有一个线程进行操作,这是通过两把锁来区分的;

五、非阻塞队列

上面介绍了阻塞队列,这里我们再来介绍一下非阻塞队列ConcurrentLinkedQueue,两者的区别:使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素

ConcurrentLinkedQueue的类图如下

六、ConcurrentLinkedQueue源码详解

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。

ConcurrentLinkedQueue是没有put和take方法的,因为是非阻塞的,添加元素直接使用offer方法,我们来看看源码

能明显看到是cas方式判断,是否能够添加元素

再来看看poll方法

ConcurrentLinkedQueue总结

使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础

七、总结

今天老顾介绍并发编程中,经常会用的几种queue,希望能够帮助小伙伴们深入的了解。谢谢!!!

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OsCMZvOR38p6WfJGPBHDy_ig0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。
领券