专栏首页爱编码Java的ConcurrentLinkedQueue

Java的ConcurrentLinkedQueue

简介

实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现。

下面就来学习一下本文的主角ConcurrentLinkedQueue:

1)ConcurrentLinkedQueue的结构 2)offer方法原理 3)poll方法原理 4)HOPS设计

ConcurrentLinkedQueue的结构

首先看一下源码,从中你可以发现,它就是个队列的数据结构。

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        //省略了很多。。。
        ........
}
//头节点
private transient volatile Node<E> head;
//尾节点
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
        //初始化时,头尾节点都指向同一个null节点。
       head = tail = new Node<E>(null);
}

Node节点主要包含了两个域:

一个是数据域item,另一个是next指针,用于指向下一个节点从而构成链式队列,并且都是用volatile进行修饰的,以保证内存可见性。而默认构造函数,head和tail指针会指向一个item域为null的节点

offer方法原理

入队列就是将入队节点添加到队列的尾部。

public boolean offer(E e) {
    checkNotNull(e);
    // 新建一个node
    final Node<E> newNode = new Node<E>(e);

    // 不断重试(for只有初始化条件,没有判断条件),直到将node加入队列
    // 初始化p、t都是指向tail
    // 循环过程中一直让p指向最后一个节点。让t指向tail
    for (Node<E> t = tail, p = t;;) {
        // q一直指向p的下一个
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            // 如果q为null表示p是最后一个元素,尝试加入队列
            // 如果失败,表示其他线程已经修改了p指向的节点
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                // node加入队列之后,tail距离最后一个节点已经相差大于一个了,需要更新tail
                if (p != t) // hop two nodes at a time
                    // 这儿允许设置tail为最新节点的时候失败,因为添加node的时候是根据p.next是不是为null判断的,
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 虽然q是p.next,但是因为是多线程,在offer的同时也在poll,如offer的时候正好p被poll了,那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
            // 这个时候就会造成tail在head的前面,需要重新设置p
            // 如果tail已经改变,将p指向tail,但这个时候tail依然可能在head前面
            // 如果tail没有改变,直接将p指向head
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            // tail已经不是最后一个节点,将p指向最后一个节点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tair节点的变化,每添加一个节点做了一个队列的快照图。

上图的步骤解析如下:

第一步添加元素1。队列更新head节点指向元素1,head的next指向是null。tail节点指向还是原来的null。第二步添加元素2。队列更新head节点指向元素1,head的next节点指向是元素2,而此时tail节点指向元素2,tail的next指向null。第三步添加元素3,添加完后tail节点指向元素2,而tail的next节点指向元素3。第四步添加元素4,添加完后tail节点指向元素4,然后将tail的next指向null。

可能你还是不相信,那么我们debug看看是否真的是如此的

那么它是如何保证多线程下安全的呢?

从源代码角度来看整个入队过程主要做二件事情:

第一是定位出尾节点。

Node<E> q = p.next;
  if (q == null) {
     ................//省略很多代码
}else if (p == q)
    p = (t != (t = tail)) ? t : head;
else 
  p = (p != t && t != (t = tail)) ? t : q;

虽然q是p.next,但是因为是多线程,在offer的同时也在poll,如offer的时候正好p被poll了,那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向自己,即:p.next == p这个时候就会造成tail在head的前面,需要重新设置p如果tail已经改变,将p指向tail,但这个时候tail依然可能在head前面 如果tail没有改变,直接将p指向head

第二是使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。

p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

大致示意图如下:

poll()方法原理

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。下面让我们通过每个节点出队的快照来观察下head节点的变化。

从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops来减少使用CAS更新head节点的消耗,从而提高出队效率。让我们再通过源码来深入分析下出队过程。

public E poll() {
    // 如果出现p被删除的情况需要从head重新开始
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                // 队列为空
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                // 当一个线程在poll的时候,另一个线程已经把当前的p从队列中删除——将p.next = p,p已经被移除不能继续,需要重新开始
                continue restartFromHead;
            else
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

HOPS的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail

head更新触发时机:当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head

总结

站在巨人的肩膀上,才能看得更远。比如下面的两篇优秀文章可以阅读。

参考文章

http://ifeve.com/concurrentlinkedqueue/ https://juejin.im/post/5aeeae756fb9a07ab11112af

本文分享自微信公众号 - 爱编码(ilovecode),作者:zero

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-25

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 经典算法之最优二叉树

    我想学过数据结构的小伙伴一定都认识哈夫曼,这位大神发明了大名鼎鼎的“最优二叉树”,为了纪念他呢,我们称之为“哈夫曼树”。哈夫曼树可以用于哈夫曼编码,编码的话学问...

    用户3467126
  • Java的ConcurrentHashMap

    ConcurrentHashMap是Java中的一个线程安全且高效的HashMap实现。

    用户3467126
  • Elasticsearch的Index和Mapping(二)

    本文使用的Elasticsearch版本为6.5.4,基本命令以及操作大都通用。下面通过MySQL与Elasticsearch的对比图,让我们更好地理解接下来的...

    用户3467126
  • 【原创】Java并发编程系列29 | ConcurrentLinkedQueue

    J.U.C 为常用的集合提供了并发安全的版本,前面讲解了 map 的并发安全集合 ConcurrentHashMap,List 并发安全集合 CopyOnWri...

    java进阶架构师
  • 9.并发包非阻塞队列ConcurrentLinkedQueue

    jdk1.7.0_79   队列是一种非常常用的数据结构,一进一出,先进先出。   在Java并发包中提供了两种类型的队列,非阻塞队列与阻塞队列,当然它们都...

    用户1148394
  • 数据结构基础温故-4.树与二叉树(下)

    上面两篇我们了解了树的基本概念以及二叉树的遍历算法,还对二叉查找树进行了模拟实现。数学表达式求值是程序设计语言编译中的一个基本问题,表达式求值是栈应用的一个典型...

    Edison Zhou
  • 精选 TOP 面试题 001 | LeetCode 237. 删除链表中的节点

    请编写一个函数,使其可以删除某个链表中给定的(非末尾)节点,你将只被给定要求被删除的节点。

    江不知
  • JS数据结构第五篇 --- 二叉树和二叉查找树

    从逻辑结构角度来看,前面说的链表、栈、队列都是线性结构;而今天要了解的“二叉树”属于树形结构。

    tandaxia
  • NOIP 2011初赛普及组C/C++答案详解

    3 C 8G = 8 * 1024 M 8 * 1024 / 2 = 4096张 注意,题目说的是“大约”,不要求精确。

    海天一树
  • K8s节点故障:资源控制器会触发哪些动作

    定义:在Kubernetes中,不可达节点被称为分区节点partitioned node,为了了解操作方法,让我们创建一个分区节点方案并了解其行为。

    公众号: 云原生生态圈

扫码关注云+社区

领取腾讯云代金券