前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >话说AQS

话说AQS

原创
作者头像
木子的昼夜
修改2021-03-10 09:34:14
2510
修改2021-03-10 09:34:14
举报

一、前言

  1. 什么是AQS (AbstractQueuedSynchronizer)翻译过来叫抽象同步队列, 他是除synchronized以外的另一种同步机制
  2. Lock锁的实现 就依赖AQS 后期会写Lock锁的使用及原理
  3. AQS的中心思想是:现场来了看一下共享资源是否空闲,如果共享资源空闲就上锁(修改状态位),等线程执行完业务代码就释放锁(状态位复位),其他线程来 如果共享资源有被上锁(状态位标志位占用),就进入等待队列监控状态位 一旦被复位 就去抢锁(争着修改状态位)
  4. AQS 维护了一个FIFO双向队列 ,FIFO就是先进先出 双向就是有_pre _next 两个指向前后两个节点的属性
双向队列.png
双向队列.png
  1. 总结一句话 3+4 ===》 state+双向队列
  2. AOS 提供了2中方式获取锁资源 一种是:独占锁 如: ReentrantLock 一种是:共享锁 如:CountDownLatch

二、DEMO

AQS 如果没有具体的实现类,DEMO是没有意义的 , 我们先简单看一下里边常用的一些方法吧

1. 排队的时候 我们喜欢偷瞟美女 ,所以我们先看NODE 他是AbstractQueuedSynchronizer的内部类,也就是平时排队的个体(有美女 有帅哥 )
代码语言:txt
复制
  static final class Node {
           // 标记Node为共享模式 
           static final Node SHARED = new Node();
           // 标记Node为独占模式
           static final Node EXCLUSIVE = null;
   
           // waitStatus 的几种状态
           // 当前Node因为超时或中断被取消,进入该状态的节点不会再一次被阻塞
           static final int CANCELLED =  1;
           // node进入队列后 要确保前一个节点为SIGNAL 前一个节点释放的时候需要unpark进入队列的这个节点
           static final int SIGNAL    = -1;
           // 
           static final int CONDITION = -2;
           static final int PROPAGATE = -3;
           // 还有 默认0 无状态
   
         
           volatile int waitStatus;
   
           // 双向队列 指向前一个个Node
           volatile Node prev;
           // 双向队列 指向后一个Nodesss
           volatile Node next;
           // 当前等待线程 !! Node的心脏
           volatile Thread thread;
   
           //  指向下一个在同一个condition里等待的node
           // 或者一个特殊的值:SHARED
           //conditions队列仅仅只能是独占模式  如果是共享模式 我们会用一个特殊的值标记SHARED
           Node nextWaiter;
   
           // 是否是共享模式 
           final boolean isShared() {
               return nextWaiter == SHARED;
           }
   
           // 返回前一个Node
           final Node predecessor() throws NullPointerException {
               Node p = prev;
               if (p == null)
                   throw new NullPointerException();
               else
                   return p;
           }
   		// 初始化head 和 共享标记的时候用
           Node() {  
           }
   		// addWaiter用
           Node(Thread thread, Node mode) {    
               this.nextWaiter = mode;
               this.thread = thread;
           }
   		// Condition 用 
           Node(Thread thread, int waitStatus) {
               this.waitStatus = waitStatus;
               this.thread = thread;
           }
       }
2. 排队的时候大家要试图去问问收银员 是否到自己了 这时候tryAcquire上场了(没有插队)
代码语言:txt
复制
  /**
        * Attempts to acquire in exclusive mode. This method should query
        * if the state of the object permits it to be acquired in the
        * exclusive mode, and if so to acquire it.
        * 尝试以独占锁模式去获取,他会查询state状态,如果允许他以独占锁模式获取 那就获取 
        * <p>This method is always invoked by the thread performing
        * acquire.  If this method reports failure, the acquire method
        * may queue the thread, if it is not already queued, until it is
        * signalled by a release from some other thread. This can be used
        * to implement method {@link Lock#tryLock()}.
        * 如果获取失败,会把当前线程放到队列里 直到这个线程被其他释放线程唤醒 
        * 也就是说你排队呢 拿着手机看抖音小媚媚  前边的人结账完了 回头叫你一声 
        * 嘿!别看了 结账了 
        * <p>The default
        * implementation throws {@link UnsupportedOperationException}.
        *
        * @param arg the acquire argument.
        * 这个值一般情况下都是1 
        * 
        * @return {@code true} if successful. Upon success, this object has
        *         been acquired.
        * 如果获取成功  返回true  
        */
       protected boolean tryAcquire(int arg) {
           throw new UnsupportedOperationException();
       }

什么鬼 ? 就给我抛一个异常 ? AQS只是一个思想,它的类里只有一个流程 没有具体实现 。

什么模式? 是不是跟”模板方法“很像

我们找一个实现吧 ReentrantLock#FairSync#tryAcquire 看一下类图

实现类继承关系.png
实现类继承关系.png
代码语言:txt
复制
 /**
        * Sync object for fair locks
        */
       static final class FairSync extends Sync {
           private static final long serialVersionUID = -3000897897090466540L;
   		//锁方法  
           final void lock() {
               acquire(1);
           }
   
           /**
            * Fair version of tryAcquire.  Don't grant access unless
            * recursive call or no waiters or is first.
            * tryAcquire的公平版,只有是递归调用或者是waiters或者是first的是会后才给获取权限
            * 没有很懂:理解就是满足一定条件(公平)才可以获取锁
            */
           protected final boolean tryAcquire(int acquires) {
               // 获取当前线程
               final Thread current = Thread.currentThread();
               // 获取状态 这个就是两个核心中的一个 : 状态位state
               // 这个直接调用的父类 AQS的getState()方法 
               // 继承的好处: 就是爸爸有的你也有 
               int c = getState();
               // 如果是0 那就是木有人占有 这时候准备修改状态 
               if (c == 0) {
                   // 1.hasQueuedPredecessors:  判断是否有其他线程比当前线程等待时间长 
                   // 2.compareAndSetState : CAS 线程安全的 设置state状态 0->acquires 
                   // 
                   if (!hasQueuedPredecessors() &&
                       compareAndSetState(0, acquires)) {
                       // 设置独占线程为当前线程
                       setExclusiveOwnerThread(current);
                       return true;
                   }
               }
               // 如果state不等于0 看一下占用state的是不是当前线程
               else if (current == getExclusiveOwnerThread()) {
                   // 如果当前线程占用state 那就是重入了 state总数就+acquires
                   int nextc = c + acquires;
                   if (nextc < 0)
                       throw new Error("Maximum lock count exceeded");
                   setState(nextc);
                   return true;
               }
               // 如果没有获取 就返回false 
               return false;
           }
       }
3. tryAcquire是获取锁 也就是去看看能不能到自己呢
代码语言:txt
复制
   比如你去逛街买烧饼,烧饼摊前排起了长龙,你后边看了一眼,轮不到自己,那怎么办 ?
代码语言:txt
复制
   为了爱吃烧饼的女朋友,当然是需要排队了  

AbstractQueuedSynchronizer#acquire

代码语言:txt
复制
   /**
        * Acquires in exclusive mode, ignoring interrupts. 
        *  申请独占锁,忽略中断
        * Implemented  by invoking at least once {@link #tryAcquire},
        * returning on success. 
        * Otherwise the thread is queued, possibly
        * repeatedly blocking and unblocking, invoking {@link
        * #tryAcquire} until success. 
        * 至少调用一次tryAcquire ,如果成功就得了,不成功就把当前线程放到等待队列里 
        * 核心2:双向等待队列出现了
        */
       public final void acquire(int arg) {
           // tryAcquire 看2例子
           // addWaiter(独占方式加入队列) AQS 方法 
           // acquireQueued 这是一个死循环动作, 直到获取锁 也就是买到烧饼
           if (!tryAcquire(arg) &&
               acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
               selfInterrupt();
       }

AbstractQueuedSynchronizer#addWaiter

代码语言:txt
复制
   	/**
        * Creates and enqueues node for current thread and given mode.
        * 你排队了,需要把你包装一下 因为你不再是你,你成了队伍中的一员
        * 现实中可能会给你一个号码 或者给你一个宣传单 或者先给你点儿小吃吃着 
        * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
        * @return the new node
        */
       private Node addWaiter(Node mode) {
           // 包装
           Node node = new Node(Thread.currentThread(), mode);
           // Try the fast path of enq; backup to full enq on failure
           // 获取队列尾部元素 
           Node pred = tail; 
           // 如果尾部元素不是null 那就追加  
           // 下边这个过程 就是队列尾部cas方式追加新元素 tail指向新元素 
           if (pred != null) {X
               // 当前插入的node的前一个node设置为tail 
               node.prev = pred;
               // cas把tail设置为刚添加的元素 
               if (compareAndSetTail(pred, node)) {
                   // 老tail 的下一个元素 指向新node 
                   pred.next = node;
                   return node;
               }
           }
           // 如果是null 那就一般情况下是需要初始化队列  
           enq(node);
           return node;
       }
   
   	/**
        * Inserts node into queue, initializing if necessary
        * 插入node到队列,如果有必要,会进行队列初始化
        * @param node the node to insert
        * @return node's predecessor
        */
       private Node enq(final Node node) {
           // 死等 直到成功为止
           for (;;) {
               Node t = tail;
               // 如果是null 初始化 
               if (t == null) {
                   // 设置一个新Node为head 并且tail也指向这个node 
                   if (compareAndSetHead(new Node()))
                       tail = head;
               } else {
                   // 设置node上一个元素为t(刚添加的空node)
                   node.prev = t;
                   // cas设置tail为node (来排队的人)
                   if (compareAndSetTail(t, node)) {
                       t.next = node;
                       return t;
                   }
               }
           }
       }
代码语言:txt
复制
        /**
        * Acquires in exclusive uninterruptible mode for thread already in
        * queue. Used by condition wait methods as well as acquire.
        * 在queue中的线程以独占并且不中断模式申请锁
        * @param node the node
        * @param arg the acquire argument
        * @return {@code true} if interrupted while waiting
        */
       final boolean acquireQueued(final Node node, int arg) {
           boolean failed = true;
           try {
               // 中断设置为false 
               boolean interrupted = false;
               for (;;) {
                   //获取当前Node的前一个Node 
                   final Node p = node.predecessor();
                   // 如果前一个Node是head,该小强买了 来1个烧饼
                   if (p == head && tryAcquire(arg)) {
                       // 设置小强为head 这样他的下一个人就知道可以轮到他了
                       setHead(node);
                       // 加快gc
                       p.next = null; // help GC
                       failed = false;
                       return interrupted;
                   }
                   // 如果没有获取 接着来 
                   //1. shouldParkAfterFailedAcquire 检查状态  如果状态合适就park
                   //2. parkAndCheckInterrupt 稍等片刻
                   //3. 等着被unpark吧 坐等前边人买完了叫你 
                   if (shouldParkAfterFailedAcquire(p, node) &&
                       parkAndCheckInterrupt())
                       interrupted = true;
               }
           } finally {
               // 失败 因为某种异常结束了 但是没有获取成功 
               // 取消获取锁的这个资格 
               // 比如小月月肚子疼 那就让他走吧 没资格吃烧饼了
               // 这个方法就不看了 主要就是吧小月月的前一个人的next 设置为 小月月的后一个人
               // 然后把小月月的后一个人的prev设置为小月月的前一个人 
               // 类比:链表的删除操作 
               if (failed)
                   cancelAcquire(node);
           }
       }
   
   
   
    /**
        * Checks and updates status for a node that failed to acquire.
        * 检查并设置获取锁失败的node的状态  
        * Returns true if thread should block. This is the main signal
        * 如果node需要阻塞 返回true 
        * control in all acquire loops.  Requires that pred == node.prev.
        * 
        * @param pred  前一个节点
        * @param node  当前加入的节点
        * @return {@code true} 如果可以阻塞 返回true 
        */
       private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
           int ws = pred.waitStatus;
           // 
           if (ws == Node.SIGNAL)
               // SIGNAL 状态  可安全park 
               return true;
           if (ws > 0) {
               /*
                * Predecessor was cancelled. Skip over predecessors and
                * indicate retry.
                * 状态大于0 那就是被取消了 需要往前找一个合法的node
                * 也就是下月月肚子疼要走了,需要让他前边的那个人的后一个人变成小月月后边的那个人
                * 类似于删除链表节点
                */
               do {
                   node.prev = pred = pred.prev;
               } while (pred.waitStatus > 0);
               // 将合法的node的next设置为当前node 
               pred.next = node;
           } else {
               /*
                * waitStatus must be 0 or PROPAGATE.  Indicate that we
                * need a signal, but don't park yet.  Caller will need to
                * retry to make sure it cannot acquire before parking.
                */
               // 设置状态为 为SIGNAL 
               compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
           }
           return false;
       }
   
   
      /**
        * Convenience method to park and then check if interrupted
        * PARK 并检查是否是终端  
        * @return {@code true} if interrupted
        */
       private final boolean parkAndCheckInterrupt() {
           // park 也是阻塞的一种 类比wait  
           LockSupport.park(this);
           // 需要看一下  park过程中有没有被中断
           return Thread.interrupted();
       }
4. new Node() -> tryAcquire ->tryRelease 小强-->排队买到饼-->走开通知下一个人买饼
代码语言:txt
复制
   
   /**
   	* Releases in exclusive mode.  Implemented by unblocking one or
        * more threads if {@link #tryRelease} returns true.
        * This method can be used to implement method {@link Lock#unlock}.
        *
        * @param arg the release argument.  This value is conveyed to
        *        {@link #tryRelease} but is otherwise uninterpreted and
        *        can represent anything you like.
        * @return the value returned from {@link #tryRelease}
        */
   public final boolean release(int arg) {
       if (tryRelease(arg)) {
           Node h = head;
           // 如果head不为空 并且waitStatus不是0 需要通知下一个Node
           // 买完了 告诉后边那个看抖音的人 该他买了
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
   }
   
   protected final boolean tryRelease(int releases) {
        // state-需要释放的数量
        int c = getState() - releases;
        // 如果占有线程不是当前线程  抛异常  
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 
        boolean free = false;
        // 如果c ==0 也就是释放完了 lock lock release release 
        if (c == 0) {
            free = true;
            // 设置占有线程为null 
            setExclusiveOwnerThread(null);
        }
        // 设置状态位 == 0 
        setState(c);
        return free;
    }
   
    /**
        * Wakes up node's successor, if one exists.
        * 如果存在下一个Node的话 唤醒它   
        * @param node the node
        */
       private void unparkSuccessor(Node node) {
         
           // 先设置当前Node状态 为0  也就是说拜拜了 
           int ws = node.waitStatus;
           if (ws < 0)
               compareAndSetWaitStatus(node, ws, 0);
   
           /*
            * Thread to unpark is held in successor, which is normally
            * just the next node.  But if cancelled or apparently null,
            * traverse backwards from tail to find the actual
            * non-cancelled successor.
            */
           // 循环找一个合法的next  
           // 小强买完了看看后那个人是不是合法 万一他是小月月,正闹肚子呢 已经取消了cancelled 
           // 那肯定不唤醒他 让他拉粑粑去吧  接着看下月月的下一个是否合法 直到找见一个合法的 叫他来买饼
           Node s = node.next;
           if (s == null || s.waitStatus > 0) {
               s = null;
               for (Node t = tail; t != null && t != node; t = t.prev)
                   if (t.waitStatus <= 0)
                       s = t;
           }
           // 唤醒 unpark 
           if (s != null)
               LockSupport.unpark(s.thread);
       }

三、 假装学术讨论

  1. 本来挺清楚,看了看代码,感觉进山了,那句话说的好:只缘身在此山中

来一个山的全貌:

整体图.png
整体图.png
  1. 我看完了觉得 其实也没几件事

(1) 尝试获取锁

(2)如果自己是第一个来 初始化列表 获取锁

(3) 如果自己不是第一个来 加入队列等待被叫

(4)被叫醒 执行操作

(5)操作执行完 释放锁 叫下一个人执行

本文有很多图,如果不清楚的话,大家可以关注公众号:木子的昼夜

代码语言:txt
复制
发送"aqs"  即可获得高清图访问地址
代码语言:txt
复制
发送"路线" 即可获得本系列文章大纲 
代码语言:txt
复制
也可发送自己想问的问题给我,我会在看到的第一时间回复
最后附上自己公众号刚开始写 愿一起进步:
公众号二维码.jpg
公众号二维码.jpg

注意: 以上文字 仅代表个人观点,仅供参考,如有问题还请指出,立即马上连滚带爬的从被窝里出来改正。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、DEMO
  • 三、 假装学术讨论
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档