J.U.C 为常用的集合提供了并发安全的版本,前面讲解了 map 的并发安全集合 ConcurrentHashMap,List 并发安全集合 CopyOnWriteArrayList,Set 并发安全集合 CopyOnWriteArraySet,本篇文章就来介绍并发安全的队列 ConcurrentLinkedQueue。
ConcurrentLinkedQueue 是一个基于链接节点的无边界的线程安全队列,采用非阻塞算法实现线程安全。分以下部分讲解:
队列由单向链表实现,ConcurrentLinkedQueue 持有头尾指针(head/tail 属性)来管理队列。
队列进行出队入队时对节点的操作都是通过 CAS 实现,保证线程安全。
public class ConcurrentLinkedQueue<E> {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
/**
* 节点类
* CAS保证节点操作安全
*/
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
// 获得item 和 next 的偏移量
UNSAFE.putObject(this, itemOffset, item);
}
/**
* 更改Node中的数据域item
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 更改Node中的指针域next
*/
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
/**
* 更改Node中的指针域next
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
}
/**入队*/
public boolean offer(E e)
/**出队*/
public E poll()
}
看下源码:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {// 注意这里是个循环
Node<E> q = p.next;
if (q == null) {//(1)
/*
* q == null 表示p是最后一个节点,设置t.next=newNode
* 如果失败,说明表示其他线程已经修改了p的指向,循环尝试直到成功
*/
if (p.casNext(null, newNode)) {// 2)
// node 加入节点后会导致tail距离最后一个节点相差大于一个,需要更新tail
if (p != t)//(3)
casTail(t, newNode);//(4) casTail:设置tail 尾节点
return true;
}
}
else if (p == q)//(5)
/*
* 多线程环境下,offer(e)和poll()同时执行,
* 此时p节点被poll(),设置了p.next=p,所以此时q=p.next=p
* 这里要重新设置p结点
*/
p = (t != (t = tail)) ? t : head;//(6)
else
// tail并没有指向尾节点,重新设置p
p = (p != t && t != (t = tail)) ? t : q;//(7)
}
}
单看代码很难理解 offer()过程,不用担心,我们一起从头复现一下 offer()过程:
public E poll() {
restartFromHead:// 如果出现p被删除的情况需要从head重新开始
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// item不为null,将item返回并设置为null
if (item != null && p.casItem(item, null)) {//(1)
if (p != h)//(2)
/*
* p != head 则更新head
* p.next != null,则将head更新为p.next ,否则更新为p
*/
updateHead(h, ((q = p.next) != null) ? q : p);//(3)
return item;
}
// p.next == null 队列为空
else if ((q = p.next) == null)//(4)
updateHead(h, p);
return null;
}
// 另一个线程已经把p从队列中删除并设置p.next = p,p已经被移除不能继续,需要重新开始
else if (p == q)//(5)
continue restartFromHead;
else
p = q;//(6)
}
}
}
/**
* 更新head
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
同样复现一下 poll()过程,接着上文 offer()之后的数据:
需要保证线程安全的三种情况:
多个线程同时执行到 casNext()设置最后的节点,casNext()通过 CAS 实现,第一个线程执行成功设置了最后一个节点后,其他线程的在 CAS 时发现期望的最后节点和实际上的最后节点不一致,CAS 就会失败,然后继续循环尝试直到成功。
同样是通过 CAS 保证线程安全,多个线程同时执行到 casItem()设置当前节点 item=null,第一个线程执行成功设置了当前节点 item=null 后,其他线程的在 CAS 时发现期望的 item 与实际的 item 不一致,CAS 就会失败,然后继续循环尝试 poll 下一个节点直到成功。
线程 A 要设置 p.next=newNode,但是此时 poll()将 p 删除了。当 poll()将 p 删除时设置了 p.next=p,offer()方法中会检查这种情况,发现有 p.next=p 就重新设置一个合适的 p 节点,以便将 newNode 入队。
tail 更新时机:tail 节点不总是尾节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。head 更新时机:并不是每次出队时都更新 head 节点。当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
head 和 tail 的更新总是间隔了一个。如果让 tail 永远作为队列的队尾节点,代码不是逻辑简单容易实现吗?
如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率,所以 doug lea 大师每间隔 1 次(tail 和队尾节点的距离为 1)进行才利用 CAS 更新 tail。 对 head 的更新也是同样的道理。