首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

JDK容器学习之Queue:ConcurrentLinkedQueue

并发安全的链表队列 ConcurrentLinkedQueue

并发安全的链表队列,主要适用于多线程环境中;底层数据结构为链表,由于队列本身频繁的出队和进队,那么这个线程安全是如何保障

I. 数据结构

从命名可以基本推测底层数据结构应该是链表,结合源码看下具体的链表节点

代码语言:javascript
复制
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            // jdk内用于保障原子操作的辅助类
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

// 链表头,注意 volatile 声明
private transient volatile Node<E> head;

// 链表尾,注意 volatile 声明
private transient volatile Node<E> tail;

从定义可以需要注意以下几点

  1. Node内部只有next节点,因此底层存储为单向链表
  2. head和tail声明为volatile,禁止指令重排和修改对其他线程及时可见,保障线程安全的基本前提之一
  3. sun.misc.Unsafe UNSAFE JDK内部大量使用的一个辅助类,用于保障基本的cas操作(其原理尚没有研究,后续在并发篇中详细探究下)

II. 线程安全保障

按照常见的线程安全保障机制,一般处理方案是对进队和出队操作进行加锁,保障同一时刻只能有一个线程对队列进行写操作 然而队列不同于Map,List, 出队和进队是比较频繁的操作,即队列会出现频繁的修改,如果加锁,性能无异会受到严重的影响 因此线程安全保障不是通过加锁来实现的

1. 进队 offer

通过源码分析,不加锁如何实现线程安全

代码语言:javascript
复制
public boolean offer(E e) {
    // 队列中不能塞null
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 死循环,确保入队一定成功
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) { // p 为最后一个节点,尝试入队
            if (p.casNext(null, newNode)) { // 将tail的next指向newNode
                // 入队成功
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 多线程环境下,若此时另一个线程执行了出队操作,且此时p出队了
            // 那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
            // 这个时候就会造成tail在head的前面,需要重新设置p
            // 如果tail已经改变,将p指向tail,但这个时候tail依然可能在head前面
            // 如果tail没有改变,直接将p指向head
            p = (t != (t = tail)) ? t : head;
        else
            // tail已经不是最后一个节点,将p指向最后一个节点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

上面的实现虽然很短,在单线程环境下很好理解,就是获取队列尾,然后将队列尾的next指向新的节点,并更新tail即可 (即代码中if条件命中的逻辑)

涉及到多线程进行并发的出队进队时,逻辑就没这么简单了,下面进行多线程的场景区分,辅助理解

case1: 另一个线程也进行入队操作,且优先完成入队操作

下面图解进行示意

  • 由于线程B完成入队,导致q指向新的队尾
  • q != null, q != p
  • 进入else逻辑,执行将p指向最后一个节点,再次循环

case2: 另一个线程执行出队操作,且节点p正好出队了

下面图解示意

  • 完成初始化之后,p,t指向tail
  • 另一个线程执行出队操作,正好将p出队了,因为出队中会执行updateHead逻辑,可能出现 p.next = p的场景
  • 此时进行q的赋值,得到结果 q==p, 而此时指向的tail节点不会为null(因为指向的是内存中原队列的Node节点,真实存在),因此进入 else if逻辑

2. 出队 poll

代码语言:javascript
复制
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

出队操作,原理和入队操作差不多,都是通过非锁机制实现,通过CAS确保出队和入队本身的原子性;而为了保证多线程的并发修改安全,在死循环中进行了各种场景的兼容

3. 队列个数获取

单独拿出size方法,因为与常见的容器不同,ConcurrentLinkedQueuesize()方法是非并发安全,且每次都会进行扫描整个链表,结果如下

代码语言:javascript
复制
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

III. 应用场景&小结

  1. 底层存储结构为单向链表
  2. 出队、入队都是非加锁操作,通过CAS保证出队和入队的原子性;为保证并发安全,出队和入队放在死循环中进行,对不同的并发场景进行兼容
  3. size()方法非线程安全,且时间复杂度为 O(n)
  4. 判断队列是否为空,请用 isEmpty() 进行替代
  5. 因为未加锁,出队入队的性能相对较好,切代码的实现比较优雅;然实际的业务场景中,非大神级人物尽量不要这么玩,维护成本太高

参考

下一篇
举报
领券