—-
手机用户请
横屏
获取最佳阅读体验,REFERENCES
中是本文参考的链接,如需要链接和更多资源,可百度”Yiyuery”获取,多处同步更新:
java.util.concurrent.SynchronousQueue
类注释
/** Dual Queue */static final class TransferQueue<E> extends Transferer<E> { /*
* This extends Scherer-Scott dual queue algorithm, differing,
* among other ways, by using modes within nodes rather than
* marked pointers. The algorithm is a little simpler than
* that for stacks because fulfillers do not need explicit
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/ /** Node class for TransferQueue. */
//内部的节点类,用于表示一个请求,这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出
static final class QNode { volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark 用于判断是入队还是出队,true表示的是入队操作,也就是添加数据
final boolean isData; QNode(Object item, boolean isData) { this.item = item; this.isData = isData;
} // QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全
// compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量
// 第三个参数表示参数期待的值,第四个参数表示更新后的值
// 下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功
boolean casNext(QNode cmp, QNode val) { return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
} //方法的原理同上面的类似,这里就是更新item的值了
boolean casItem(Object cmp, Object val) { return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
} /**
* Tries to cancel by CAS'ing ref to this as item.
方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
} //调用tryCancel方法后item就会是this,就表示当前任务被取消了
boolean isCancelled() { return item == this;
} /**
* Returns true if this node is known to be off the queue
* because its next pointer has been forgotten due to
* an advanceHead operation.
表示当前任务已经被返回了
*/
boolean isOffList() { return next == this;
} // Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) { throw new Error(e);
}
}
} /** Head of queue 首节点*/
transient volatile QNode head; /** Tail of queue 尾部节点*/
transient volatile QNode tail; /**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe; //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
} /**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) { if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
} /**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) { if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
} /**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) { return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
} /**
* Puts or takes an item. transfer方法用于提交数据或者是获取数据
*/
@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/ QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//如果e不为null,就说明是添加数据的入队操作 for (;;) {
QNode t = tail;
QNode h = head; if (t == null || h == null) // saw uninitialized value
continue; // spin
//当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next; //这是一个检查,确保t指向队尾
if (t != tail) // inconsistent read
continue; //tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环
if (tn != null) { // lagging tail
advanceTail(t, tn); continue;
} //如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null
//这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null
if (timed && nanos <= 0) // can't wait
return null; //如果没有超时时间或者超时时间不为0的话就创建新的节点
if (s == null)
s = new QNode(e, isData); //使tail的next指向新的节点
if (!t.casNext(null, s)) // failed to link in
continue; //更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点
advanceTail(t, s); // swing tail and wait
//如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交
Object x = awaitFulfill(s, e, timed, nanos); //当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true
if (x == s) { // wait was cancelled
//将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了
clean(t, s); return null;
} if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
} return (x != null) ? (E)x : e; } else { // complementary-mode 提交操作的时候刚刚好有反向的操作在等待
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; //这里先判断m是否是有效的操作
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
} //更新头部节点
advanceHead(h, m); // successfully fulfilled
//唤醒m节点的被挂起的线程
LockSupport.unpark(m.waiter); //返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功
return (x != null) ? (E)x : e;
}
}
} /**
* Spins/blocks until node s is fulfilled.
* 下面看看执行挂起线程的方法awaitFulfill
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */
//首先获取超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L; //当前操作所在的线程
Thread w = Thread.currentThread(); //线程被挂起或是进入超时等待之前阻止自旋的次数
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { //这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item; //x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法
if (x != e) return x; //这里先判断超时的时间是否过了
if (timed) {
nanos = deadline - System.nanoTime(); if (nanos <= 0L) {
s.tryCancel(e); continue;
}
} //这里通过多几次循环来避免直接挂起线程
if (spins > 0)
--spins; else if (s.waiter == null)
s.waiter = w; else if (!timed)
//park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒
LockSupport.park(this); else if (nanos > spinForTimeoutThreshold)
//parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒
LockSupport.parkNanos(this, nanos);
}
} /**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn); continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h) return;
QNode tn = t.next; if (t != tail) continue; if (tn != null) {
advanceTail(t, tn); continue;
} if (s != t) { // If not tail, try to unsplice
QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return;
}
QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn; if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null); if (dp == pred) return; // s is already saved node
} else if (casCleanMe(null, pred)) return; // Postpone cleaning s
}
} private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) { throw new Error(e);
}
}
}
代码实现里的Dual Queue
或Stack
内部是用链表LinkedList
来实现的,其节点状态为以下三种状态:
- 持有数据 – put()方法的元素
- 持有请求 – take()方法
- 空
这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。其核心接口是Transfer
,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。
SynchronousQueue 没有任何内部容量。SynchronousQueue 非常适合于线程之间交换信息,均衡生产者与消费者的处理速率。 典型的应用场景是线程池newCachedThreadPool,从上面的源码可以看出,如果入队操作和出队操作的处理速度相差比较大的话有可能会创建大量线程,有耗尽内存的风险。
当fair==true,创建一个公平的同步队列,TransferQueue,最先等待的最先释放(先进先出queue)