前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK容器学习之Queue:ConcurrentLinkedQueue

JDK容器学习之Queue:ConcurrentLinkedQueue

作者头像
一灰灰blog
发布2018-02-06 16:29:17
5670
发布2018-02-06 16:29:17
举报
文章被收录于专栏:小灰灰

并发安全的链表队列 ConcurrentLinkedQueue

并发安全的链表队列,主要适用于多线程环境中;底层数据结构为链表,由于队列本身频繁的出队和进队,那么这个线程安全是如何保障

I. 数据结构

从命名可以基本推测底层数据结构应该是链表,结合源码看下具体的链表节点

代码语言:javascript
复制
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            // jdk内用于保障原子操作的辅助类
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

// 链表头,注意 volatile 声明
private transient volatile Node<E> head;

// 链表尾,注意 volatile 声明
private transient volatile Node<E> tail;

从定义可以需要注意以下几点

  1. Node内部只有next节点,因此底层存储为单向链表
  2. head和tail声明为volatile,禁止指令重排和修改对其他线程及时可见,保障线程安全的基本前提之一
  3. sun.misc.Unsafe UNSAFE JDK内部大量使用的一个辅助类,用于保障基本的cas操作(其原理尚没有研究,后续在并发篇中详细探究下)

II. 线程安全保障

按照常见的线程安全保障机制,一般处理方案是对进队和出队操作进行加锁,保障同一时刻只能有一个线程对队列进行写操作 然而队列不同于Map,List, 出队和进队是比较频繁的操作,即队列会出现频繁的修改,如果加锁,性能无异会受到严重的影响 因此线程安全保障不是通过加锁来实现的

1. 进队 offer

通过源码分析,不加锁如何实现线程安全

代码语言:javascript
复制
public boolean offer(E e) {
    // 队列中不能塞null
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 死循环,确保入队一定成功
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) { // p 为最后一个节点,尝试入队
            if (p.casNext(null, newNode)) { // 将tail的next指向newNode
                // 入队成功
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 多线程环境下,若此时另一个线程执行了出队操作,且此时p出队了
            // 那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
            // 这个时候就会造成tail在head的前面,需要重新设置p
            // 如果tail已经改变,将p指向tail,但这个时候tail依然可能在head前面
            // 如果tail没有改变,直接将p指向head
            p = (t != (t = tail)) ? t : head;
        else
            // tail已经不是最后一个节点,将p指向最后一个节点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

上面的实现虽然很短,在单线程环境下很好理解,就是获取队列尾,然后将队列尾的next指向新的节点,并更新tail即可 (即代码中if条件命中的逻辑)

涉及到多线程进行并发的出队进队时,逻辑就没这么简单了,下面进行多线程的场景区分,辅助理解

case1: 另一个线程也进行入队操作,且优先完成入队操作

下面图解进行示意

  • 由于线程B完成入队,导致q指向新的队尾
  • q != null, q != p
  • 进入else逻辑,执行将p指向最后一个节点,再次循环
conOffer
conOffer

case2: 另一个线程执行出队操作,且节点p正好出队了

下面图解示意

  • 完成初始化之后,p,t指向tail
  • 另一个线程执行出队操作,正好将p出队了,因为出队中会执行updateHead逻辑,可能出现 p.next = p的场景
  • 此时进行q的赋值,得到结果 q==p, 而此时指向的tail节点不会为null(因为指向的是内存中原队列的Node节点,真实存在),因此进入 else if逻辑
conOffer2
conOffer2
2. 出队 poll
代码语言:javascript
复制
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

出队操作,原理和入队操作差不多,都是通过非锁机制实现,通过CAS确保出队和入队本身的原子性;而为了保证多线程的并发修改安全,在死循环中进行了各种场景的兼容

3. 队列个数获取

单独拿出size方法,因为与常见的容器不同,ConcurrentLinkedQueuesize()方法是非并发安全,且每次都会进行扫描整个链表,结果如下

代码语言:javascript
复制
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

III. 应用场景&小结

  1. 底层存储结构为单向链表
  2. 出队、入队都是非加锁操作,通过CAS保证出队和入队的原子性;为保证并发安全,出队和入队放在死循环中进行,对不同的并发场景进行兼容
  3. size()方法非线程安全,且时间复杂度为 O(n)
  4. 判断队列是否为空,请用 isEmpty() 进行替代
  5. 因为未加锁,出队入队的性能相对较好,切代码的实现比较优雅;然实际的业务场景中,非大神级人物尽量不要这么玩,维护成本太高

参考

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 并发安全的链表队列 ConcurrentLinkedQueue
    • I. 数据结构
      • II. 线程安全保障
        • 1. 进队 offer
        • 2. 出队 poll
        • 3. 队列个数获取
      • III. 应用场景&小结
        • 参考
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档