前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(juc系列)aqs源码学习笔记

(juc系列)aqs源码学习笔记

作者头像
呼延十
发布2021-10-08 17:29:19
3000
发布2021-10-08 17:29:19
举报
文章被收录于专栏:呼延呼延

前言

本文源码基于: JDK13

JUC是Java提供的一个并发工具包,提供了很多并发工具.

本文主要将AQS.

java.util.concurrent.locks.AbstractQueuedSynchronizer.

是一个基类,也可以理解为一个框架.

它提供了对于同步状态的控制,以前线程等待时的FIFO队列.

Fields

AQS的属性.

state

代码语言:javascript
复制
    /**
     * The synchronization state.
     */
    private volatile int state;

核心属性,同步状态. 使用volatile修饰.

与之对应的三个方法:

代码语言:javascript
复制
/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState(){
        return state;
        }

/**
 * Sets the value of synchronization state.
 * This operation has memory semantics of a {@code volatile} write.
 * @param newState the new state value
 */
protected final void setState(int newState){
        state=newState;
        }

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 */
protected final boolean compareAndSetState(int expect,int update){
        return STATE.compareAndSet(this,expect,update);
        }

分别提供了get/set方法及CAS的赋值方法.

head

等待队列队头.

tail

等待队列队尾.

head 和 tail 是java.util.concurrent.locks.AbstractQueuedSynchronizer.Node 的实例, 构成了一个双向链表.

Node

Node是为了表达一个等待线程而抽象的数据结构,主要有以下几个属性.

代码语言:javascript
复制
// Node节点所在的等待状态
volatile int waitStatus;

//前置节点
volatile Node prev;

// 后置节点
volatile Node next;

// 在这个节点上的线程
volatile Thread thread;

// 下一个等待的节点
        Node nextWaiter;

他有两种模式,分别为共享模式及独占模式. 对应不同的操作.

其中waitStatus为枚举值,有以下几个值.

代码语言:javascript
复制
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED=1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL=-1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION=-2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate.
 */
static final int PROPAGATE=-3;

Public-Methods

AQS的方法可太多了. 先看一下对外提供的API方法.

众所周知,AQS是为了同步(加锁)而设计的. 那么一定是有获取锁,释放锁的方法的.先从这里切入.

acquire(int arg)

独占式应用,典型的就是ReentrantLock. ReentrantLock源码学习

独占模式的加锁代码.

代码语言:javascript
复制
    /**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg){
        if(!tryAcquire(arg)&&
        acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
        selfInterrupt();
        }

独占模式的获取锁, 并且忽略中断. 至少调用一次tryAcquire.如果成功了就返回.

否则的话将线程加入等待队列,重复的进行tryAcquire. 直到成功为止.

traAcquire(int arg)

这个方法在AQS中是抽象的, protected修饰. 由子类具体进行实现.

它定义的:

独占模式的获取锁, 如果可以获取到,返回成功,如果获取失败,线程应该被放入等待队列.

如果线程已经在等待队列中, 应该是被其他线程唤醒了.

总之: 这个方法是非阻塞的,立即返回的,要么成功加锁,返回true. 要么加锁失败,返回flase. ,之后的操作就不归这个方法管了.

addWaiter(Node node)

private方法,给当前线程创建一个Node并且放入等待队列.

代码语言:javascript
复制
    /**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode){
        Node node=new Node(mode);

        for(;;){
        Node oldTail=tail;
        if(oldTail!=null){
        node.setPrevRelaxed(oldTail);
        if(compareAndSetTail(oldTail,node)){
        oldTail.next=node;
        return node;
        }
        }else{
        initializeSyncQueue();
        }
        }
        }
  1. 创建一个Node.
  2. 如果队尾为空,说明等待队列没有初始化,进行初始化.
  3. 将当前节点设置为新的队尾.
acquireQueued(Node node, int arg)

一个final方法,子类无法重写.

将等待队列中的所有线程,进行获取锁的行为.

代码语言:javascript
复制
    /**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node,int arg){
        boolean interrupted=false;
        try{
        for(;;){
final Node p=node.predecessor();
        if(p==head&&tryAcquire(arg)){
        setHead(node);
        p.next=null; // help GC
        return interrupted;
        }
        if(shouldParkAfterFailedAcquire(p,node))
        interrupted|=parkAndCheckInterrupt();
        }
        }catch(Throwable t){
        cancelAcquire(node);
        if(interrupted)
        selfInterrupt();
        throw t;
        }
        }

如果当前节点的前置节点是头结点,说明当前节点是优先级最高的那个.尝试获取锁.

如果当前节点不是优先级最高的,或者获取锁失败了. 调用shouldParkAfterFailedAcquire.

shouldParkAfterFailedAcquire
代码语言:javascript
复制
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
        int ws=pred.waitStatus;
        if(ws==Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
        if(ws>0){
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do{
        node.prev=pred=pred.prev;
        }while(pred.waitStatus>0);
        pred.next=node;
        }else{
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws,Node.SIGNAL);
        }
        return false;
        }

如果前置节点是SIGNAL.说明前置节点优先级更高,当前线程应该park.

如果前置节点被取消了,扔掉中间的取消节点. 不park.

如果前置节点是其他状态,设置为SIGNAL. 优先级最高. 不park.

不park的原因是再来一次. 检测一遍.

如果当前线程需要被park.则park且检查下是否中断了.

parkAndCheckInterrupt
代码语言:javascript
复制
/**
 * Convenience method to park and then check if interrupted.
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt(){
        LockSupport.park(this);
        return Thread.interrupted();
        }

如果发生异常,则取消掉这次获取锁.

cancelAcquire(Node node)
代码语言:javascript
复制
    /**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node){
        // Ignore if node doesn't exist
        if(node==null)
        return;

        node.thread=null;

        // Skip cancelled predecessors
        Node pred=node.prev;
        while(pred.waitStatus>0)
        node.prev=pred=pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary, although with
        // a possibility that a cancelled node may transiently remain
        // reachable.
        Node predNext=pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus=Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if(node==tail&&compareAndSetTail(node,pred)){
        pred.compareAndSetNext(predNext,null);
        }else{
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if(pred!=head&&
        ((ws=pred.waitStatus)==Node.SIGNAL||
        (ws<=0&&pred.compareAndSetWaitStatus(ws,Node.SIGNAL)))&&
        pred.thread!=null){
        Node next=node.next;
        if(next!=null&&next.waitStatus<=0)
        pred.compareAndSetNext(predNext,next);
        }else{
        unparkSuccessor(node);
        }

        node.next=node; // help GC
        }
        }
  1. 当前node的thread设置为null.
  2. 扔掉当前节点之前的所有被取消了的节点.
  3. 取消掉当前节点。
  4. 设置尾节点为前一个节点.

release(int arg)

独占式的解锁.

代码语言:javascript
复制
    /**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg){
        if(tryRelease(arg)){
        Node h=head;
        if(h!=null&&h.waitStatus!=0)
        unparkSuccessor(h);
        return true;
        }
        return false;
        }

调用tryRelease(int arg). 如果解锁成功,唤醒头结点的后继节点. 如果解锁失败, 返回false.

tryRelease(int arg)

解锁操作,由子类负责具体实现,可以后期针对ReentrantLock学习.

这个方法,非阻塞式, 即时返回true/false. 代表是否释放成功.

unparkSuccessor(Node node)

代码语言:javascript
复制
    /**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node){
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws=node.waitStatus;
        if(ws< 0)
        node.compareAndSetWaitStatus(ws,0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s=node.next;
        if(s==null||s.waitStatus>0){
        s=null;
        for(Node p=tail;p!=node&&p!=null;p=p.prev)
        if(p.waitStatus<=0)
        s=p;
        }
        if(s!=null)
        LockSupport.unpark(s.thread);
        }

在等待队列中,从后向前找到正序的第一个需要唤醒的Node. 执行unpark操作.

acquireShared(int arg)

共享锁的相关实现,可以查看CountDownLatch的相关代码. CountDownLatch源码解析

共享模式的获取锁.忽略中断.

至少调用一次TryAcquireShared, 如果成功就返回,失败就将线程加入等待队列. 重复调用TryAcquireShared知道成功.

代码语言:javascript
复制
    /**
 * Acquires in shared mode, ignoring interrupts.  Implemented by
 * first invoking at least once {@link #tryAcquireShared},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquireShared} until success.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 */
public final void acquireShared(int arg){
        if(tryAcquireShared(arg)< 0)
        doAcquireShared(arg);
        }

tryAcquireShared(int arg)

抽象方法,由子类负责实现.

如果获取锁成功,直接返回. 如果获取失败,线程加入等待队列,如果线程已经加入,等待被其他人释放锁的动作唤醒.

doAcquireShared(int arg)

代码语言:javascript
复制
    /**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg){
final Node node=addWaiter(Node.SHARED);
        boolean interrupted=false;
        try{
        for(;;){
final Node p=node.predecessor();
        if(p==head){
        int r=tryAcquireShared(arg);
        if(r>=0){
        setHeadAndPropagate(node,r);
        p.next=null; // help GC
        return;
        }
        }
        if(shouldParkAfterFailedAcquire(p,node))
        interrupted|=parkAndCheckInterrupt();
        }
        }catch(Throwable t){
        cancelAcquire(node);
        throw t;
        }finally{
        if(interrupted)
        selfInterrupt();
        }
        }
  1. 首先添加一个SHARED模式的节点到等待队列.
  2. 如果当前节点的前置节点是head. 说明当前节点的优先级最高,尝试获取锁. 如果成功,则返回.
  3. 如果当前节点不是优先级最高的,或者获取锁失败了,调用shouldParkAfterFailedAcquire判断是否需要进行park. 如果需要,则park当前线程并检查中断.
  4. 如果不需要park.则自旋. 进行下一次判断,是否需要获取锁.
  5. 如果catch异常,则取消这次获取锁,流程同上面独占模式取消.

releaseShared(int arg)

共享模式的释放锁.

代码语言:javascript
复制
    /**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg){
        if(tryReleaseShared(arg)){
        doReleaseShared();
        return true;
        }
        return false;
        }

非阻塞式的释放锁.调用tryReleaseShared.

如果释放成功,调用doReleaseShared.如果失败,返回false.

tryReleaseShared(int arg)

抽象方法,具体由子类进行实现.

非阻塞式的,返回释放的结果.

doReleaseShared()

代码语言:javascript
复制
    /**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared(){
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for(;;){
        Node h=head;
        if(h!=null&&h!=tail){
        int ws=h.waitStatus;
        if(ws==Node.SIGNAL){
        if(!h.compareAndSetWaitStatus(Node.SIGNAL,0))
        continue;            // loop to recheck cases
        unparkSuccessor(h);
        }
        else if(ws==0&&
        !h.compareAndSetWaitStatus(0,Node.PROPAGATE))
        continue;                // loop on failed CAS
        }
        if(h==head)                   // loop if head changed
        break;
        }
        }

共享模式的释放锁操作. 通知后继者并且确保传播.

独占式的解锁,只需要唤醒下一个即可。而共享式的解锁,需要广播解锁消息.

遍历等待队列,将SIGNAL的节点继任者全部唤醒.

完.

参考文章

完。


本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Fields
    • state
      • head
        • tail
          • Node
      • Public-Methods
        • acquire(int arg)
          • traAcquire(int arg)
          • addWaiter(Node node)
          • acquireQueued(Node node, int arg)
      • release(int arg)
        • tryRelease(int arg)
          • unparkSuccessor(Node node)
          • acquireShared(int arg)
            • tryAcquireShared(int arg)
              • doAcquireShared(int arg)
              • releaseShared(int arg)
                • tryReleaseShared(int arg)
                  • doReleaseShared()
                  • 参考文章
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档