首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

并发编程之SynchronousQueue

上个礼拜断断续续看了挺久的SynchronousQueue,第一眼理解的意思和现在看懂了之后的意思感觉还是有那么大的差别的。所以不懂的东西慢慢看总会看懂的~~

下面进入正题

What is SynchronousQueue

首先什么是SynchronousQueue呢?它也是一种阻塞队列,实现了BlockingQueue 的接口。

我一开始看的时候,以为它和Exchanger 一样,也是可以两个线程之间交换东西。事实上一个类代码读下来,SynchronousQueue更加的细腻,当然二者有相似之处。

都可以理解为多个线程换东西,而SynchronousQueue和Exchanger类特点方面却又区别:

前者被设定为没有容量,内部用链表实现,定位是阻塞队列,在线程池中用到

而Exchanger在并发不大时候使用Object来交换,而当并发大时候,Exchanger则使用数组来作为线程交换的中介空间。

所以二者用法场景上有区别。

具体可看:Java并发学习(十八)-并发工具Exchanger

在SynchronousQueue里面有两种性质的队列,公平模式和非公平模式,对于公平模式,可以理解为FIFO(先进先出);而非公平模式则可以理解为LIFO(后进先出)。

Synchronous有以下特点:

里面没有容量,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。但事实上,公平模式是用队列实现,而非公平模式则为栈实现,阻塞的线程信息也是通过相应的Node进行链接的,但是通过peek, contains, clear, isEmpty … 等方法是无法获得其值的。

公平性就是每次进行交换的线程都和队头元素(最先进来的)进行比较,而非公平性则是和最后进来的元素进行比较,这就是里面实现公平性和非公平行的思路。

再看看SynchronousQueue的定义:

public class SynchronousQueue extends AbstractQueue

implements BlockingQueue, java.io.Serializable

里面定义了数据的交换规范Transferer 接口:

abstract static class Transferer {

abstract E transfer(E e, boolean timed, long nanos);

}

两种模式都实现了它,当参数e为null时候,说明该线程是想去获得数据,而当参数e不为null时候,说明该线程是想去提供数据。

下面分别来分析公平性和非公平性的实现代码:

UnFair模式

在Fair模式中,节点是通过栈来存储的,当然栈只是逻辑结构,还是通过链表实现,先看看它的定义:

static final class TransferStack extends Transferer {

//代表未完成的消费者

static final int REQUEST = 0;

//表明该节点是生产者

static final int DATA = 1;

//正在和另一个线程交换数据中

static final int FULFILLING = 2;

//由m判断是否在交换数据

static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

...

}

SNode

上面三种状态,指的是里面所存储数据节点的SNode的状态,接下来看SNode定义:

static final class SNode {

volatile SNode next; // 下一个节点。

volatile SNode match; // 和本节点配对的节点。

volatile Thread waiter; // 也就是需要被pack或者unpack的线程

Object item; // //传输的数据。

int mode; //模式。指的是TransferStack里面的三种

SNode(Object item) {

this.item = item;

}

/**

* 把val接入到cmp后面一个节点。

*/

boolean casNext(SNode cmp, SNode val) {

return cmp == next &&

UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

}

/**

* 尝试去匹配某个Snode s,如果是这个节点,就唤醒它

* */

boolean tryMatch(SNode s) {

if (match == null &&

UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { //如果当前match还没有被匹配占坑,那么就把s来占坑。

Thread w = waiter; //waiter估计不是当前节点。

if (w != null) { // waiters need at most one unpark

waiter = null; //每人等待,

LockSupport.unpark(w); //唤醒

}

return true;

}

return match == s;

}

/**

* 尝试取消,就是把match换为自己。自己匹配自己,。前提是match为null。

* 也就是说,如果已经匹配了,那么这个方法不能取消。

*/

void tryCancel() {

UNSAFE.compareAndSwapObject(this, matchOffset, null, this);

}

/**

* 和上一个方法对比,则可以知道,判断match是不是自己。

*/

boolean isCancelled() {

return match == this;

}

// Unsafe 的CAS机制

private static final sun.misc.Unsafe UNSAFE;

private static final long matchOffset;

private static final long nextOffset;

static {

try {

UNSAFE = sun.misc.Unsafe.getUnsafe();

Class k = SNode.class;

matchOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("match"));

nextOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

} catch (Exception e) {

throw new Error(e);

}

}

}

开始看SynchronousQueue源码时候,卡了那么一会,因为说它里面是没有元素的,但是我在这里疑问了好久,既然没有元素,为什么要一个SNode来保存节点信息呢?并且节点还是穿起来的?

当然最终是想通了,SynchronousQueue设定就是一个不能peek, contains等操作的节点,但是里面是有一条链表来保存并发下竞争的线程的。也就是说,整个SynchronousQueue的运行机制也还是通过维护一个链表来实现的。当有并发时,通过判断链表插入一端节点的类型(mode),从而确定是否进行交换。

transfer方法

上面说了它的节点,再看主要的方法,无论Fair或者UnFair,都是通过对transfer 的不同的实现,从而完成相应的功能。

SynchronousQueue里面有个内部抽象类,主要用途是定义规范,里面只有一个方法transfer:

abstract static class Transferer {

abstract E transfer(E e, boolean timed, long nanos);

}

当线程想要交换时候,都会调用这个方法,timed表示是否有超时时间,而e则是用来放交换的数据。

当e为null时候,表示需要获得数据;

当e不为null时候,表示提供数据。

接下来看UnFair的transfer方法:

E transfer(E e, boolean timed, long nanos) {

SNode s = null;

int mode = (e == null) ? REQUEST : DATA; //获取mode,看是拿还是取。

for (;;) { //自旋。

SNode h = head;

if (h == null || h.mode == mode) { // empty or same-mode //空或者相同的模式。

if (timed && nanos

if (h != null && h.isCancelled()) //取消把。

casHead(h, h.next); //弹出已经取消的节点。

else

return null;

} else if (casHead(h, s = snode(s, e, h, mode))) {

//否则,那么就在头节点后面插入这个节点。

SNode m = awaitFulfill(s, timed, nanos); //创建一个,等待中未完成的节点m。

if (m == s) {

//如果已经被取消了,由上面可知,自己指向自己时候就是取消。这里是m=s,即自己指向自己。

clean(s);

return null;

}

if ((h = head) != null && h.next == s)

//因为上面已经将s设置为head,如果满足这个条件说明有其他节点t插入到s前面,

//变成了head,而且这个t就是和s匹配的节点,他们已经完成匹配,所以这个判断最终走到return。

casHead(h, s.next); // //帮忙释放掉s,也就是把s.next插入到h后面。

return (E) ((mode == REQUEST) ? m.item : s.item); //返回相应的item。

}

} else if (!isFulfilling(h.mode)) {

//不是正在进行匹配模式,所以是互补的模式。

if (h.isCancelled()) //如果h取消了,那么就换下一个。

casHead(h, h.next);

else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

//尝试将当前节点打上"正在匹配"的标记,并设置为head。 并开始匹配

for (;;) { //自旋,直到配对。

SNode m = s.next; // m和s是正在匹配两个节点。

if (m == null) { // 没有等待的了,那么就退出循环。

casHead(s, null); // pop fulfill node

s = null; // use new node next time

break; // restart main loop

}

SNode mn = m.next; //获取m的next节点,如果s和m匹配成功,mn就得补上head的位置了。

if (m.tryMatch(s)) { //用m的下一个用配对s

casHead(s, mn); // pop both s and m //配对成功,那么就弹出m和s,因为head是没有用的,一次弹出两个。

return (E) ((mode == REQUEST) ? m.item : s.item); //返回相应元素。

} else // 没匹配成功的话,说明m可能被其他节点满足了。

s.casNext(m, mn); // 说明m已经被其他节点匹配了,那就把m移除

}

}

} else { // 正在匹配的过程。

SNode m = h.next; // m和h配对

if (m == null) //栈里面没有任何等待者了,其他节点把m匹配走了

casHead(h, null); // pop fulfilling node

else {

SNode mn = m.next; //如果m和h匹配成功,则mn就成为新head了。

if (m.tryMatch(h)) // help match //匹配上了

casHead(h, mn); // 弹出h和m。

else // 被别的线程抢先了,那么就需要解除连接。

h.casNext(m, mn); // help unlink

}

}

}

}

整个过程可以用下面几段文字概括:

如果当前栈为空或者包含与给定节点模式相同的节点,尝试 将节点压入栈内,并等待一个匹配节点,最后返回匹配节点 或者null(如果被取消)。

如果当前栈包含于给定节点模式互补的节点,尝试将这个节点打上FULFILLING标记,然后压入栈中,和相应的节点进行匹配,然后将两个节点(当前节点和互补节点)弹出栈,并返回匹配节点的数据。另外,匹配和删除动作不是必须要做的,因为其他线程会执行动作3

如果栈顶已经存在一个FULFILLING(正在满足其他节点)的节点,帮助这个节点完成匹配和移除(出栈)的操作。然后继续执行(主循环)。这部分代码基本和动作2的代码一样,只是不会返回节点的数据。

awaitFulfill方法

接下来看看awaitFulfill方法,这个方法主要为自旋/阻塞直到节点被匹配:

SNode awaitFulfill(SNode s, boolean timed, long nanos) {

final long deadline = timed ? System.nanoTime() + nanos : 0L; //获得超时时间

Thread w = Thread.currentThread(); //获得当前线程

int spins = (shouldSpin(s) ?

(timed ? maxTimedSpins : maxUntimedSpins) : 0); //判断是否需要自旋,以及自旋的次数。注意有timed这个boolean变量。

for (;;) { //自旋操作。

if (w.isInterrupted()) //当,当前线程中断时候,就取消。

s.tryCancel();

SNode m = s.match; //获取它的match。

if (m != null) //如果m匹配到了,那么返回m

return m;

if (timed) { //再次检测一次timed。

nanos = deadline - System.nanoTime();

if (nanos

s.tryCancel();

continue;

}

}

if (spins > 0) //再次检查spins。

spins = shouldSpin(s) ? (spins-1) : 0;

else if (s.waiter == null)

s.waiter = w; // establish waiter so can park next iter

else if (!timed) //如果没有timed,那么直接park当前线程。

LockSupport.park(this);

else if (nanos > spinForTimeoutThreshold) //带有nanos超时时间的park。

LockSupport.parkNanos(this, nanos);

}

}

clean方法

再看看clean方法:

void clean(SNode s) {

s.item = null; // forget item 把item和waiter都置空。

s.waiter = null; // forget thread

SNode past = s.next; //获得下一个SNode。

if (past != null && past.isCancelled()) //如果past被cancell了,那么就再past一个。

past = past.next;

SNode p; //从头节点开始清除。

while ((p = head) != null && p != past && p.isCancelled())

casHead(p, p.next); //做的事就是,把头节点,链接到下一个节点,节点不能为cancelled。

while (p != null && p != past) {

SNode n = p.next; //在去除链接头节点以后的节点,同样也不能为null。

if (n != null && n.isCancelled())

p.casNext(n, n.next);

else

p = n;

}

}

Fair模式

Fair模式和UnFair模式实现的思路基本一直,不同的是,UnFair模式使用栈(LIFO),而Fair模式则使用队列(FIFO),从而有了公平和非公平之分。

下面看看它的定义:

static final class TransferQueue extends Transferer {

//头节点

transient volatile QNode head;

//尾节点

transient volatile QNode tail;

//保存一个刚刚被清除的节点。用于删除最后一个刚插入的节点。

transient volatile QNode cleanMe;

TransferQueue() {

QNode h = new QNode(null, false); // 空哨兵节点head

head = h;

tail = h;

}

}

QNode

再看看用于存储节点的QNode类:

static final class QNode {

volatile QNode next; // next node in queue //一个next

volatile Object item; // CAS'ed to or from null 一个item

volatile Thread waiter; // to control park/unpark 一个waiter,用来指向线程。

final boolean isData; //一个isDate,判断是否为数据。

/**

* 构造方法

*/

QNode(Object item, boolean isData) {

this.item = item;

this.isData = isData;

}

/**

* 替换下一个节点。

*/

boolean casNext(QNode cmp, QNode val) {

return next == cmp &&

UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

}

/**

* 替换item

*/

boolean casItem(Object cmp, Object val) {

return item == cmp &&

UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);

}

/**

* 也是把item设为自己。,前一个是把match设为自己。

*/

void tryCancel(Object cmp) {

UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);

}

boolean isCancelled() {

return item == this;

}

/**

*判断是否,这个几点已经删除了。也就是自己指向自己。

*/

boolean isOffList() {

return next == this;

}

// Unsafe mechanics

private static final sun.misc.Unsafe UNSAFE;

private static final long itemOffset;

private static final long nextOffset;

static {

try {

UNSAFE = sun.misc.Unsafe.getUnsafe();

Class k = QNode.class;

itemOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("item"));

nextOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

} catch (Exception e) {

throw new Error(e);

}

}

}

在Fair模式下使用的TransferQueue,此时并没有用几个标志性状态来标明节点当前状态,因为栈和队列操作方法不同,栈只能在一端进行操作,而队列则是一端进,一端出。

transfer方法

接下来看它的transfer方法:

E transfer(E e, boolean timed, long nanos) {

QNode s = null; // constructed/reused as needed

boolean isData = (e != null); //判断是否为数据,

for (;;) { //自旋操作

QNode t = tail; //分别获取tail和head。

QNode h = head;

if (t == null || h == null) // 还未初始化

continue; // spin 如果t和h都为null,则自旋。

if (h == t || t.isData == isData) { //为null,或者是同样的提供者,比如都是生产者。

QNode tn = t.next;

if (t != tail) //不一致读,被其他线程抢先了。

continue;

if (tn != null) { // 说明其他线程已经添加了新节点tn,但还没将其设置为tail。

advanceTail(t, tn); //帮忙设置tail

continue;

}

if (timed && nanos

return null;

if (s == null) //如果s为null,那么初始化它。

s = new QNode(e, isData);

if (!t.casNext(null, s)) // failed to link in 尝试把s插入到t节点的后面。

continue;

advanceTail(t, s); // 尝试将s设置为队列尾节点。 从尾节点插入。

Object x = awaitFulfill(s, e, timed, nanos); //阻塞,等待被匹配的过程。

if (x == s) { // 如果发现被取消了,那么就调用clean方法。

clean(t, s);

return null;

}

if (!s.isOffList()) { //如果s还在list。

advanceHead(t, s); //把s来替换t,从队头插入嘛。

if (x != null)

s.item = s;

s.waiter = null;

}

return (x != null) ? (E)x : e; //最终返回,x或者,e。

} else { // complementary-mode 互补的模式,刚好遇到了。

QNode m = h.next; // 获得h的next,即能匹配的节点。但是注意是和h.next进行匹配。

if (t != tail || m == null || h != head)

continue; // 不一致读。

Object x = m.item; //获取m的item

if (isData == (x != null) || // m already fulfilled m已经完成了。

x == m || // m cancelled m已经取消

!m.casItem(x, e)) { // lost CAS CAS失败

advanceHead(h, m); // 将h出队,m设置为头结点,并且充实

continue;

}

advanceHead(h, m); //成功的完成了。

LockSupport.unpark(m.waiter); //唤醒m的waiter。

return (x != null) ? (E)x : e; //返回。

}

}

}

transfer方法的基本思路是在一个自旋的过程中尝试两种操作:

1. 如果队列为null,或者尾节点是相同模式的节点(提供或者获取数据),则尝试将节点加入等待的队列,直到被匹配或者取消,并返回匹配到的节点。

否则则是第二种情况。

2. 处于两个匹配的节点,则让其与头结点相匹配(FIFO的公平性),并且尝试将这个节点出队然后返回匹配到的数据。

同时,方法内部考虑到了多线程不一致情况,当出现不一致情况,则continue再次尝试。在循环开始的时候也会做一个非空检测,以避免当前线程看到未初始化的头尾节点。

awaitFulfill方法

TransferQueue里面的awaitFulfill代码与TransferStack里面的基本一致,这里就不多说,贴出代码即可:

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {

/* Same idea as TransferStack.awaitFulfill */

final long deadline = timed ? System.nanoTime() + nanos : 0L;

Thread w = Thread.currentThread();

int spins = ((head.next == s) ?

(timed ? maxTimedSpins : maxUntimedSpins) : 0);

for (;;) {

if (w.isInterrupted())

s.tryCancel(e);

Object x = s.item;

if (x != e)

return x;

if (timed) {

nanos = deadline - System.nanoTime();

if (nanos

s.tryCancel(e);

continue;

}

}

if (spins > 0)

--spins;

else if (s.waiter == null)

s.waiter = w;

else if (!timed)

LockSupport.park(this);

else if (nanos > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanos);

}

}

clean方法

TransferQueue的clean方法相对于TransferStack里面的要复杂一些,因为

/**

* 对 中断的 或 等待超时的 节点进行清除操作

*/

void clean(QNode pred, QNode s) {

s.waiter = null; // 清除 thread 引用

/*

* 在程序运行中的任何时刻, 最后插入的节点不能被删除(这里的删除指 通过 CAS 直接删除, 因为这样直接删除会有多删除其他节点的风险)

* 当 节点 s 是最后一个节点时, 将 s.pred 保存为 cleamMe 节点, 下次再进行清除操作

*/

while (pred.next == s) { // 说明没有出现不一致情况,没有被别的线程删除。

QNode h = head;

QNode hn = h.next;

if (hn != null && hn.isCancelled()) { //hn被取消,所以h节点需要向前推一个

advanceHead(h, hn);

continue;

}

QNode t = tail; // 用来和后面进行一致性读写。

if (t == h) //队列为空, 直接返回

return;

QNode tn = t.next;

if (t != tail) // 出现了不一致性,其他的线程改变了tail,重新来

continue;

if (tn != null) {

advanceTail(t, tn); // 把tn设置为tail。

continue;

}

if (s != t) { // 节点 s 不是尾节点, 则 直接 CAS 删除节点(在队列中间进行这种删除是没有风险的)

QNode sn = s.next;

if (sn == s || pred.casNext(s, sn))

return;

}

//走到这里,说明s是队列的尾节点,即要删除的是s

QNode dp = cleanMe; // s 是队列的尾节点, 则需要 cleanMe

if (dp != null) {

QNode d = dp.next; // cleanMe 不为 null, 进行删除删一次的 s节点, 也就是这里的节点d

QNode dn;

if (d == null || //d存在,d部位dp,d没有被删除。

d == dp || // d is off list or d已经不再队列了

!d.isCancelled() || // d not cancelled or d没有被cancel

(d != t && // d not tail and d不是tail节点。

(dn = d.next) != null && // has successor d有后继并且也在队列。 因为最终要删除d。所以需要dn。

dn != d && // that is on list

dp.casNext(d, dn))) // d unspliced 最终这里去删除。

casCleanMe(dp, null); //清除 cleanMe 节点, 这里的 dp == pred 若成立, 说明清除节点s, 成功,

if (dp == pred)

return; // s is already saved node

} else if (casCleanMe(null, pred)) //设置cleanMe节点。

// 原来的 cleanMe 是 null, 则将 pred 标记为 cleamMe 为下次 清除 s 节点做标识

return; // 推迟清楚s

}

}

clean方法复杂的原因是,当需要删除的节点是tail节点时候,不能马上删除该节点,而是将s的前驱节点设为cleanMe节点,下次清理其他

取消节点时候顺便把s移除。

这样的处理方式,相对于直接删除尾节点更加的安全与高效,因为相对于队列中的其他节点,tail节点变化概率最大的。

如果直接将pre.next设为null,这样次数另一个线程新插入(因为实在tail端进行插入的)时候,则会引起不一致情况。而且实现方面clean方法并没有加锁,所以将需要删除的tail节点标志进行下一次删除是多次操作。

另一方面,CAS的标志位cleanMe操作为原子的,而下一次再进行删除时候,如果仍然要删除tail节点,则上一次锁标记的cleanMe节点一定不再是tail节点,则正常进行CAS删除即可。

上述clean方法具体思路如下:

删除的节点不是queue尾节点, 这时 直接 pred.casNext(s, s.next) 方式来进行删除。

删除的节点是队尾节点

此时 cleanMe == null, 则 前继节点pred标记为 cleanMe, 为下次删除做准备

此时 cleanMe != null, 先删除上次需要删除的节点, 然后将 cleanMe至null, 让后再将 pred 赋值给 cleanMe,下一次再进行删除操作。

序列化方法

通看前面整个类代码,SynchronousQueue里面都没有用到锁,但是在序列化方法writeObject和readObject里面,则用到了ReentrantLock,

为什么呢?

因为这个序列化里面,transferer域本身不用序列化,但需要保存transfer里面的内部栈和队列。

@SuppressWarnings("serial")

static class WaitQueue implements java.io.Serializable { }

static class LifoWaitQueue extends WaitQueue {

private static final long serialVersionUID = -3633113410248163686L;

}

static class FifoWaitQueue extends WaitQueue {

private static final long serialVersionUID = -3623113410248163686L;

}

private ReentrantLock qlock;

private WaitQueue waitingProducers;

private WaitQueue waitingConsumers;

private void writeObject(java.io.ObjectOutputStream s)

throws java.io.IOException {

boolean fair = transferer instanceof TransferQueue;

//判断是哪一种方式写入。

if (fair) {

qlock = new ReentrantLock(true);

waitingProducers = new FifoWaitQueue();

waitingConsumers = new FifoWaitQueue();

}

else {

qlock = new ReentrantLock();

waitingProducers = new LifoWaitQueue();

waitingConsumers = new LifoWaitQueue();

}

s.defaultWriteObject();

}

private void readObject(java.io.ObjectInputStream s)

throws java.io.IOException, ClassNotFoundException {

s.defaultReadObject();

if (waitingProducers instanceof FifoWaitQueue)

transferer = new TransferQueue();

else

transferer = new TransferStack();

}

这个类还是比较难的,不像其他的阻塞队列,里面没有用到锁来控制安全性,而是通过CAS的lock-free来保证并发下安全性。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190211G03P6000?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券