前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发-AbstractQueuedSynchronizer(AQS)JDK源代码分析

Java并发-AbstractQueuedSynchronizer(AQS)JDK源代码分析

作者头像
Fisherman渔夫
发布2020-02-19 11:28:31
7010
发布2020-02-19 11:28:31
举报
文章被收录于专栏:渔夫渔夫

一、概要分析:

1.1 引子

 学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock,CountDownLatch,CyclicBarrier, Semaphore等。而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性。所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析。为了叙述简单,后续有些地方会用AQS代表这个类。

1.2 AbstractQueuedSynchronizer 的用途

 相信要许多读者使用过ReentrantLock,但是却不知道AbstractQueuedSynchronizer的存在。其实ReentrantLock实现了一个内部类Sync,该内部类继承了AbstractQueuedSynchronizer,所有锁机制的实现都是依赖于Sync内部类,也可以说ReentrantLock的实现就是依赖于AbstractQueuedSynchronizer类。于此类似,CountDownLatch, CyclicBarrier, Semaphore这些类也是采用同样的方式来实现自己对于锁的控制。可见,AbstractQueuedSynchronizer是这些类的基石。那么AQS内部到底实现了什么以至于所以这些类都要依赖于它呢?

 队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架, 它使用了一个int成员变量表示同步状态, 通过内置的FIFO队列来完成资源获取线程的排队工作, 并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态, 在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类, 同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用, 同步器既可以支持独占式地获取同步状态, 也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

 同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式, 隐藏了同步状态管理、 线程的排队、 等待与唤醒等底层操作。 锁和同步器很好地隔离了使用者和实现者所需关注的领域。  下图是关于ReentrantLock类(重入锁)的内部继承逻辑,反映了上面这段关于锁和同步器如何结合使用的设计思想,比如说如果你单纯地调用ReentrantLock构造器来构造了一个对象,那么你可能没有认识到其内部的很多逻辑是依靠AQS的子类来实现的,而这一切依靠的都是静态内部类设计思想的魅力,能够简化使用者的使用成本,至少不用学内部实现原理就能快速地调用Lock接口规定的锁方法,但是如果你是一个JDK源码爱好者,你一定不会放过AQS源代码的学习。

在这里插入图片描述
在这里插入图片描述

Lock接口:定义了有关自定义锁结构至少需要的抽象方法

ReentantLock类:是JDK提供的一个自定义锁类,使用此类的构造器进行锁对象的构造。

NonfairSync/FairSync静态内部类:是Sync的子类,分别提供不公平和公平的锁抢占方式

Sync静态内部抽象类:AQS的子类,实现了与重写了部分AQS方法

Condition接口:定义了有关锁下面的通知等待机制;

ConditionObject公共内部类:实现了Condition接口的抽象方法,完成了锁的通知等待机制。

1.3 AbstractQueuedSynchronizer类的成员变量分析

//同步队列的头结点
private transient volatile Node head; 

//同步队列的尾结点
private transient volatile Node tail;

//同步状态
private volatile int state;

//获取同步状态
protected final int getState() {
    return state;
}

//设置同步状态
protected final void setState(int newState) {
    state = newState;
}

//以CAS方式设置同步状态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

 上面的代码列出了AQS的所有成员变量,可以看到AQS的成员变量只有三个:

  1. 同步队列头结点引用 private transient volatile Node head;
  2. 同步队列尾结点引用private transient volatile Node tail;
  3. 同步状态private volatile int state;

 注意,这三个成员变量都使用了volatile关键字进行修饰,这就确保了多个线程对它的修改都是内存可见的。整个类的核心就是这个同步状态,可以看到同步状态其实就是一个int型的变量,同步状态决定了当前锁对象是否允许继续被占用或者,也可以根据锁状态来判断当前同步锁被多少个线程占据了。当然锁的状态根据不同子类的不同具体需求而有所不同,例如在ReentrantLock中,state等于0表示锁是开的,state大于0表示锁是锁着的,且其大小正比于一个线程重复占据一个重入锁的此数,而在Semaphore中,state大于0表示锁是开的,state等于0表示锁是锁着的。

1.4 AbstractQueuedSynchronizer的排队区实现原理:同步、条件队列

AbstractQueuedSynchronizer内部其实有两个排队区,一个是同步队列,一个是条件队列。从上图可以看出,同步队列只有一条,而条件队列可以有多条。其中唯一的同步队列的管理者是AQS对象本身,而条件队列由AQS内部的Conditon接口对象管理,每一个Condition对象都对应着一个由其管理的条件等待队列。同步队列的结点分别持有前后结点的引用(双向队列),而条件队列的结点只有一个指向后继结点的引用(向后指向的单向队列)。

在这里插入图片描述
在这里插入图片描述

 图中T表示线程,每个结点包含一个线程,线程在获取锁失败后首先进入同步队列排队,而想要进入条件队列该线程必须持有锁才行。所以在此类中,节点是线程的存储之处,所以节点和线程在某些语境下是通用的。

 接下来我们看看队列中每个结点的结构(相关域以及方法)。

//同步队列的结点
static final class Node {
    
    static final Node SHARED = new Node(); //表示当前线程以共享模式持有锁
    
    static final Node EXCLUSIVE = null;    //表示当前线程以独占模式持有锁

    static final int CANCELLED =  1;       //表示当前结点已经取消获取锁
    
    static final int SIGNAL    = -1;       //表示后继结点的线程需要运行
    
    static final int CONDITION = -2;       //表示当前结点在条件队列中排队
    
    static final int PROPAGATE = -3;       //表示后继结点可以直接获取锁

    volatile int waitStatus; //表示当前结点的等待状态
   
    volatile Node prev;      //表示同步队列中的前继结点

    volatile Node next;      //表示同步队列中的后继结点  

    volatile Thread thread;  //当前结点持有的线程引用
    
    Node nextWaiter;         //表示条件队列中的后继结点,条件队列中节点没有前继节点

    //当前结点状态是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //返回当前结点的前继结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null) {
            throw new NullPointerException();
        } else {
            return p;
        }
    }
    
    //构造器1
    Node() {}
    
    //构造器2, 默认用这个构造器
    Node(Thread thread, Node mode) {
        //注意持有模式是赋值给nextWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    //构造器3, 只在条件队列中用到
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node代表同步队列和条件队列中的一个结点,它是AbstractQueuedSynchronizer的内部类。Node有很多属性,比如持有模式,等待状态,同步队列中的前继和后继,以及条件队列中的后继引用等等。每个节点都存储着指向一个线程对象的引用变量,所以一定程度上我们完全可以把节点理解为线程对象。

1.5 理解独占模式和共享模式

AQS的内部类Node定义了两个节点Node常量SHARED和EXCLUSIVE,他们分别标识 AQS队列中等待线程的锁获取模式。

java并发包提供的加锁模式分为独占锁和共享锁:

  1. 独占锁模式下,每次只能有一个线程能持有锁,就是以独占方式实现的互斥锁。独占锁是一种悲观保守的加锁策略,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。
  2. 共享锁,则允许多个线程同时获取锁,并发访问 共享资源,如:ReadWriteLock。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。 java的并发包中提供了ReadWriteLock,读-写锁。它允许一个资源可以被多个读操作访问,或者被一个 写操作访问,但读写操作不能同时进行。

AQS提供了独占锁和共享锁必须实现的方法,具有独占锁功能的子类,它必须实现tryAcquiretryReleaseisHeldExclusively等;共享锁功能的子类,必须实现tryAcquireSharedtryReleaseShared等方法,带有Shared后缀的方法都是支持共享锁加锁的语义。

 独占锁获取锁时,设置节点模式为Node.EXCLUSIVE:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 共享锁获取锁,节点模式则为Node.SHARED

     private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        .....
    }

 而上述两个不同模式下获取锁得方式稍后会在各自模式的介绍过程中进行具体讲述。这里只需记住,独占/共享模式不是用来描述AQS对象的,因为AQS没有定义这两者的状态的相关域。独占和共享是节点的性质,节点又和线程有着自然的对应关系,所以独占模式以及共享模式是用来描述线程是如何来占用锁资源的。

1.6 理解结点的等待状态

 我们还看到每个结点都有一个等待状态(节点在等待序列中,自然状态名为等待状态),这个等待状态分为CANCELLEDSIGNALCONDITIONPROPAGATE四种状态:

    volatile int waitStatus; //表示当前结点的等待状态
  1. CANCELLED :当这个线程在排队过程中已经打算放弃了,它就会将自己座位上的牌子设置为CANCELLED,此状态的旧节点在新节点遍历向前找时会被清理出队列。具体见:shouldParkAfterFailedAcquire方法;
  2. SIGNAL :状态为SIGNAL的线程在执行完自己的代码后,退出线程前,回去唤醒下一个在等待队列中的线程/节点。只有保证前面节点的状态为SIGNAL,当前节点才能够保证被唤醒;
  3. CONDITION:表示该线程在条件队列中排队;
  4. PROPAGATE:提醒后面来的线程可以直接获取锁,这个状态只在共享模式用到,后面单独讲共享模式的时候会讲到。

 这几种waitStates状态用表格表示就是:

状态值

状态

说明

1

CANCELLED

取消状态

-1

SIGNAL

等待触发状态

-2

CONDITION

等待条件状态

-3

PROPAGATE

状态需要向后传播(只在共享模式下使用)

 这里与线程池状态判断中是否处于Running状态具有类似的逻辑判断方式,只要判断当前线程的状态值是否小于零,小于则意味着当前线程处于非取消状态,如果大于零,那么意味着线程已经处于取消状态了。

​ 下面举一些AQS子类的一个具体例子,来说明AQS类中等待状态的相关性质:

AQS中有一个state变量,该变量对不同的子类实现具有不同的意义,对ReentrantLock来说,它表示加锁的状态:

  • 无锁时state=0,有锁时state>0;
  • 第一次加锁时,将state设置为1;
  • 由于ReentrantLock是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread()),即可加锁,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁;
  • 解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉,这时其它线程可以加锁;
  • 当持有锁的线程释放锁以后,如果是等待队列获取到了加锁权限,则会在等待队列头部取出第一个线程去获取锁,获取锁的线程会被移出队列;

1.7 结点进入同步队列时会进行的操作

enq方法会在addWaiter()方法中得到调用,是属于AbstractQueuedSynchronizer类的方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

//结点入队操作, 返回前一个结点
private Node enq(final Node node) {
    for (;;) {
        //获取同步队列尾结点引用
        Node t = tail;
        //如果尾结点为空说明同步队列还没有初始化
        if (t == null) {
            //初始化同步队列,并在之后会进入另一个自锁循环
            if (compareAndSetHead(new Node())) {
                tail = head;
            }
        } else {
            //1.指向当前尾结点
            node.prev = t;
            //2.设置当前结点为尾结点
            if (compareAndSetTail(t, node)) {
                //3.将旧的尾结点的后继指向新的尾结点
                t.next = node;
                //for循环唯一的出口
                return t;//注意返回的是之前的尾结点,相当于当前尾结点上一个节点
            }
        }
    }
}
[外链图片转存失败(img-DZvznqk8-1569126340368)(1201370-20180201190926046-468129747.png)]
[外链图片转存失败(img-DZvznqk8-1569126340368)(1201370-20180201190926046-468129747.png)]

static final int SIGNAL = -1; //表示后继结点的线程需要运行

 读者需要注意添加尾结点的顺序,分为三步:

  1. 指向尾结点
    1. 使用CAS方法更改尾结点(CAS为一种并发的策略),t作为一个指向尾结点的引用变量,当其他线程将AQS的尾部域tai改变的时候就会导致t不在指向当前AQS的尾部引用,所以按照CAS操作特性,那么就会使其进入下一个自旋(并且会更改所有涉及t的赋值语句,包括CAS语句之前的,比如:node.prev = t;)。
  2. 将旧尾结点的后继指向当前结点。 注意事项: 在并发环境中这三步操作不一定能保证完成,所以在清空同步队列所有已取消的结点这一操作中,为了寻找非取消状态的结点,不是从前向后遍历而是从后向前遍历的。还有就是每个结点进入队列中时它的等待状态是为0,只有后继结点的线程需要挂起时才会将前面结点的等待状态改为SIGNAL。

1.8 AQS 模型总结

 很多初学者可能对于AQS的用途不是非常了解,甚至认为其是由一个专门进行AQS对象管理的线程所管理的,其内部节点的增删、节点属性的修改都统统由这个线程管理,而其余线程只需要和这个线程进行交互,告诉其进行何种操作。但实际上AQS对象并非由一个线程单独进行管理,而是:一个AQS对象可以被多个线程同时访问,但是此线程的引用一般也储存于AQS对象中的链表数据结构中的节点中,否则AQS不负责这个线程的执行、休眠、和锁资源的分配。当然AQS对象中的所有节点(不管此节点所存线程对象引用是否为当前访问AQS的线程),所有的线程都能够进行访问到,但是有些AQS方法在当前访问AQS对象的线程与被访问的节点中的线程引用不相同则会报错。此模型的示意图如下:

在这里插入图片描述
在这里插入图片描述

 以线程1对象为例,线程1对象引用有存放于AQS对象中,再让线程1的run方法中调用访问AQS对象域或者方法,可以分为以下两种情况:

  1. 访问的是节点1,节点中存储的线程引用就等于当前线程,那么此时相关操作的权限就比较大 node.thread == Thread.getCurrentThread()
  2. 访问的是其他节点,节点中存储的线程引用不等于当前线程,那么此时相关操作的权限就比较小 node.thread != Thread.getCurrentThread(),没有将其引用放置于AQS引用的线程访问AQS对象,也属于此类情况。

 下面举一个相关方法执行权限的大小例子,

java.util.concurrent.locks.ReentrantLock.Sync#tryRelease,此方法用于释放锁:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();

 上面是tryRelease方法首先判断的逻辑:是否当前线程不等于锁的拥有者线程,如果不是,那么直接就抛出异常,不再向下运行了。这就是权限不够的一个例子,在ReentrantLock类中想让一个锁成功释放,只能通过占用锁的线程来调用tryRelease()方法,此方法对权限的要求十分严苛。当然并不是所有方法都有关于访问权限的设计,具体情况还是要根据相关方法的源代码来进行判断。

 在分析相关AQS抽象类内部实现逻辑时,要要把节点和线程看作同一件物品。就像并发中使用object.wait()方法时,由于wait方法必须在synchronized(object){...}代码块中,所以导致了在一个时刻中一个对象只能对应一个线程,所以简单的object.wait()调用表明上没有线程参与,但是却使当前占用对象锁的线程释放了锁资源。AQS中的节点也是一样,比如final Node node = addWaiter(Node.SHARED);,以此方式将一个新节点加入旧等队列中,表面上看新节点中连线程引用都没规定,但是隐含的是调用此方法时的线程就是此新节点的线程引用对象,目的就是将当前线程设计为共享模式下的抢占资源。object.wait()方法的实现由上锁机制实现,而后者虽然没有上锁,但是从逻辑上保证了这一一对应的特性。

二、独占模式源码分析

在这里插入图片描述
在这里插入图片描述

2.1 引子

 在上一章中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态。

 我们要理清楚一个概念,线程的存在不意味着占用大量的资源,开始运行时,才占用CPU、内存等资源,否则线程/节点存在着,竞争着资源的过程只是一个轻量级的资源消耗(锁消耗的资源:自旋等待>休眠)。独占模式中,线程抢到锁以及没抢到锁,其本质上的区别在于能否有权限执行自己的代码了,而线程/节点除非被移除链表,否则一直是存在着的。

 在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作。

AQS为独占模式和共享模式下的获取锁分别提供三种获取方式:

  1. 不响应线程中断获取
  2. 响应线程中断获取
  3. 设置超时时间获取;
2.1.1 AQS中引入CAS

 后续我们会看到在AQS中广泛且大量地使用了CAS机制,这样设计的原因是作为一个等待序列是唯一的,但是在同一时刻可能会有多个节点需要加入至队尾,或者同时又多个节点被修改为取消状态,需要被移除链表。使用CAS就能以良好地并发特性来解决这些并发问题。

2.2 以不响应线程中断获取锁的实现逻辑,即acquire方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

 “不响应线程中断”的含义是:获取锁的整个过程是忽略线程中断的。其具体含义就是说,无论是acquire自己调用selfInterrupt方法引起的中断,还是其余线程致使当前节点中的线程中断,此方法对中断没有采取任何响应,只是简单地将当前中断标志位设置为true。比如:java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法中的语句只有try-finally结构,没有catch,所以其甚至不会在控制台上打印出当前线程被中断了的提示。

此方法是独占模式下线程获取共享资源的顶层入口。

  • 如果获取到锁资源,线程直接返回。
  • 否则进入等待队列,直到获取到资源为止,且整个过程 忽略中断 的影响。
  • 注意锁资源的得到不是意味着返回一个锁对象,实际上锁对象的引用地址是很容易拿到的,只是线程(节点)拿到锁对象后,该线程获得了执行的机会,而其他线程在独占模式中只能等待其执行完毕。

方法内部执行逻辑图:

在这里插入图片描述
在这里插入图片描述

由上图可以看到方法执行的流程逻辑,下面用文字来说明:

  1. tryAcquire() 尝试直接获取资源,成功则 state 值被修改并返回 true,线程继续执行,在acquireQueued() 方法中有被调用到。
  2. 获取资源失败,执行 addWaiter() 将包含当前线程的节点加入等待队列的尾部,并标记为独占模式。
  3. acquireQueued() 使线程在等待队列中获取资源,一直获取到资源后才返回(资源的获得必须是头节点中的线程释放锁,且当前线程为头节点之后的节点,当前线程才能在后续中得到锁)。
  4. 如果线程在等待过程中被中断过,它是不响应的。 只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。 其中判定退出队列的条件是否满足和休眠当前线程完成了 自旋 的过程。 方法名 作用 返回值 tryAcquire() 试图获得唯一的锁资源 boolean值 addWaiter() 在当前等待队列有无队尾的情况下都能将当前节点加入至队列中成为新队尾 返回新队尾节点 acquireQueued() 用一个自旋,始终等待锁的释放以及当前线程/节点运行到头节点后一个节点的队列次序。 boolean值

方法源代码详细学习:

2.2.1.tryAcquirejava.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
protected boolean tryAcquire(int arg) {
     throw new UnsupportedOperationException();
}

方法所实现的功能:

  • 尝试去获取独占锁资源。如果获取成功,则直接返回 true,否则直接返回 false。
  • AQS 只定义了一个接口,具体资源的获取交由其子类去实现(通过 state 的 get / set / CAS)

注意一个设计模式上的细节:

 该方法需要子类来实现,但是却没有使用 abstract 来修饰。是因为 AQS 有独占和共享两种模式,而子类可以只实现其中一种功能,如果使用 abstract来修饰,每个子类都需要同时实现两种功能的方法,对子类不太友好。并且子类一般就是针对于某一个模式下的AQS实现,所以也不必将两种模式的方法都进行重写。

2.2.2 addWaiter(Node),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
        //以给定模式构造结点。mode 有两种:EXCLUSIVE(独占)和 SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        //尝试快速方式直接放到队尾。
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {//CAS方法
                pred.next = node;
                return node;
            }
        }
        //上一步失败则通过 enq 入队。失败的原因有多种,比如目前队尾tail节点为空,这是需要enq方法的主要原因,
        //因enq方法能够在队尾为空的情况下,进行初始化,将新的节点作为头节点加入,
        //而快速方法只能提供有尾结点存在的情况下,加入线程;
        enq(node);
        return node;
}
......
private Node enq(final Node node) {
        //CAS " 自旋 ",直到成功加入队尾。
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize //队列为空,创建一个空的标志结点作为 head 结点,并将 tail 也指向它。
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { //正常流程,放入队尾(由于CAS机制的性质,所以需要额外加入这个逻辑判断)
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

上述方法中的节点都涉及了两个方向的指向,所以显然这是关于同步队列增加新节点的方法。

自旋特性的体现

  1. 上面的for(;;)是一般配合CAS操作锁采用的自旋,之所以在addWaiter方法中先采用快速加入队尾方法后还在自旋锁中进行放入尾部操作(毕竟enq方法的调用就是为了处理没有队尾的情况)的原因是CAS处理中可能有多个线程同时发现没有队尾,我们对第一个进行了没有队尾时的节点加入操作,调用了enq方法,但是同样调用了CAS方法的后续线程所面对的可是有队尾的链表结构,这时只需要首先快速入队操作就可以了,如果此时在发现CAS方法返回false,那么在调用enq方法进入自旋也不迟。核心思想还是首要确保线程安全性,其次确保执行效率得到提高。
  2. 自旋也是CAS策略的一部分,之所以使用无条件for循环,原因就是在CAS机制中如果同一时刻有多个线程想被插入为当前队列的队尾,那么只能有一个线程设置队尾成功,其余线程都得失败,没有执行上面代码中的return t;语句,for语句的设定,就能保证,总有一次当前线程/节点能够加入到队列中成为队尾。

方法实现的功能:

 用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。注意,虽然enq(node)返回的是当前尾结点的前一个节点,但是整个addWaiter方法返回的是当前尾结点,即新加入至尾部的新节点。

  • 生成新 Node 结点 node,如果 tail 结点不为空,则通过 CAS 指令插入到等待队列的队尾(同一时刻可能会有多个 Node 结点插入到等待队列中),并建立前后引用关系。
  • 如果 tail 结点为空,则将 head 结点指向一个空结点
2.2.3 acquireQueued(Node, int),即java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued

此方法的jdk源代码:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; //标记是否成功拿到资源,true意味着失败了
        try {
            boolean interrupted = false; //标记等待过程中是否被中断过
            //自旋
            for (;;) {
                final Node p = node.predecessor(); //获得此节点上一个节点,若为空,则直接抛出异常。
                if (p == head && tryAcquire(arg)) {//如果前驱结点 p 是 head,即该结点 node 为第二结点,那么便有资格去尝试获取资源(可能是 p 释放完资源后唤醒,也可能被 interrupt)。
                    setHead(node); //获取资源后,将 head 指向 node。
                    p.next = null; // setHead 中 node.prev 已置为 null,此处再将 p.next 置为 null,就是为了方便 GC 回收以前的 head 结点 p,也就意味着之前拿完资源的结点 p 出队。
                    failed = false;
                    return interrupted; //返回等待过程中是否被中断过
                }
                //如果自己可以休息了,就进入 waiting 状态,直到被 unpark()。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //如果等待过程中被中断过,哪怕只有那么一次,就将 interrupted 标记为 true。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

源代码逻辑分析:

通过 tryAcquire()addWaiter(),该线程获取资源失败,被放入等待队列尾部。

  • node 插入到队尾后,该线程不会立马挂起,会进行自旋 操作(一直执行如下逻辑过程):
    • 判断该 node 的前一个结点 pred 是否为 head 结点。
    • 如果是,则表明当前结点是队列中第一个有效结点,有最优先的执行权力,只等得到锁的资源了,即判断下面语句是否为真:
    • 根据tryAcquire() 返回的布尔值判断当前线程是否获取到了锁(具体方法有子类实现提供)。
  • 如果成功获取到锁,线程 node 无需挂起。
    • 如果获取锁失败,表示前驱线程还未完成,至少还未修改 state 的值,那么就需要继续等待。
  • 如果判断node前一个节点不是head节点,那么就直接退出当前的逻辑
  • 调用 shouldParkAfterFailedAcquire(),结点进入队尾后,检查状态,找到安全休息点。
  • 调用 parkAndCheckInterrupt() 使当前i线程(节点)进入 waiting 状态,等待 unpark() 或 interrupt() 唤醒。
    1. 被唤醒后,看是否有资格获取资源。如果获得资源,head 指向当前结点,并返回从入队到获得资源的整个过程中是否被中断过。
    2. 如果未获取资源,则重新调用 shouldParkAfterFailedAcquire()
2.2.4 shouldParkAfterFailedAcquire(Node, Node),即:java.util.concurrent.shouldParkAfterFailedAcquire

jdk源码阅读:

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;  //获得前驱结点状态。
          if (ws == Node.SIGNAL)
              //如果前驱结点状态为等待触发,则进入安全休息点。这是因为前面的节点执行完毕,就会主动地触发下一个节点开始线程执行,
              //所以可以告知调用者这次锁虽然获取失败了,但是可以休息了,但是具体如何休息的方法不归这个方法管。
              //所以这个方法通常和具体执行线程等待代码的语句通过&&逻辑相结合使用
              return true;
          if (ws > 0) {//ws大于0,意味着前面一个节点的状态为取消状态。
              //如果前驱为取消状态,就一直往前找,直到找到最近一个正常等待的状态,并排在它的后面。
              //那些取消状态的结点,由于被当前结点 " 加塞 " 到它们前边,它们相当于形成一个无引用链,稍后会被 GC 回收。
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;//这里只改变了节点的指向,但是没有改变节点的顺序。
          } else {
              //如果前驱结点正常,将前驱状态设置成 SIGNAL 等待触发(因为前驱正常状态有三种,不一定为SINGAL),
              //此方法使用CAS方式来进行。
           compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
         //代表这一次没符合要求,即前驱状态不为SINNGAL,但因为方法在acquireQueued方法内的自旋中,
         //所以下一次调用这个方法时,就直接返回true了。
  }

方法功能:用于检查状态,看是否真的可以进入 waiting 状态。

  1. 如果 pred 的 waitStatus 为 SIGNAL,则通过 parkAndCheckInterrupt() 方法把当前线程挂起,并等待被唤醒。
  2. 如果 pred 的 waitStatus > 0,表明 pred 的线程状态 CANCELLED,需从队列中删除(查看1.6中对于节点等待状态的取值说明就能理解其中含义)。并将当前节点前最近的非取消状态的节点设置为当前节点的前节点。
  3. 如果 pred 的 waitStatus == 0,则通过 CAS 指令修改 waitStatus 为 SIGNAL。(每个结点的 SIGNAL 状态都是被后一个结点设置的,因为本身其目的就是提示前面节点执行完毕后唤醒后面节点,为后面节点服务的)。
2.2.5 parkAndCheckInterrupt(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //调用 park() 使线程进入 waiting 状态。
        return Thread.interrupted(); //如果被唤醒,查看自己是不是被中断的。true,意味着线程被中断了。
}

目的:让线程真正进入等待状态。

park() 会让当前线程进入 waiting状态。在此状态下,有两种途径可以唤醒该线程:unpark()interrupt()。而前一个线程唤醒后一个线程此功能需要到线程的执行代码处实现。

需要注意 Thread.interrupted()会 清除当前线程的中断标记位。

 线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,则返回true;

 从自旋的整个过程可以看出,并不是被唤醒的线程一定能获得锁,必须调用 tryAccquire()重新竞争,因为锁可能是非公平的(子类实现),有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞。 如果已经在队列中的线程,必须按照顺序执行(等待前驱结点的相关操作,这是 公平的),非公平是针对那种还没进队列的线程可以和队列中的第一个结点 head 抢占资源。

在这里插入图片描述
在这里插入图片描述
2.2.6 selfInterrupt(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#selfInterrupt

jdk源代码:

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

根据 acquireQueued() 的结果决定是否执行中断。

acquireQueued() 中的 方法已经执行了中断,这里再执行一次中断的目的在于:

由于上面整个线程一直是挂在for循环的parkAndCheckInterrupt()方法里头,没有成功获取到锁之前不响应任何形式的线程中断,只有当线程成功获取到锁并从for循环出来后,他才会查看在这期间是否有人要求中断线程,如果是的话再去调用selfInterrupt()方法将自己挂起。

2.2.7 cancelAcquire(Node)即,java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire方法

jdk源代码:

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        //设置该结点不再关联任何线程。
        node.thread = null;

        // 通过前继结点跳过取消状态的 node。
        Node pred = node.prev;
        while (pred.waitStatus > 0)//这里是因为前继节点但也可能是处于取消状态,
            node.prev = pred = pred.prev;
        
        // 获取过滤后的前继结点的后继结点。
        Node predNext = pred.next;
        // 设置状态为取消状态。
        node.waitStatus = Node.CANCELLED;

        // 1.如果当前结点是 tail,尝试更新 tail 结点,设置 tail 为 pred,更新失败则返回,成功则设置 tail 的后继结点为 null。
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 2.如果当前结点不是 head 的后继结点,判断当前结点的前继结点的状态是否为 SIGNAL,如果不是则尝试设置前继结点的状态为 SIGNAL。
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 3.如果是 head 的后继结点或者状态判断设置失败,则唤醒当前结点的后继结点。
                unparkSuccessor(node);//unparkSuccessor()中并没有对队列做任何调整
            }
            node.next = node; // help GC
        }
}

上述代码的逻辑提体现了三种在取消节点过中遇到的数据结构(特别是边界值的处理):

结点取消分三种情况:(分类很奇怪,因为不是一头、一尾、一中间),且对应上述代码行1,2,3

  1. 当前结点是 tail

因为 tail 是队列的最后一个结点,如果该结点需要取消,则直接把该结点的前继结点的 next 指向 null,也就是把当前结点移除队列。

下图是第一种情况的示意图:

在这里插入图片描述
在这里插入图片描述

2.当前结点不是 head 的后继结点,也不是 tail。

将 node 的前继结点的 next 指向了 node 的后继结点。

在这里插入图片描述
在这里插入图片描述

3.当前节点是 head 的后继结点。

unpark 后继结点的线程,然后将 next 指向了自己。

在这里插入图片描述
在这里插入图片描述

对head的理解  从setHead()的实现以及所有调用的地方可以看出,在独占模式中,head指向的节点必定是拿到锁(或是竞争资源)的节点,而head的后继节点则是有资格争夺锁的节点,再后续的节点,就是阻塞着的了。head指向的节点,其关联的线程必定已经获取到资源,在执行了,所以head无需再关联到该线程了。head所指向的节点,也无需再参与任何的竞争操作了。  现在再来看node出队时的分类,就好理解了。head既然不会参与任何资源竞争了,自然也就和cancelAquire()无关了。因为head节点必然不是处于取消状态的节点。

2.2.8 方法实现的几个细节

一个重要的疑问:

 既然要删除结点,为什么没有对 prev 进行操作,仅仅是修改了 next。从数据结构上看,双向链表的结构完全没有得到保证?那为何是这般设计?

 因为修改指针的操作都是 CAS 操作,在 AQS 中所有以 compareAndSet 开头的方法都是尝试更新,并不保证成功。  不能用 CAS 操作更新 prev,因为 prev 是不确定的,更新失败有可能会导致整个队列的不完整,例如把 prev 指向一个已经移除队列的 node。另外一个重要原因是(这是我的思路,并且我感觉更为正确),因为cancelAcquire方法中的CAS操作只是用于对于一个节点的设置为取消状态,利用CAS保证其线程安全性是可以做到的,但是shouldParkAfterFailedAcquire方法要前向遍历多个节点,所以这里采用CAS操作就不合适了。

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;

shouldParkAfterFailedAcquire方法不使用CAS机制的原因之二:

 因为当前节点在自旋过程中一定被设置为非取消状态,所以这样一来,其余线程在自旋中调用 shouldParkAfterFailedAcquire()方法时,至少会在当前节点时停止遍历。所以当前节点之前的节点不会因为 其他线程的shouldParkAfterFailedAcquire()方法调用而改变相关结构,那么也就意味着当前线程所对应的节点对象node.prev属性只会因为自己的do-while遍历而改变,但是其余线程不会使其改变。这就带来了线程安全性。所以 shouldParkAfterFailedAcquire()方法不使用CAS机制也是线程安全的方法。线程是否安全归根揭底是当前线程所访问/写的域有无机会被其他线程修改,如果没有那么就是线程安全的。

2.3 release(int)(线程释放锁的过程),即java.util.concurrent.locks.AbstractQueuedSynchronizer#release

jdk源码:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head; //获得头结点。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //唤醒等待队列里的下一个线程。
            return true;
        }
        return false;
}

 此方法是独占模式(exclusive mode)下线程释放共享资源的顶层入口。  释放指定量资源,如果彻底释放(即 state = 0,tryRelease(arg)方法返回true),然后唤醒等待队列里的其他线程来获取资源。  调用 tryRelease()释放资源。根据返回值判断该线程是否已经完成释放资源,自定义同步器在设计时需明确这一点。

2.3.1 tryRelease(int),即java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease

jdk源码:

protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
}

功能:尝试去释放指定量的资源。

 跟tryAcquire()一样,该方法需要独占模式的AQS子类实现,因为AQS类此方法只能抛出异常,没有返回值。

 正常情况下,tryRelease()都会成功的,因为是独占模式,该线程释放资源,那么它肯定已经获得独占资源,直接减掉相应量的资源即可(state - = arg),也不需要考虑线程安全问题。  返回值:如果已经彻底释放资源(state = 0),返回 true,否则返回 false。

2.3.2 unparkSuccessor(Node),即java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

jdk源码:

private void unparkSuccessor(Node node) {
        //node 为当前线程所在结点。
        int ws = node.waitStatus;
        if (ws < 0) //如果当前的状态为非取消状态,那么将其状态置零
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next; //找到下一个需要唤醒的结点 s。
        if (s == null || s.waitStatus > 0) { //如果为null或取消状态,那么执行下面语句
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) // <=0 的结点,都是有效结点。
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒
}

 功能:用于唤醒等待队列中下一个线程,此线程为输入节点参数的下一个节点。  执行流程:

  1. 如果输入节点的 waitStatus 值为非取消状态(>0),则用 CAS 指令重置为 0,而一般唤醒操作是head节点中的线程将要执行完毕时进行的,所以一般输入节点为当前等待队列的head节点。所以下面就将输入节点等价称呼为头节点。
  2. 尾部向前遍历找到离尾部最近且其waitStatus域小于0的节点,至于为什么从尾部开始向前遍历,通过LockSupport.unpark(s.thread)唤醒线程。

 注意事项:

 可以看到头节点的更改在释放锁,并唤醒下一个线程的过程中没有进行更改,但是为了当前抢占到锁的节点为头节点以保证下一个tryRelease方法的正确执行,头节点必须更新,但是这里为何没有此操作?答案是:不需要。因为我们只是在这里释放锁,且唤醒下一个节点。但是被唤醒的节点并不一定有机会能抢占到锁,就像我在2.2.5小节中所说的那样,被唤醒的节点可能没能抢过新加入AQS对象的节点。就是因为被唤醒节点的锁抢占的不确定性,所以head的更新应当放置于tryAcqure()方法中进行实现。所以总结一点:节点被唤醒不等于节点抢占到资源。

2.4 独占模式AQS子类的具体代码案例

 在上述对AQS类分析的过程中,提到了tryAcquire,即尝试获得线程锁的方法必须得由继承于AQS的子类实现,其为获得锁最关键的实现,所以我们有必要对其做一个举例,看看具体的子类是如何实现此方法的。

 以下方法选取自java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire.而ReentrantLock又是AQS类在独占模式下的一个实现。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {// 初次获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }// 重入获取锁
    else if (current == getExclusiveOwnerThread()) {
    	int nextc = c + acquires;// 重入时直接将状态值+1
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
        }
    return false;
}

 其中最为关键的代码就是setExclusiveOwnerThread(current);方法,其将当前线程设置为独占锁的线程。

三、共享模式

3.1 acquireShared(int)(线程获取锁的过程),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquieShared

 其获取锁的过程比独占模式的获取锁方法acquire()方法多了一个Shared后缀。

 jdk源码:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);//如果获取资源失败,通过此方法进如等待序列,继续尝试资源获取
}

 获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

acquireShared() 的流程:(其还是一个尝试获取资源的方法)

  1. 成功得到资源,则方法调用直接结束。
  2. 获取资源失败则通过 doAcquireShared() 进入等待队列,直到获取到资源为止才返回。

 这里对比一下独占锁的acquire方法的源代码:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

 可见共享或者独占的acquire() 的实现逻辑十分相似。

3.1.1 tryAcquireShared(int)方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireShared

jkd源代码:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

由此可见,共享模式下获取锁的方法也是必须由子类进行具体实现,不过区别是独占模式中acquire()方法返回布尔值,代表获得成功或失败;而在共享模式中,acquireShared()方法返回的是int类型值,并且有一下三种取值方式:

  1. 负数 代表获取失败。
  2. 0 代表获取成功,但没有剩余资源,其它线程不能够继续获取
  3. 正数 代表获取成功,还有剩余资源,其他线程还可以去获取

不过如何去判断资源是否有剩余,需要由子类继承AQS类后对本方法进行重写,决定其逻辑。

3.1.2 doAcquireShared(int)方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared

jdk源码:

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); //加入队列尾部,SHARED 模式。
        boolean failed = true; //是否成功标志
        try {
            boolean interrupted = false; //等待过程中是否被中断过的标志
            for (;;) {
                final Node p = node.predecessor(); //获得前驱结点
                if (p == head) { //如果结点为 head 结点的下一个,因为 head 是拿到资源的线程,此时 node 被唤醒,很可能是 head 用完资源来唤醒自己的。
                    int r = tryAcquireShared(arg); //尝试获取资源
                    if (r >= 0) { //获取资源成功
                        setHeadAndPropagate(node, r); //将 head 指向自己,如果还有剩余资源可以再唤醒之后的线程。
                        p.next = null; // help GC
                        if (interrupted) //如果等待过程中被打断过,此时将中断补上。
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //判断状态,寻找安全点,进入 waiting 状态,等着被 unpark() 或 interrupt()。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

 此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

 此方法跟独占模式的acquireQueued比,很多处理是十分相似的:

 比如addWaiter()shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt()方法等调用几乎是没区别的,以及都是只有当前结点 node 的前驱结点是 head 时,才会去尝试获取资源。资源有剩余的情况再去唤醒之后的结点。

​ 又有一些不同之处:

  1. 共享模式下方法的调用呈现更加的集成化,比如addWaiter()方法直接放置于doAcquireShared方法内部,而独占模式是作为参数放置于方法体参数中;
  2. 共享模式多了一个资源数的判断,允许了多个线程可以同时运行,即:
    1. 独占模式下的 tryRelease() 需要在完全释放掉资源(state = 0)后,才会返回 true 去唤醒其他线程,主要是基于独占下可重入的考量。
    2. 共享模式下的 releaseShared() 没有这种要求,共享模式实质就是控制一定量的线程并发执行,只要拥有资源的线程在释放掉部分资源后就可以唤醒后继等待结点。
3.1.3 setHeadAndPropagate(Node, int),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate

jdk源代码:

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //获得 head 结点
        setHead(node); //head 指向当前结点
        // 如果还有剩余量,继续唤醒下一个结点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();//这里用于后续节点的唤醒操作
        }
}

此方法比在独占模式中调用的setHead()方法的多了一步,即:当前节点苏醒的同时,如果条件符合(还有剩余资源),还会去唤醒后继结点,而唤醒操作是在下面3.1.6介绍的方法:doReleaseShared();

3.2 releaseShared()(线程释放锁的过程),即:

java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared

jdk源代码:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();//这里用于后续节点的唤醒操作
            return true;
        }
        return false;
}

 此方法是共享模式下线程释放共享资源的顶层入口。

 会释放指定量(int arg)的资源,如果成功释放且允许唤醒等待线程,会唤醒等待队列里的其他线程来获取资源。

 例如,假设一种情况:有三个具有不同线程数的进程需要执行,分别消耗的资源总量是:A(5)和 B(7)、C(4);

一共的资源总数是13,先使A、B先进行进程的运行,然后C进程也争夺资源了,但是C发现资源只有1,不够自己所需的数目:4,所以C进程过来后进行了等待。接着,A 在运行过程中释放掉 2 个资源量,然后 tryReleaseShared(2) 返回 true 唤醒 C,C只有 3 个资源量仍不够,继续等待;随后 B 又释放 2 个资源量,tryReleaseShared(2) 返回 true 唤醒 C,C 发现资源量 5 个足够自己使用,然后 C 就可以跟 A 和 B 一起运行。  ReentrantReadWriteLock 读锁的 tryReleaseShared() 只有在完全释放掉资源(state = 0)才返回 true,所以自定义同步器可以根据需要决定 tryReleaseShared() 的返回值,就如下面所示:

3.2.1 tryReleaseShared(int),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared

jdk源码:

 protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

此方法类似于独占模式下锁的释放方法,即:tryRelease(int);

3.2.2 doReleaseShared(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared

jdk源代码:

     // 会唤醒等待共享锁的线程
    private void doReleaseShared() {
        for (;;) {
            // 将同步队列头赋值给节点h
            Node h = head;
            // 如果节点h不为null,且不等于同步队列尾
            if (h != null && h != tail) {
                // 得到节点h的状态
                int ws = h.waitStatus;
                // 如果状态是Node.SIGNAL,就要唤醒节点h后继节点的线程
                if (ws == Node.SIGNAL) {
                    // 将节点h的状态设置成0,如果设置失败,就继续循环,再试一次。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒节点h后继节点的线程
                    unparkSuccessor(h);
                }
                // 如果节点h的状态是0,就设置ws的状态是PROPAGATE。
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果同步队列头head节点发生改变,继续循环,
            // 如果没有改变,就跳出循环
            if (h == head)
                break;
        }
    }

 根据当前队列头节点的状态,在唤醒下一个节点之后(调用了unparkSuccessor(h);方法)对头节点状态做出相关改变。

四、独占模式与共享模式中AQS方法调用逻辑总结

4.1 AQS独占模式方法分析

 独占模式下获得锁的方法

方法名

作用/地位

acquire(int arg)

独占模式中的使线程获得锁的最顶层方法

tryAcquire(int arg)

尝试获得锁,这是线程获得锁的关键步骤与核心方法,且必须得被相关子类方法重写实现。

acquireQueued(Node, int)

此方法用于在节点/线程没有抢占到锁资源时,拉入队列,并确保它能够在将来某个时刻能够抢占到资源;方法的返回值是当前线程有无被中断过的布尔值。

addWaiter(Node)

其功能是将当前线程节点存入队列且置于队尾,方法的返回值是新队尾。返回值作为参数传入acquireQueued方法中,是acquireQueued方法将节点放入队列的实际实现

shouldParkAfterFailedAcquire(Node pred, Node node)

返回是否可以将当前节点挂起的布尔值,其判决条件是当前节点的前面所有节点是否有不处于取消状态的节点,有则会将前面节点状态设置为SIGNAL,返回true

parkAndCheckInterrupt()

其在判决当前节点可以挂起后进行线程挂起操作(即只有在上一个方法返回true时,此方法才会被调用),如果线程可以通过被唤醒、中断而停止挂起,其返回值是是否被中断的布尔值。

cancelAcquire(Node)

是当前线程节点状态设置为取消状态,并且把当前取消状态的线程节点从等待序中移出

unparkSuccessor(Node)

用于唤醒当前线程节点的下一个节点,独占模式下,头节点将锁资源主动或被动地释放后,就会将其下一个节点唤醒,让其进入自旋,进行资源的抢占(抢占不一定成功)。

selfInterrupt( )

当前线程获得资源后的补充中断

 下图则是独占模式下一个节点能走的最长流程,并非所有节点都能够走完这个流程(比如说不是所有加入的节点会被设置为取消状态,即有调用cancelAcquire(Node)方法),注意流程图的箭头指向主要是指时间顺序,一个方法位于另一个方法中则说明其被另一个方法调用了。

在这里插入图片描述
在这里插入图片描述

 独占模式下释放锁的相关方法:

方法名

作用/地位

release(int)

此方法是独占模式(exclusive mode)下线程释放共享资源的顶层入口

tryRelease(int)

尝试去释放指定量的资源

unparkSuccessor(Node node)

用于唤醒当前线程节点的下一个节点。由于独占模式下占得资源的只能是head节点,所以将头节点所占资源释放后才进行紧挨着头节点的节点唤醒工作。

[外链图片转存失败(img-ZBIvf1iW-1569126340372)(%E7%BB%98%E5%9B%BE2.png)]

4.2 AQS共享模式方法分析

 共享模式下获得锁的相关方法

方法名

作用/地位

acquireShared(int)

此方法是共享模式下获取资源的最顶层方法

tryAcquireShared(int arg)

此方法是共享模式下获得锁资源的核心方法,其必须得由AQS子类方法重写。

doAcquireShared(int arg)

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,使当前线程在将来某个时刻得到资源。

setHeadAndPropagate(Node node, int propagate)

首先将节点苏醒,其次,如果条件符合(还有剩余资源),还会去唤醒后继结点

addWaiter(Node)

其功能实现和独占模式下同名方法是一样的

selfInterrupt()

其功能实现和独占模式下同名方法是一样的

shouldParkAfterFailedAcquire(Node pred, Node node)

其功能实现和独占模式下同名方法是一样的

parkAndCheckInterrupt()

其功能实现和独占模式下同名方法是一样的

cancelAcquire(Node)

其功能实现和独占模式下同名方法是一样的

 下图则是共享模式下一个节点能走的最长流程,并非所有节点都能够走完这个流程(比如说不是所有加入的节点会被设置为取消状态,调用cancelAcquire(Node)),注意流程图的箭头指向主要是指时间顺序,一个方法位于另一个方法中则说明其被另一个方法调用了。

在这里插入图片描述
在这里插入图片描述

 共享模式下释放锁的方法

方法名

作用/地位

releaseShared(int arg)

此方法是共享模式下线程释放共享资源的顶层入口。

doReleaseShared()

根据当前队列头节点的状态,在唤醒下一个节点之后(调用了unparkSuccessor(h);方法)对头节点状态做出相关改变。

unparkSuccessor(h);

此方法和独占模式下同名方法相同,目的都是唤醒下一个节点。

在这里插入图片描述
在这里插入图片描述

4.3 AQS设计模式分析

4.3.1 从AQS继承角度看设计模式

 你或许会这么想当然地认为:AQS类中即写了acquire方法,有些了acquireShared方法,它们用于线程获取资源,同时也规定了当前线程/节点获取资源的方法是独占模式还是共享模式。那么我现在提出一个问题,如果你能回答正确说明你已将AQS类学得不错了:如果在同一个AQS对象中有一个节点元素被设置为独占模式,其余节点的模式又都被设计共享模式,那么独占模式下的那单个节点究竟何时会抢占到资源,或者换个问题,独占模式下节点得到了资源,那么共享模式下的节点还能抢占到资源吗?

 如果你有上述问题,那么你一定是对AQS设计模式产生一定思考了。事实上,Java的开发者早就知道上述问题不好回答,也不必回答,因为上面的问题在AQS设计模式下根本就不存在,即一个AQS对象中不会出现两个模式的系节点。那AQS类是通过何种设计方式实现这类性质的呢?接下来下面我们来看看关于AQS类的设计模式:

  1. AQS是抽象类不能够用来创建对象,所以其能够提供独占以及共享两种模式的获得资源的方法:acquire以及acquireShared。但是不能通过抽象化类来创建对象,所以不能调这两个方法,不会出现两个模式下的节点;
  2. AQS类中将tryAcquire以及TryAcquireShared设计为不是抽象的方法,这样一来子类只需重写其中一种方法即可。若方法都为抽象,那么就会导致子类必须重写AQS的这两个方法,缺一不可。
  3. 独占模式/共享模式下的AQS子类的方法只会调用AQS对应模式下的方法其提供配套的try-语句重写实现,比如:ReentrantLock类为独占模式,其只调用AbstractQueuedSynchronizer了的acquire方法,重写了与之配套使用的tryAcquire()方法。
  4. AQS类的子类很多作为其他类的静态内部类比如:java.util.concurrent.locks.ReentrantLock.Sync,但是java.util.concurrent.locks.ReentrantLock本身并没有继承AQS,这样一来就保持了外部线程不能直接访问acquire方法(因为内部类并非public修饰),这样就保证了封装上的安全性。比如我们就不能合法地调用reentantLock.Sync
4.3.2 从AQS类得中断设计调度看设计模式

 我们之前常说AQS类的中断是”不响应线程中断”,那么其究竟是如何实现的呢?现在让我们以独占模式为例来探究一番。

 之前我们有说到java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法是acquire中的关键实现,其功能是:用于在节点/线程没有抢占到锁资源时,拉入队列,并确保它能够在将来某个时刻能够抢占到资源。而AQS中断模式就是通过:acquireQueued方法中所调用的parkAndCheckInterrupt方法以及在acquire方法中调用的selfInterrupt来实现的,下面开始详细介绍两者之间的联系。

 首先,再次贴出parkAndCheckInterrupt方法的jdk源代码:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//阻塞当前线程
    return Thread.interrupted();//返回线程中断状态并复位中断状态
}

 其作用为:阻塞当前线程,然后在阻塞结束后,返回线程的中断状态并复位中断状态。

 其次,再次贴出acquire方法的源代码:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 我们可以看到如果线程之前出现过中断,acquireQueued方法返回true,那么接下来才有可能进一步执行selfInterrupt方法。所以我们下面再贴出selfInterrupt方法的源代码:

  static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

 所以我们可以最终得到了以下代码的完整执行流程:

线程被中断-> 线程挂起阻塞 -> 线程被唤醒,返回中断值标志true,并恢复线程中断标志位 -> 主动使线程再次中断

 第一眼看这样的中断设计模式,显然会感觉到很奇怪,因为为何先中断处理再恢复之后再进行中断。这使要从park方法以及wait方法的区别说起。如果你学习过Java-并发编程的话一定会对于wait--notify/notifyAll方法会有所学习,我们可以看到使用park--unpark休眠/唤醒和前者所实现的功能是差不多的:都是阻塞当前线程继续向前运行,然后等待其它线程来对休眠线程进行唤醒,但是它们是有区别的,具体区别如下:

  1. wait--notify/notifyAll方法是继承于Object类的方法对,所有对象都能调用此方法,而park--unpark这一方法对是Unsafe类中的本地方法;
  2. 如果线程在中断状态下调用wait方法,那么线程会抛出相关异常,但是调用park方法不会抛出异常。但是要注意,它们在线程中断状态下使用都会导致阻塞作用失效,就wait方法来说,其在方法抛出异常之后还是会继续执行接下来的代码;而park方法则是不抛出异常的情况下继续执行接下来的代码;
  3. 如果在阻塞状态下调用中断方法,wait致使的阻塞状态会导致线程的抛出异常,park方法不会使当前线程阻塞也不会导致线程的抛出异常,而是继续执行接下来的代码;
  4. park-unpark方法对的灵活性更高。如果先调用unpark方法,那么下一次park方法将会不起到作用,而wait方法则没有此性质。(此特性保证了AQS在线程调度方面的效率,在独占模式中,如果前一个节点刚好执行完毕,调用了唤醒下一个节点线程的方法unpark,那么此时当前线程节点就不会进入阻塞状态,而是会快速地去选择抢占资源)

线程状态

中断

非中断

调用park方法

当前线程没有被阻塞,无须唤醒就直接执行return语句,使当前线程中断标志位恢复为false的同时使parkAndCheckInterrupt方法返回true;最终导致acquireQueued中的中断标志位interrupted = true;且使acquireQueued方法也返回true;在调用park方法方法后,线程仍会进入自旋,如果没有再次中断,下次park方法会成功阻塞线程。

当前线程被阻塞,

在调用park后调用unpark方法

由于上一个park方法并未成功将线程阻塞,所以这里调用相当于额外给了一个"permit",这样一来相当于下次park方法调用时也不会阻塞线程

调用unpark方法唤醒线程后,parkAndCheckInterrupt方法返回false,标志着线程没有中断;

 所以,知道了以上两点,那么就可以知道为什么要进行中断状态的复位了,实际上线程在中断状态下的方法执行流程是这样的:

线程中断 -> 调用park方法 -> 线程未能够成功阻塞 ->当前线程的中断标志位置为false ->acquireQueued方法内部中断标志位置为true -> 再次进入自旋for(;;) -> 若仍未抢占到资源 … -> 再次调用park方法 -> 线程成功阻塞

注意事项:如果线程中断过,哪怕只有一次,那么方法acquireQueued方法总是会返回true

使用park方法相较于wait方法的优势:

  1. 提升AQS模式下线程调度的效率(先unpark,则park不起作用)
  2. 可以在不抛出异常的情况下合理延迟处理中断
  3. 使线程在不占据锁的情况下进入阻塞模式。

 现在我们可以说延迟处理中断模式的核心就是park方法对于中断状态的响应方式是“跳过”不做任何额外处理。所谓“延迟”就是,通过将线程自带的中断标志位置为false,而将acquireQueued方法内的中断标志位置为true,最终通过selfInterrupt()在线程抢占到资源以后进行中断处理。

 而让如果想要利用到轻量的不占据锁资源的线程阻塞方法:park,成功实现在线程中断或非中断状态下的阻塞工作,我们自然就需要使用Thread.interrupted();,因为中断最重要的标志位我们可以通过acquireQueued#interrupted标志位来表示,但是如果不恢复当前线程的中断标志位,那么当前线程将一直处于自旋状态中,将一直抢占CPU资源,导致一种情况的出现:独占模式下,一个线程在执行,另外的线程在“陪跑”。

五、等待队列

 AQS 维护的队列是当前等待资源的队列,其又可以分为同步队列以及条件队列。两种不同队列中的节点结构实际上是相同的,都使用了AQS中的Node内部静态final对象。当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列。下面则给出同步队列和条件队列两者间不同的内部数据结构实现:

在这里插入图片描述
在这里插入图片描述

而往往同步队列和等待队列实际上是一起协同工作的,它们协同工作的图如下:

在这里插入图片描述
在这里插入图片描述

5.1 Condition 接口

 同步队列和等待队列的作用是不同的。每个线程只能存在于同步队列或等待队列中的一个。

 任意一个 Java 对象,都拥有一组监视器方法定义在( java.lang.Object 上),主要包括 wait()、notify()、notifyAll() 方法,这些方法与 synchronized 同步关键字配合,可以实现等待/通知模式。

Condition接口提供了类似Object的监视器方法,与lock配合可以实现等待/通知模式。

以下是Condition接口的所有抽象方法:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

方法

说明

await()

调用此方法后,会使当前线程在接收到唤醒信号(signal)之前或被中断之前一直处于等待休眠状态。调用此方法后,当前线程会释放持有的锁。如果当前等待线程从该方法返回(被唤醒),那么在返回之前会重新获取锁(获取到锁才能继续执行)。

await(long time,TimeUnit unit)

调用此方法后,会使当前线程在接收到唤醒信号之前、被中断之前或到达指定等待时间之前一直处于等待状态。如果在从此方法返回前检测到等待时间超时,则返回 false,否则返回 true。

awaitNanos(long nanosTimeout)

该方法等效于 await(long time,TimeUnit unit) 方法,只是等待的时间是 nanosTimeout 指定的以毫微秒数为单位的等待时间。该方法返回值是所剩毫微秒数的一个估计值,如果超时,则返回一个小于等于 0 的值。可以根据该返回值来确定是否要再次等待,以及再次等待的时间。

awaitUninterruptibly()

当前线程进入等待状态直到被通知,该方法对中断忽略。

awaitUntil(Date deadline)

当前线程进入等待状态直到被通知,中断或者到某个时间,如果没有到指定时间就被通知,返回 true,否则表示到了指定时间,返回 false。

signal()

唤醒一个等待线程,如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

signalAll()

唤醒所有等待线程,如果所有的线程都在等待此条件,则唤醒所有线程。 在从 await 返回之前,每个线程必须重新获取锁。

说明:除了awaitUninterruptibly方法,上述wait方法调用时,如果当前线程处于中断状态,那么调用此方法就会抛出异常。

5.2 Condition 的实现

 在AQS类中Condition的实现类是内部公共类ConditionObject

ConditionObject是 AQS 的内部类,Condition的操作需要获取相关联的锁,需要和同步器挂钩。每个 Condition对象都包含着一个队列(等待队列),Condition中也有结点的概念,在将线程放到等待队列中时会构造结点。

 等待队列也是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition对象上等待的线程,如果一个线程调用了 await方法,那么该线程将会释放锁,构造成结点加入等待队列并进入等待状态。  一个 Condition 包含一个等待队列,Condition 拥有首结点(firstWaiter)和尾结点(lastWaiter)。 以下使用生产者和消费者模式用例进行说明同步队列和等待队列之间的区别与协同工作。

 案例需求:在一个有大小的队列queue中,生产者往队列中放数据,消费者从队列中取数据,当队列不满时,生产者可以继续生产数据,当队列不空时,消费者可以继续取数据,如果不符合条件,则等待,直到符合条件为止。

具体内容不妨参见个人笔记:锁-Lock 章节的第七章

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概要分析:
    • 1.1 引子
      • 1.2 AbstractQueuedSynchronizer 的用途
        • 1.3 AbstractQueuedSynchronizer类的成员变量分析
          • 1.4 AbstractQueuedSynchronizer的排队区实现原理:同步、条件队列
            • 1.5 理解独占模式和共享模式
              • 1.6 理解结点的等待状态
                • 1.7 结点进入同步队列时会进行的操作
                  • 1.8 AQS 模型总结
                  • 二、独占模式源码分析
                    • 2.1 引子
                      • 2.1.1 AQS中引入CAS
                    • 2.2 以不响应线程中断获取锁的实现逻辑,即acquire方法
                      • 2.2.1.tryAcquire:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
                      • 2.2.2 addWaiter(Node),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
                      • 2.2.3 acquireQueued(Node, int),即java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
                      • 2.2.4 shouldParkAfterFailedAcquire(Node, Node),即:java.util.concurrent.shouldParkAfterFailedAcquire
                      • 2.2.5 parkAndCheckInterrupt(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
                      • 2.2.6 selfInterrupt(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#selfInterrupt
                      • 2.2.7 cancelAcquire(Node)即,java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire方法
                      • 2.2.8 方法实现的几个细节
                    • 2.3 release(int)(线程释放锁的过程),即java.util.concurrent.locks.AbstractQueuedSynchronizer#release
                      • 2.3.1 tryRelease(int),即java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
                      • 2.3.2 unparkSuccessor(Node),即java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
                    • 2.4 独占模式AQS子类的具体代码案例
                    • 三、共享模式
                      • 3.1 acquireShared(int)(线程获取锁的过程),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquieShared
                        • 3.1.1 tryAcquireShared(int)方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireShared
                        • 3.1.2 doAcquireShared(int)方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
                        • 3.1.3 setHeadAndPropagate(Node, int),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate
                      • 3.2 releaseShared()(线程释放锁的过程),即:
                        • 3.2.1 tryReleaseShared(int),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared
                        • 3.2.2 doReleaseShared(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
                    • 四、独占模式与共享模式中AQS方法调用逻辑总结
                      • 4.1 AQS独占模式方法分析
                        • 4.2 AQS共享模式方法分析
                          • 4.3 AQS设计模式分析
                            • 4.3.1 从AQS继承角度看设计模式
                            • 4.3.2 从AQS类得中断设计调度看设计模式
                        • 五、等待队列
                          • 5.1 Condition 接口
                            • 5.2 Condition 的实现
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档