前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SynchronousQueue详解

SynchronousQueue详解

作者头像
忧愁的chafry
发布2022-10-30 16:27:13
3900
发布2022-10-30 16:27:13
举报
文章被收录于专栏:个人技术笔记个人技术笔记

SynchronousQueue介绍

  【1】SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。

  【2】如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

  【3】需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

SynchronousQueue的源码分析

  【1】构造函数

代码语言:javascript
复制
//默认采用非公平
public SynchronousQueue() {
    this(false);
}
//可以选择模式
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

【2】核心方法分析

代码语言:javascript
复制
//这些方法本质上都是调用属性值transferer的transfer方法
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return transferer.transfer(null, true, 0);
}

s

Transferer分析

  【1】Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。

  【2】Transferer有两个实现类:TransferQueue和TransferStack。

  【3】这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。

  【4】源码展示

代码语言:javascript
复制
// 堆栈和队列共同的接口,负责执行 put or take
abstract static class Transferer<E> {
    // e 为空的,会直接返回特殊值,不为空会传递给消费者
    // timed 为 true,说明会有超时时间
    abstract E transfer(E e, boolean timed, long nanos);
}
TransferQueue分析

  【1】节点元素

代码语言:javascript
复制
//队列节点元素
static final class QNode {
    // 当前元素的下一个元素
    volatile QNode next;          
    // 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
    volatile Object item;         
    // 可以阻塞住的当前线程
    volatile Thread waiter;       
    // 节点类型:true是 put,false是 take
    final boolean isData;         

   ....
}

  【2】构造方法

代码语言:javascript
复制
//队列头结点指针
transient volatile QNode head;
//队列尾结点指针
transient volatile QNode tail;

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

  【3】核心方法

代码语言:javascript
复制
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

    QNode s = null; 
    //根据是否传入数据 判断是获取还是存放 
    boolean isData = (e != null);

    for (;;) {
        // 队列头和尾的临时变量,队列是空的时候,t=h
        QNode t = tail;
        QNode h = head;
        // tail 和 head 没有初始化时,无限循环,虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况
        // 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin
        // 首尾节点相同,说明是空队列
        // 或者尾节点的操作和当前节点操作一致
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  //直至拿到尾节点
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            //超时直接返回 null
            if (timed && nanos <= 0)        // can't wait
                return null;
            //构建新节点
            if (s == null)
                s = new QNode(e, isData);
            //将新建节点塞入队列
            if (!t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);             
            // 阻塞住自己
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } 
        // 队列不为空,并且当前操作和队尾不一致,也就是说当前操作是队尾是对应的操作
        // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put
        else {
            // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作
            QNode m = h.next;              
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // 当前操作放到队头
            advanceHead(h, m);           
            // 释放队头阻塞节点
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}
TransferStack分析

  【1】节点元素

代码语言:javascript
复制
// 栈中节点的几种类型:
// 1. 消费者(请求数据的)
static final int REQUEST    = 0;
// 2. 生产者(提供数据的)
static final int DATA       = 1;
// 3. 二者正在匹配中
static final int FULFILLING = 2;

// 栈中的节点
static final class SNode {
    // 下一个节点
    volatile SNode next;        
    volatile SNode match;       // the node matched to this
    // 等待着的线程
    volatile Thread waiter;    
    Object item;                
    // 模式,也就是节点的类型,是消费者,是生产者,还是正在匹配中
    int mode;
...
}

  【2】核心方法

代码语言:javascript
复制
// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // 根据e是否为null决定是生产者还是消费者
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋+CAS
    for (;;) {
        // 栈顶元素
        SNode h = head;
        // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的
        // 也就是都是生产者节点或者都是消费者节点
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 如果有超时而且已到期
            if (timed && nanos <= 0) {      // can't wait
                // 如果头节点不为空且是取消状态
                if (h != null && h.isCancelled())
                    // 就把头节点弹出,并进入下一次循环
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 否则,直接返回null(超时返回null)
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 入栈成功(因为是模式相同的,所以只能入栈)
                // 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到
                SNode m = awaitFulfill(s, timed, nanos);
                // 如果m等于s,说明取消了,那么就把它清除掉,并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    // 被取消了返回null
                    return null;
                }
                
                // 到这里说明匹配到元素了
                // 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了
                // 如果头节点不为空,并且头节点的下一个节点是s
                // 就把头节点换成s的下一个节点
                // 也就是把h和s都弹出了
                // 也就是把栈顶两个元素都弹出了
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 根据当前节点的模式判断返回m还是s中的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 到这里说明头节点和当前节点模式不一样
            // 如果头节点不是正在匹配中
            
            // 如果头节点已经取消了,就把它弹出栈
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 头节点没有在匹配中,就让当前节点先入队,再让他们尝试匹配
                // 且s成为了新的头节点,它的状态是正在匹配中
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // 如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了
                    // 就清空栈并跳出内部循环,到外部循环再重新入栈判断
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // 如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // 返回匹配结果
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // 尝试匹配失败,说明m已经先一步被其它线程匹配了
                        // 就协助清除它
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // 到这里说明当前节点和头节点模式不一样
            // 且头节点是正在匹配中
            
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                // 如果m为null,说明m已经被其它线程先一步匹配了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                // 协助匹配,如果m和s尝试匹配成功,就弹出栈顶的两个元素m和s
                if (m.tryMatch(h))          // help match
                    // 将栈顶的两个元素弹出后,再让s重新入栈
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 尝试匹配失败,说明m已经先一步被其它线程匹配了
                    // 就协助清除它
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

// 三个参数:需要等待的节点,是否需要超时,超时时间
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 到期时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 当前线程
    Thread w = Thread.currentThread();
    // 自旋次数
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 当前线程中断了,尝试清除s
        if (w.isInterrupted())
            s.tryCancel();
        
        // 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)
        SNode m = s.match;
        // 如果匹配到了,直接返回m
        if (m != null)
            return m;
        
        // 如果需要超时
        if (timed) {
            // 检查超时时间如果小于0了,尝试清除s
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 如果还有自旋次数,自旋次数减一,并进入下一次自旋
            spins = shouldSpin(s) ? (spins-1) : 0;
        
        // 后面的elseif都是自旋次数没有了
        else if (s.waiter == null)
            // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 如果允许超时且还有剩余时间,就阻塞相应时间
            LockSupport.parkNanos(this, nanos);
    }
}

// SNode里面的方向,调用者m是s的下一个节点
// 这时候m节点的线程应该是阻塞状态的
boolean tryMatch(SNode s) {
    // 如果m还没有匹配者,就把s作为它的匹配者
    if (match == null &&
        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // waiters need at most one unpark
            waiter = null;
            // 唤醒m中的线程,两者匹配完毕
            LockSupport.unpark(w);
        }
        // 匹配到了返回true
        return true;
    }
    // 可能其它线程先一步匹配了m,返回其是否是s
    return match == s;
}

SynchronousQueue总结

  【1】是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。

  【2】数据结构:链表,在其内部类中维护了数据

      先消费(take),后生产(put);

        第一个线程Thread0是消费者访问,此时队列为空,则入队(创建Node结点并赋值)

        第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队

        第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作。直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队。

      反之,先生产(put)后消费(take),原理一样

  【3】锁:CAS+自旋(无锁)【阻塞:自旋了一定次数后调用 LockSupport.park()】

  【4】存取调用同一个方法:transfer()

      put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中。

      take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中。

  【5】过程

      线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同

      相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性

      不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队

  【6】公平模式:TransferQueue,队尾匹配(判断模式),队头出队,先进先出

  【7】非公平模式(默认策略):TransferStack,栈顶匹配,栈顶出栈,后进先出

  【8】应用场景

      SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。

      SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

代码语言:javascript
复制
Transferer
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SynchronousQueue介绍
  • SynchronousQueue的源码分析
    • Transferer分析
      • TransferQueue分析
      • TransferStack分析
  • SynchronousQueue总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档