前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >juc系列-并发Queue

juc系列-并发Queue

作者头像
topgunviper
发布2022-05-12 14:29:22
2390
发布2022-05-12 14:29:22
举报
文章被收录于专栏:啥都有的专栏啥都有的专栏

ConcurrentLinkedQueue是一个基于链表结构的无界队列,提供了Queue的基本特性FIFO,出入规则是:从head出,从tail进。非阻塞特性使其在高并发环境依然能有出色的性能。

ConcurrentLinkedQueue的基础数据结构:

代码语言:javascript
复制
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

    private transient volatile Node<E> head;

    private transient volatile Node<E> tail;

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
    
    。。。
    
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        。。。
    }

下面是ConcurrentLinkedQueue的一些需要注意的点:

1.不允许存储null值,否则抛出NPE

2.Iterator遍历时数据的弱一致性

3.size方法没有绝对的实时性

4.没有fast-fail机制,不会抛出ConcurrentModificationException

5.head,tail节点的延迟更新处理

6.无锁化设计的ABA问题

1.不允许存储null值,否则抛出NPE

每次offer操作时都需要进行checkNotNull操作,若item为null,直接跑NPE.

代码语言:javascript
复制
private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
2.Iterator遍历时数据的弱一致性 3.size方法没有绝对的实时性 4.没有fast-fail机制

前两个问题都是由于ConcurrentLinkedQueue的异步特性造成的,在遍历时无法保证队列不会被其他线程修改.并且ConcurrentLinkedQueue没有fast-fail机制,即在遍历队列时,队列被别的线程修改并不会抛出ConcurrentModificationException。所以当前线程对于其他线程做的修改不能及时感知到。除了Iterator遍历/size()方法,其他所有批量操作/全局操作的方法都存在这个问题(如:toArray/addAll等)

5.head,tail节点的延迟更新处理

这点是最初接触ConcurrentLinkedQueue时比较困惑的地方,head和tail节点并不是实时更新的,也就是说每次offer/poll操作并不一定改变head/tail的值,这种处理方式可以减少入队出队时的cas操作,对性能是个很大的提升。下面结合offer和poll操作来具体分析下:

执行操作:初始化

代码语言:javascript
复制
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

队列状态

concurrentlinkedqueue_init.png

执行操作:queue.offer("aaa");

队列状态

concurrentlinkedqueue_offer_aaa.png

执行操作:queue.offer("bbb");

队列状态

offer_bbb.png

执行操作:queue.offer("ccc");

队列状态

offer_ccc.png

执行操作:queue.offer("ddd");

队列状态

offer_ddd.png

下面根据offer源码具体分析:

offer操作源码:

代码语言:javascript
复制
    public boolean offer(E e) {
        checkNotNull(e);//NPE检查
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;                                      
(1)         if (q == null) {
                // p是最后一个节点,cas更新
                if (p.casNext(null, newNode)) {
                    //cas更新成功
                    if (p != t)
                        casTail(t, newNode);  //cas更新tail
                    return true;
                }
                // cas操作失败,重复操作
            }
(2)        else if (p == q)
                p = (t != (t = tail)) ? t : head;
(3)         else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

offer操作可以分为两个步骤:1.确定尾节点 2. 做cas更新

从上面的状态图中可以看到,tail指向的节点并不一定是队列的尾节点,所以要做元素入队操作的第一要务是确定尾节点.

重点分析下为什么q 会出现以上三种情况:

  1. q == null

当前的队列状态1

offer_bbb.png

执行操作:queue.offer("ccc");

当tail指向队列的最后一个节点bbb的时候,tail.next为null,此时q 为 null,执行cas更新节点的操作,p.casNext(null, newNode),将p的next值设置为newNode,等同于p.next = newNode; 队列的状态2如下:

offer_ccc.png

问题:为什么需要做下面这个判断?这个cas操作失败了会有什么后果?

代码语言:javascript
复制
if (p != t)
    casTail(t, newNode); 

下面继续分析

在上面队列状态基础上执行操作:queue.offer("ddd");

此时 p = tail,q = p.next; p指向bbb,q指向ccc。状态如下:

offerpq.png

这种情况下 q != null 且 p != q,所以执行(3)操作:

代码语言:javascript
复制
p = (p != t && t != (t = tail)) ? t : q;

这步操作后 p 指向了 节点ccc。即p指向了尾节点。尾节点确定了,那么下次循环即可做cas更新操作了。

得到最新状态3:

offer_ddd.png

在队列状态2【即执行queue.offer("ccc")后】的基础上做如下操作:

在并发环境中,当线程A执行操作queue.offer("ddd");之前,同时另一个线程执行完以下三次poll操作:

代码语言:javascript
复制
queue.poll();
queue.poll();
queue.poll();

此时线程A看到的队列的状态如下:

offer_poll.png

所以当 p == q时,表示当前节点已经不再链表中(已被移除)。所以这种情况下需要将p重新指向head或tail。

代码语言:javascript
复制
p = (t != (t = tail)) ? t : head;

至此,offer操作中的三种情况分析完毕。

下面分析poll操作。

当前状态

offer_ddd.png

执行操作:queue.poll(); 状态如下:

poll1.png

继续执行操作:queue.poll(); 状态如下:

poll2.png

继续执行操作:queue.poll(); 状态如下

poll3.png

可见head的更新和tail类似,都是采用了延迟更新的优化方式。 poll源码如下:

代码语言:javascript
复制
    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // 成功将item置为null
                    if (p != h) // 更新head
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

poll操作和offer类似,先获取head,然后cas(依情况决定是否更新head),一般hop为2。

6.无锁化设计的ABA问题

在用cas做lock-free操作时有一个经典的ABA问题。那么ConcurrentLinkedQueue需要考虑这个问题吗?

ConcurrentLinkedQueue中不存在ABA问题,这主要依赖于Java语言的垃圾回收机制。当一个节点被poll或remove后,即被gc,该节点会被垃圾回收器回收。

使用场景
  1. Tomcat存储等待请求
代码语言:javascript
复制
protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests =
        new ConcurrentLinkedQueue<SocketWrapper<Socket>>();

ConcurrentLinkedDeque

JDK还提供了一个双端队列的实现版本。并发操作特性和ConcurrentLinkedQueue相似,提供了Deque接口特征。

参考: jdk1.7 java.util.concurrent.ConcurrentLinkedQueue源码

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 下面分析poll操作。
    • 使用场景
    • ConcurrentLinkedDeque
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档