首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[JDK] SynchronousQueue 源码阅读【2】

[JDK] SynchronousQueue 源码阅读【2】

作者头像
架构探险之道
发布2019-07-25 16:31:09
5390
发布2019-07-25 16:31:09
举报

[JDK] SynchronousQueue 源码阅读【2】

—-

手机用户请横屏获取最佳阅读体验,REFERENCES中是本文参考的链接,如需要链接和更多资源,可百度”Yiyuery”获取,多处同步更新:

  • CSDN
  • 简书
  • 个人博客地址 ```

java.util.concurrent.SynchronousQueue

翻译

类注释

  • P5
/** Dual Queue */static final class TransferQueue<E> extends Transferer<E> {   /*
   * This extends Scherer-Scott dual queue algorithm, differing,
   * among other ways, by using modes within nodes rather than
   * marked pointers. The algorithm is a little simpler than
   * that for stacks because fulfillers do not need explicit
   * nodes, and matching is done by CAS'ing QNode.item field
   * from non-null to null (for put) or vice versa (for take).
   */  /** Node class for TransferQueue. */
  //内部的节点类,用于表示一个请求,这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出
  static final class QNode {       volatile QNode next;          // next node in queue
      volatile Object item;         // CAS'ed to or from null
      volatile Thread waiter;       // to control park/unpark 用于判断是入队还是出队,true表示的是入队操作,也就是添加数据
      final boolean isData;      QNode(Object item, boolean isData) {           this.item = item;           this.isData = isData;
      }       // QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全
      // compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量
      // 第三个参数表示参数期待的值,第四个参数表示更新后的值
      // 下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功
      boolean casNext(QNode cmp, QNode val) {           return next == cmp &&
              UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
      }       //方法的原理同上面的类似,这里就是更新item的值了
      boolean casItem(Object cmp, Object val) {           return item == cmp &&
              UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
      }       /**
       * Tries to cancel by CAS'ing ref to this as item.
         方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了
       */
      void tryCancel(Object cmp) {
          UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
      }       //调用tryCancel方法后item就会是this,就表示当前任务被取消了
      boolean isCancelled() {           return item == this;
      }       /**
       * Returns true if this node is known to be off the queue
       * because its next pointer has been forgotten due to
       * an advanceHead operation.
        表示当前任务已经被返回了
       */
      boolean isOffList() {           return next == this;
      }       // Unsafe mechanics
      private static final sun.misc.Unsafe UNSAFE;       private static final long itemOffset;       private static final long nextOffset;       static {           try {
              UNSAFE = sun.misc.Unsafe.getUnsafe();
              Class<?> k = QNode.class;
              itemOffset = UNSAFE.objectFieldOffset
                  (k.getDeclaredField("item"));
              nextOffset = UNSAFE.objectFieldOffset
                  (k.getDeclaredField("next"));
          } catch (Exception e) {               throw new Error(e);
          }
      }
  }   /** Head of queue 首节点*/
  transient volatile QNode head;   /** Tail of queue 尾部节点*/
  transient volatile QNode tail;   /**
   * Reference to a cancelled node that might not yet have been
   * unlinked from queue because it was the last inserted node
   * when it was cancelled.
   */
  transient volatile QNode cleanMe;   //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点
  TransferQueue() {
      QNode h = new QNode(null, false); // initialize to dummy node.
      head = h;
      tail = h;
  }   /**
   * Tries to cas nh as new head; if successful, unlink
   * old head's next node to avoid garbage retention.
   */
  void advanceHead(QNode h, QNode nh) {       if (h == head &&
          UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
          h.next = h; // forget old next
  }   /**
   * Tries to cas nt as new tail.
   */
  void advanceTail(QNode t, QNode nt) {       if (tail == t)
          UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
  }   /**
   * Tries to CAS cleanMe slot.
   */
  boolean casCleanMe(QNode cmp, QNode val) {       return cleanMe == cmp &&
          UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
  }   /**
   * Puts or takes an item. transfer方法用于提交数据或者是获取数据
   */
  @SuppressWarnings("unchecked")   E transfer(E e, boolean timed, long nanos) {       /* Basic algorithm is to loop trying to take either of
       * two actions:
       *
       * 1. If queue apparently empty or holding same-mode nodes,
       *    try to add node to queue of waiters, wait to be
       *    fulfilled (or cancelled) and return matching item.
       *
       * 2. If queue apparently contains waiting items, and this
       *    call is of complementary mode, try to fulfill by CAS'ing
       *    item field of waiting node and dequeuing it, and then
       *    returning matching item.
       *
       * In each case, along the way, check for and try to help
       * advance head and tail on behalf of other stalled/slow
       * threads.
       *
       * The loop starts off with a null check guarding against
       * seeing uninitialized head or tail values. This never
       * happens in current SynchronousQueue, but could if
       * callers held non-volatile/final ref to the
       * transferer. The check is here anyway because it places
       * null checks at top of loop, which is usually faster
       * than having them implicitly interspersed.
       */      QNode s = null; // constructed/reused as needed
      boolean isData = (e != null);//如果e不为null,就说明是添加数据的入队操作      for (;;) {
          QNode t = tail;
          QNode h = head;           if (t == null || h == null)         // saw uninitialized value
              continue;                       // spin
          //当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲      
          if (h == t || t.isData == isData) { // empty or same-mode
              QNode tn = t.next;               //这是一个检查,确保t指向队尾
              if (t != tail)                  // inconsistent read
                  continue;               //tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环
              if (tn != null) {               // lagging tail
                  advanceTail(t, tn);                   continue;
              }               //如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null
              //这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null
              if (timed && nanos <= 0)        // can't wait
                  return null;               //如果没有超时时间或者超时时间不为0的话就创建新的节点
              if (s == null)
                  s = new QNode(e, isData);               //使tail的next指向新的节点
              if (!t.casNext(null, s))        // failed to link in
                  continue;               //更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点        
              advanceTail(t, s);              // swing tail and wait
              //如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交
              Object x = awaitFulfill(s, e, timed, nanos);               //当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true
              if (x == s) {                   // wait was cancelled
                  //将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了
                  clean(t, s);                   return null;
              }               if (!s.isOffList()) {           // not already unlinked
                  advanceHead(t, s);          // unlink if head
                  if (x != null)              // and forget fields
                      s.item = s;
                  s.waiter = null;
              }               return (x != null) ? (E)x : e;          } else {                            // complementary-mode 提交操作的时候刚刚好有反向的操作在等待
              QNode m = h.next;               // node to fulfill
              if (t != tail || m == null || h != head)                   continue;                   // inconsistent read              Object x = m.item;               //这里先判断m是否是有效的操作
              if (isData == (x != null) ||    // m already fulfilled
                  x == m ||                   // m cancelled
                  !m.casItem(x, e)) {         // lost CAS
                  advanceHead(h, m);          // dequeue and retry
                  continue;
              }               //更新头部节点
              advanceHead(h, m);              // successfully fulfilled
              //唤醒m节点的被挂起的线程
              LockSupport.unpark(m.waiter);               //返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功
              return (x != null) ? (E)x : e;
          }
      }
  }   /**
   * Spins/blocks until node s is fulfilled.
   * 下面看看执行挂起线程的方法awaitFulfill
   * @param s the waiting node
   * @param e the comparison value for checking match
   * @param timed true if timed wait
   * @param nanos timeout value
   * @return matched item, or s if cancelled
   */
  Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {       /* Same idea as TransferStack.awaitFulfill */
      //首先获取超时时间
      final long deadline = timed ? System.nanoTime() + nanos : 0L;       //当前操作所在的线程
      Thread w = Thread.currentThread();       //线程被挂起或是进入超时等待之前阻止自旋的次数
      int spins = ((head.next == s) ?
                   (timed ? maxTimedSpins : maxUntimedSpins) : 0);       for (;;) {            //这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记
          if (w.isInterrupted())
              s.tryCancel(e);
          Object x = s.item;           //x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法
          if (x != e)               return x;           //这里先判断超时的时间是否过了
          if (timed) {
              nanos = deadline - System.nanoTime();               if (nanos <= 0L) {
                  s.tryCancel(e);                   continue;
              }
          }           //这里通过多几次循环来避免直接挂起线程
          if (spins > 0)
              --spins;           else if (s.waiter == null)
              s.waiter = w;           else if (!timed)
              //park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒
              LockSupport.park(this);           else if (nanos > spinForTimeoutThreshold)
              //parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒
              LockSupport.parkNanos(this, nanos);
      }
  }   /**
   * Gets rid of cancelled node s with original predecessor pred.
   */
  void clean(QNode pred, QNode s) {
      s.waiter = null; // forget thread
      /*
       * At any given time, exactly one node on list cannot be
       * deleted -- the last inserted node. To accommodate this,
       * if we cannot delete s, we save its predecessor as
       * "cleanMe", deleting the previously saved version
       * first. At least one of node s or the node previously
       * saved can always be deleted, so this always terminates.
       */
      while (pred.next == s) { // Return early if already unlinked
          QNode h = head;
          QNode hn = h.next;   // Absorb cancelled first node as head
          if (hn != null && hn.isCancelled()) {
              advanceHead(h, hn);               continue;
          }
          QNode t = tail;      // Ensure consistent read for tail
          if (t == h)               return;
          QNode tn = t.next;           if (t != tail)               continue;           if (tn != null) {
              advanceTail(t, tn);               continue;
          }           if (s != t) {        // If not tail, try to unsplice
              QNode sn = s.next;               if (sn == s || pred.casNext(s, sn))                   return;
          }
          QNode dp = cleanMe;           if (dp != null) {    // Try unlinking previous cancelled node
              QNode d = dp.next;
              QNode dn;               if (d == null ||               // d is gone or
                  d == dp ||                 // d is off list or
                  !d.isCancelled() ||        // d not cancelled or
                  (d != t &&                 // d not tail and
                   (dn = d.next) != null &&  //   has successor
                   dn != d &&                //   that is on list
                   dp.casNext(d, dn)))       // d unspliced
                  casCleanMe(dp, null);               if (dp == pred)                   return;      // s is already saved node
          } else if (casCleanMe(null, pred))               return;          // Postpone cleaning s
      }
  }   private static final sun.misc.Unsafe UNSAFE;   private static final long headOffset;   private static final long tailOffset;   private static final long cleanMeOffset;   static {       try {
          UNSAFE = sun.misc.Unsafe.getUnsafe();
          Class<?> k = TransferQueue.class;
          headOffset = UNSAFE.objectFieldOffset
              (k.getDeclaredField("head"));
          tailOffset = UNSAFE.objectFieldOffset
              (k.getDeclaredField("tail"));
          cleanMeOffset = UNSAFE.objectFieldOffset
              (k.getDeclaredField("cleanMe"));
      } catch (Exception e) {           throw new Error(e);
      }
  }
}

总结

  • Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 — 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化。

代码实现里的Dual QueueStack内部是用链表LinkedList来实现的,其节点状态为以下三种状态:

    - 持有数据 – put()方法的元素
   - 持有请求 – take()方法
   - 空

这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。

  • SynchronousQueue 中每一个插入操作必须等待一个与之配对的删除操作,反之亦然。SynchronousQueue本身没有容量存储元素,但是它是通过管理提交操作的线程队列来实现阻塞队列的

SynchronousQueue 没有任何内部容量。SynchronousQueue 非常适合于线程之间交换信息,均衡生产者与消费者的处理速率。 典型的应用场景是线程池newCachedThreadPool,从上面的源码可以看出,如果入队操作和出队操作的处理速度相差比较大的话有可能会创建大量线程,有耗尽内存的风险。

  • SynchronousQueue默认无参构造器使用的是 TransferStack ,fair==false,创建一个非公平的同步队列(后进先出stack)

当fair==true,创建一个公平的同步队列,TransferQueue,最先等待的最先释放(先进先出queue)

  • 可以实现控制线程先进先出进行排序,也就是先被挂起的线程先被唤醒,这个内部是通过链表来实现的。SynchronousQueue默认是不保证证唤醒的顺序的
  • 不带超时时间的offer和poll方法不会挂起线程,而take和put方法可能会挂起线程。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-01-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • [JDK] SynchronousQueue 源码阅读【2】
    • 翻译
      • 总结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档