专栏首页小白鼠AQS之独占锁

AQS之独占锁

AbstractQueuedSynchronizer,抽象类,模板模式,子类通过实现其模板方法,从而实现不同的同步器,例如: ReentrantLock ReentrantReadWriteLock BlockingQueue CountDownLatch

  1. 本质是一个优化过的CLH同步队列,内部结构:双向链表,有头节点和尾节点,FIFO,尾进头出,每个线程会被封装成一个Node
  2. 有独占和共享两种模式,通过Node的内部属性nextWaiter表示,独占为EXCLUSIVE,独占为SHARED
  3. 状态state,被volatile关键字修饰,独占模式下,等于0代表可以获取锁,大于1代表锁重入;共享模式下,state的值代表可以同时有多少个线程获取锁,大于0表示可以获取锁
  4. tryAcquire,模板方法,一般由子类重写,如果你想实现一个同步器,重写该方法。该方法表示尝试获取锁,获取成功返回treu,失败返回false,不会阻塞当前线程
  5. acquire,该方法内部会先调用tryAcquire方法尝试获取锁,尝试获取成功则直接返回,否则将当前线程封装成Node并加入CLH队列尾部,修改其前驱节点的状态为SIGNAL并通过LockSupport阻塞当前线程,其前驱节点释放锁的会唤醒该节点所对应的线程
  6. 在实现一个同步器的时候,加入CLH队列和阻塞操作都由AQS帮我们实现,我们需要根据需求重写tryAcquiretryRelease等模板方法就可以了,基于这个特性,你可以实现所谓的公平锁和非公平锁等

本文主要基于ReentrantLock来分析AQS的独占模式

ReentrantLock

先看一个使用示例

Lock  lock = new ReentrantLock();
lock.lock();
try{
    xx.doXX
}finnally{
    lock.unlock();
}

进入ReentrantLock#lock方法,发现内部调用了Sync#lock方法,然后观察ReentrantLock中的其他方法,发现内部基本上都是委托给Sync进行调用,那很有必要了解一下Sync是什么

public void lock() {
    sync.lock();
}

Sync

从上图可以看出,ReentrantLock有三个内部类:Sync NonfairSync FairSync,Sync是一个抽象类,继承了AbstractQueuedSynchronizer,NonfairSyncFairSync实现了Sync。而ReentrantLock其实更像是一个门面类,内部实现完全交由Sync处理

ReentrantLock有两个构造函数,可以根据参数选择创建公平锁还是非公平锁

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock.FairSync

公平锁,ReentrantLock中的公平锁,由其内部类FairSync实现

ReentrantLock#lock

// ReentrantLock#lock
public void lock() {
    sync.lock();
}

// FairSync#lock
final void lock() {
    acquire(1);
}

// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. tryAcquire表示尝试获取锁,模板方法,由AbstractQueuedSynchronizer子类实现,获取到锁返回成功,没有获取到锁返回失败,不会阻塞线程
  2. addWaiter表示将当前线程封装成Node节点,然后添加到CLH队列
  3. acquireQueued方法的返回值表示在当前线程等待(阻塞)获取锁的过程中是否发生了中断
  4. 所以acquire方法中的if判断条件表示如果当前线程等待(阻塞)获取锁的过程中是否发生了中断,则执行当前线程的interrupt方法 Thread.currentThread().interrupt(),为什么要执行这个方法呢?因为在acquireQueued方法中执行了Thread.interrupted()方法,而该方法会清除中断位,这样可能会导致用户代码无法正确的判断中断,因此需要重新执行interrupt方法,将中断异抛出去
FairSync#tryAcquire

tryAcquire方法由AbstractQueuedSynchronizer子类实现,看看FairSync#tryAcquire是如何实现公平锁的

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取全局状态
    int c = getState();
    // state == 0 代表当前可以获取锁
    if (c == 0) {
        // hasQueuedPredecessors用于判断CLH队列中是有节点在排队,判断条件 head != tail  && (head.next == null || head.next.thread != Thread.currentThread()),head != tail说明不是初始化的那个节点;如果有下一个节点,还需要判断那个节点的thread是否和当前线程相同,如果相同,说明可以重入,此时可以获取锁
        // 通过CAS操作设置state
        // setExclusiveOwnerThread 用于保存当前线程到`exclusiveOwnerThread`属性,如果以上条件都满足,则获取锁成功
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果是锁重入,则需要更新state的值
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    否则返回失败
    return false;
}

所谓的公平锁,就是说在CLH队列中有其他节点的时候,就算此时刚好state==0,也不能插队直接获取锁,需要将当前节点加到队列尾部,排队。实现方式在上面的代码中都有解释,就不多说了

AbstractQueuedSynchronizer#addWaiter

tryAcquire方法返回失败的时候,说明获取锁失败,此时需要将当前线程封装成Node添加到CLH队列,这部分实现在AbstractQueuedSynchronizer#addWaiter方法中

private Node addWaiter(Node mode) {
    // 将当前线程封装成Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;

    // 如果尾节点存在,需要做的就是通过CAS操作将当前节点设置成尾节点
    if (pred != null) {
        // 1. 当前节点的前置指针指向尾节点;2. 通过CAS操作将当前节点设置成尾节点;3. 将之前的尾节点的后置指针指向当前节点(尾节点)
        node.prev = pred;

        // 如果 compareAndSetTail(pred, node) 失败,说明在这一时刻存在线程竞争,将交由enq方法中的自旋处理
        if (compareAndSetTail(pred, node)) {
            // 将之前的尾节点的后置指针指向当前节点(尾节点) ,如果线程1在下面这行代码之前被挂起,而线程2通过netx指针遍历,会出现 next== null 的情况,遇到这种情况需要通过prev指针进行反向遍历
            pred.next = node;
            return node;
        }
    }

    // 如果尾节点等于null,说明这个节点是第一个入队的节点,此时需要做一些初始化操作
    enq(node);
    return node;
}


private Node enq(final Node node) {
    // 自旋。考虑一下这里为啥要自旋处理?主要是为了处理addWaiter方法中设置为尾节点失败的情况,当两个线程同时进入addWaiter方法,如果第一个线程通过CAS操作设置尾节点成功,第二个线程通过CAS操作设置尾节点则必然失败,此时将会通过自旋的方式设置尾节点
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                // // 到这里说明这个节点是第一个入队的节点,此时需要做一些初始化操作:tail = head
                tail = head;
        } else {
            // 和addWaiter方法中的那部分逻辑一样,就是将当前节点设置成尾节点,然后设置前后指针   
            // 但这里可能会有一个问题,在这个循环体中,因为是通过cas操作设置尾节点,所以可以保证其是安全,同时可以保证其前后驱节点是正确的。但加入线程1执行到 t.next = node 时被挂起,然线程2开始从head节点开始遍历,此时 next 节点可能为空

            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上面都给出了注释,有两个需要注意的点:

  1. 如果是第一个入队的节点,在入队之前需要做一些初始化操作,创建一个Node,并将它赋值给head和tail
  2. 通过CAS操作和自旋操作设置尾节点
AbstractQueuedSynchronizer#acquireQueued

尝试获取锁失败,然后将节点添加到队列尾部,接下来要处理的就是阻塞当前线程,这部分实现在AbstractQueuedSynchronizer#acquireQueued方法中

前面说过,这个方法的返回值表示在线程等待获取锁的过程中,是否发生了中断

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 前面说过,这个方法的返回值表示在线程等待获取锁的过程中,是否发生了中断,就是用这个interrupted变量
        boolean interrupted = false;

        // 自旋, 直到获取锁,或者被取消,但什么时候会取消呢?
        for (;;) {
            // p代表当前节点的前置节点
            final Node p = node.predecessor();

            // p == head 并且 尝试获取锁成功,说明此时可以获取锁。为什么要 前置节点 == head 这个条件呢?
            // 我们想一下CLH队列的特点,当我们只向队列中添加了一个节点的时候,此时队列中会有两个节点(初始化的时候创建了一个节点),此时我们添加的那个节点的前置指针指向的就是head,head的后置指针指向的是我们添加的那个节点
            // 这说明在CLF队列中,head节点是不参与竞争的,它的作用类似于一个逻辑节点,head.nex代表的那个节点,就是可以获得锁的那个节点,理解这一点很关键
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为head节点,同时设置 `node.thread = null;node.prev = null;` 设置这个的目的是为了将之前的那个head节点从当前列表的脱离,help GC
                setHead(node);
                // 这个步骤执行之后, head 就完成脱离队列了
                p.next = null; // help GC

                failed = false;
                return interrupted;
            }

            // 当上面的条件不满足,也就是说没有获得锁的情况,需要将当前线程阻塞。shouldParkAfterFailedAcquire方法在前驱线程阻塞的情况下会返回true,如果不返回true,这里将一直自旋;返回true之后,该线程将被阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
             // 获取锁失败,此时需要取消排队,但是我不知道在什么情况下会执行到这里
            cancelAcquire(node);
    }
}

当第一个线程获取锁时,直接通过tryAcquire方法就可以获取锁,此时根本不涉及到CLH队列,即头尾节点都不会初始化,因为此时是可以通过CAS获取锁成功的;然后进来第二个线程的时候, 此时 p == head && !tryAcquire(arg) 所以会先通过执行shouldParkAfterFailedAcquire方法将第二个线程对应节点的前驱节点(即head节点)的 waitStatus属性设置为SIGNAL,然后阻塞第二个线程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取钱去节点的 waitStatus 值
    int ws = pred.waitStatus;

    // 如果前驱节点正在阻塞中,直接返回true
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        return true;

    // waitStatus 大于0的情况只有一种,就是 CANCELLED,此时需要将该节点从CLF队列中移除
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 如果是其他情况,需要将前驱节点的 waitStatus 设置为 阻塞状态,然后返回false
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法的作用 将当前节点的前驱节点的waitStatus属性设置为阻塞状态或者将无效的前驱节点从队列中移除,这也是CLH队列的特点,当前节点监听的是前驱节点的状态, 即前驱节点的waitStatus == SIGNAL,代表当前节点的对应的线程状态是阻塞状态,此时返回true,然后通过LockSupport阻塞当前线程

  1. 每个节点中有一个waitStatus字段,这个字段的取值有以下可能: SIGNAL(阻塞) CANCELLED(取消) CONDITION(todo) PROPAGATE(todo) 0(其他情况)
  2. 当前线程节点的状态维护在其前驱节点中
  3. 如果前驱节点正在阻塞中,直接返回true,因为释放锁的时候是前驱节点会通知后驱节点,所以只要判断前驱节点是否阻塞就可以了
  4. 如果前驱节点被取消了,需要将前驱节点从队列中删除
  5. 如果是其他情况,需要将前驱节点的waitStatus设置为阻塞状态
private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程
    LockSupport.park(this);
    // 判断当前线程是否发生中断
    return Thread.interrupted();
}
  1. 阻塞当前线程
  2. 返回当前线程是否发生中断

ReentrantLock#unlock

// ReentrantLock#unlock
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  1. tryRelease表示尝试释放锁,模板方法,由AbstractQueuedSynchronizer子类实现
  2. unparkSuccessor解除线程阻塞
Sync#tryRelease

这是抽象类Sync中的方法,ReentrantLock中的公平锁和非公平锁在释放锁的时候都会调用这个方法

protected final boolean tryRelease(int releases) {
    // 释放锁的时候,state - 1
    int c = getState() - releases;

    // 如果 hread.currentThread() != getExclusiveOwnerThread() 说明,当前线程没有获得锁,因此不能释放,此时抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;

    // 如果 c==0 说明此时没有重入了,需要对 `上一个节点对应的线程` 进行解除阻塞操作,此时返回true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }

    // 设置 state 的值,这里不存在竞争关系,所以不需要通过CAS操作
    setState(c);
    return free;
}
  1. 判断当前线程是否是持有锁的线程,如果不是,说明当前线程没有获得锁,因此不能释放,此时抛出异常
  2. 如果当前线程是持有锁的线程, 如果 (sate-1) == 0,说明没有重入锁了,此时需要对 上一个节点对应的线程 进行解除阻塞操作
  3. 如果 (sate-1) != 0 ,说明还有重入锁,此时不需要解除上一个节点的阻塞
AbstractQueuedSynchronizer#unparkSuccessor

如果Sync#tryRelease返回true,说明接下来需要对 上一个节点对应的线程 进行解除阻塞操作。从头节点开始判断,这里有一个判断条件(head != null && head.waitStatus != 0),为啥有这个判断条件?

  1. 当只有一个线程的时候,此时CLH队列都还没初始化,此时head == null && tail == null,所以没必要执行下面的操作
  2. head.waitStatus == 0时,说明该节点的waitStatus还只是默认状态。而根据我们前面的分析,如果在尝试获取锁时返回失败,会将该节点加入到CLH队列尾部,同时阻塞该节点所对应的线程。但是阻塞该线程有一个前置条件,就是该节点的前驱节点的 waitStatus == SIGNAL。同理,我们可以推理出,如果head.waitStatus == 0,说明没有后驱节点,所以此时也没必要继续往下执行了
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  1. 如果head.waitStatus < 0, 则需要先通过CAS操作将 head.waitStatus设置为0
  2. 后驱节点为null的情况在上面已经说明过了,如果head.next == null,则需要通过prev指针反向遍历,找到最前面的那个节点(即head的下一个节点)
  3. s.waitStatus > 0只有一种情况,那就是CANCELLED,代表取消排队。在前面介绍过,在添加节点成功后且给前驱节点设置waitStatus = SIGNAL之前,会对waitStatus CANCELLED的情况进行处理, 即在AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法中,会为当前节点找一个waitStatus == SIGNAL的前驱节点,而跳过那些waitStatus == CANCELLED的前驱节点。所以这种情况也需要通过前驱节点反向遍历
  4. (s == null || s.waitStatus > 0)出现的情况非常少,所以最开始直接通过head.next获取节点算是一个优化
  5. waitStatus == CANCELLED,说明线程在排队获取锁的时候被取消了,此时该线程都没有获得锁,也就没有发生阻塞,自然也不需要对该线程解除阻塞
  6. 根据上一步获取的节点获取线程,然后通过 LockSupport#unpark 方法对该线程解除阻塞

ReentrantLock.NonfairSync

非公平锁,ReentrantLock中的非公平锁,由其内部类NonfairSync实现

有了前面对AQS的了解,非公平锁的实现看起来就特别简单了

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

唯一的区别就是在执行lock方法的时候多了一个CAS操作,即不排队直接通过CAS去枪锁,这就是非公锁。如果这次CAS操作没有抢到锁,接下来还是得排队,其实就是多了一次让它插队得机会,具体能不能插队成功,还得看它得本事了

思考

AbstractQueuedLongSynchronizer#acquireQueued方法,通过自旋操作设置前驱节点的 waitStatus == SIGNAL,在finally块中有一个判断,如果failed == true,就取消排队

final boolean acquireQueued(final Node node, long arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 什么情况下会执行呢?
            cancelAcquire(node);
    }
}

那么什么情况下,会以 failed == true退出呢?那只有在抛异常的情况了?什么时候会抛异常呢?

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java线程池

    线程池的核心实现类,基于ThreadPoolExecutor可以实现满足不同场景的线程池

    spilledyear
  • AQS之共享锁

    通过 AQS独占锁,我们对AQS的数据结构有了基本的了解。它本质上就是一个优化过的CLH队列,因为CLF队列只有一个前驱指针,而AQS除了前驱指针,还有一个后驱...

    spilledyear
  • Zookeeper

    即所谓的配置中心.发布订阅一般有两种设计模式,分别为: Push模式和Pull模式. ZK采用推拉模式相结合的方式: 客户端向服务端注册自己需要监听的节点,一旦...

    spilledyear
  • 【原创】Java并发编程系列14 | AQS源码分析

    AbstractQueuedSynchronizer是Java并发包java.util.concurrent的核心基础组件,是实现Lock的基础。

    java进阶架构师
  • AQS原理浅析关于Lock及AQS的一些补充:羊群效应

    JavaEdge
  • ReentrantLock 源码浅析

    tomas家的小拨浪鼓
  • SynchronousQueue 源码解析

    SynchronousQueue 一个阻塞队列,其中每个插入操作必须等待另一个线程进行相应的删除操作,反之亦然。 同步队列没有任何内部容量,甚至没有一个容量。 ...

    JavaEdge
  • JDK源码解析实战 - AbstractQueuedSynchronizer源码解析

    AbstractQueuedSynchronizer 抽象同步队列简称 AQS ,它是实现同步器的基础组件, 并发包中锁的底层就是使用 AQS 实现的. 大...

    JavaEdge
  • ADI人工设计智能与网页智能设计

    ADI是人工智能,它使用机器学习来预测设计趋势,及实现设计的生成。ADI根据用户的需求生成个性化的设计。

    mixlab
  • 操作系统丶并发并行和线程

    py3study

扫码关注云+社区

领取腾讯云代金券