前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS之独占锁

AQS之独占锁

作者头像
spilledyear
发布2020-01-02 23:40:28
5770
发布2020-01-02 23:40:28
举报
文章被收录于专栏:小白鼠小白鼠

AbstractQueuedSynchronizer,抽象类,模板模式,子类通过实现其模板方法,从而实现不同的同步器,例如: ReentrantLock ReentrantReadWriteLock BlockingQueue CountDownLatch

  1. 本质是一个优化过的CLH同步队列,内部结构:双向链表,有头节点和尾节点,FIFO,尾进头出,每个线程会被封装成一个Node
  2. 有独占和共享两种模式,通过Node的内部属性nextWaiter表示,独占为EXCLUSIVE,独占为SHARED
  3. 状态state,被volatile关键字修饰,独占模式下,等于0代表可以获取锁,大于1代表锁重入;共享模式下,state的值代表可以同时有多少个线程获取锁,大于0表示可以获取锁
  4. tryAcquire,模板方法,一般由子类重写,如果你想实现一个同步器,重写该方法。该方法表示尝试获取锁,获取成功返回treu,失败返回false,不会阻塞当前线程
  5. acquire,该方法内部会先调用tryAcquire方法尝试获取锁,尝试获取成功则直接返回,否则将当前线程封装成Node并加入CLH队列尾部,修改其前驱节点的状态为SIGNAL并通过LockSupport阻塞当前线程,其前驱节点释放锁的会唤醒该节点所对应的线程
  6. 在实现一个同步器的时候,加入CLH队列和阻塞操作都由AQS帮我们实现,我们需要根据需求重写tryAcquiretryRelease等模板方法就可以了,基于这个特性,你可以实现所谓的公平锁和非公平锁等

本文主要基于ReentrantLock来分析AQS的独占模式

ReentrantLock

先看一个使用示例

代码语言:javascript
复制
Lock  lock = new ReentrantLock();
lock.lock();
try{
    xx.doXX
}finnally{
    lock.unlock();
}

进入ReentrantLock#lock方法,发现内部调用了Sync#lock方法,然后观察ReentrantLock中的其他方法,发现内部基本上都是委托给Sync进行调用,那很有必要了解一下Sync是什么

代码语言:javascript
复制
public void lock() {
    sync.lock();
}

Sync

从上图可以看出,ReentrantLock有三个内部类:Sync NonfairSync FairSync,Sync是一个抽象类,继承了AbstractQueuedSynchronizer,NonfairSyncFairSync实现了Sync。而ReentrantLock其实更像是一个门面类,内部实现完全交由Sync处理

ReentrantLock有两个构造函数,可以根据参数选择创建公平锁还是非公平锁

代码语言:javascript
复制
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock.FairSync

公平锁,ReentrantLock中的公平锁,由其内部类FairSync实现

ReentrantLock#lock

代码语言:javascript
复制
// ReentrantLock#lock
public void lock() {
    sync.lock();
}

// FairSync#lock
final void lock() {
    acquire(1);
}

// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. tryAcquire表示尝试获取锁,模板方法,由AbstractQueuedSynchronizer子类实现,获取到锁返回成功,没有获取到锁返回失败,不会阻塞线程
  2. addWaiter表示将当前线程封装成Node节点,然后添加到CLH队列
  3. acquireQueued方法的返回值表示在当前线程等待(阻塞)获取锁的过程中是否发生了中断
  4. 所以acquire方法中的if判断条件表示如果当前线程等待(阻塞)获取锁的过程中是否发生了中断,则执行当前线程的interrupt方法 Thread.currentThread().interrupt(),为什么要执行这个方法呢?因为在acquireQueued方法中执行了Thread.interrupted()方法,而该方法会清除中断位,这样可能会导致用户代码无法正确的判断中断,因此需要重新执行interrupt方法,将中断异抛出去
FairSync#tryAcquire

tryAcquire方法由AbstractQueuedSynchronizer子类实现,看看FairSync#tryAcquire是如何实现公平锁的

代码语言:javascript
复制
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取全局状态
    int c = getState();
    // state == 0 代表当前可以获取锁
    if (c == 0) {
        // hasQueuedPredecessors用于判断CLH队列中是有节点在排队,判断条件 head != tail  && (head.next == null || head.next.thread != Thread.currentThread()),head != tail说明不是初始化的那个节点;如果有下一个节点,还需要判断那个节点的thread是否和当前线程相同,如果相同,说明可以重入,此时可以获取锁
        // 通过CAS操作设置state
        // setExclusiveOwnerThread 用于保存当前线程到`exclusiveOwnerThread`属性,如果以上条件都满足,则获取锁成功
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果是锁重入,则需要更新state的值
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    否则返回失败
    return false;
}

所谓的公平锁,就是说在CLH队列中有其他节点的时候,就算此时刚好state==0,也不能插队直接获取锁,需要将当前节点加到队列尾部,排队。实现方式在上面的代码中都有解释,就不多说了

AbstractQueuedSynchronizer#addWaiter

tryAcquire方法返回失败的时候,说明获取锁失败,此时需要将当前线程封装成Node添加到CLH队列,这部分实现在AbstractQueuedSynchronizer#addWaiter方法中

代码语言:javascript
复制
private Node addWaiter(Node mode) {
    // 将当前线程封装成Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;

    // 如果尾节点存在,需要做的就是通过CAS操作将当前节点设置成尾节点
    if (pred != null) {
        // 1. 当前节点的前置指针指向尾节点;2. 通过CAS操作将当前节点设置成尾节点;3. 将之前的尾节点的后置指针指向当前节点(尾节点)
        node.prev = pred;

        // 如果 compareAndSetTail(pred, node) 失败,说明在这一时刻存在线程竞争,将交由enq方法中的自旋处理
        if (compareAndSetTail(pred, node)) {
            // 将之前的尾节点的后置指针指向当前节点(尾节点) ,如果线程1在下面这行代码之前被挂起,而线程2通过netx指针遍历,会出现 next== null 的情况,遇到这种情况需要通过prev指针进行反向遍历
            pred.next = node;
            return node;
        }
    }

    // 如果尾节点等于null,说明这个节点是第一个入队的节点,此时需要做一些初始化操作
    enq(node);
    return node;
}


private Node enq(final Node node) {
    // 自旋。考虑一下这里为啥要自旋处理?主要是为了处理addWaiter方法中设置为尾节点失败的情况,当两个线程同时进入addWaiter方法,如果第一个线程通过CAS操作设置尾节点成功,第二个线程通过CAS操作设置尾节点则必然失败,此时将会通过自旋的方式设置尾节点
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                // // 到这里说明这个节点是第一个入队的节点,此时需要做一些初始化操作:tail = head
                tail = head;
        } else {
            // 和addWaiter方法中的那部分逻辑一样,就是将当前节点设置成尾节点,然后设置前后指针   
            // 但这里可能会有一个问题,在这个循环体中,因为是通过cas操作设置尾节点,所以可以保证其是安全,同时可以保证其前后驱节点是正确的。但加入线程1执行到 t.next = node 时被挂起,然线程2开始从head节点开始遍历,此时 next 节点可能为空

            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上面都给出了注释,有两个需要注意的点:

  1. 如果是第一个入队的节点,在入队之前需要做一些初始化操作,创建一个Node,并将它赋值给head和tail
  2. 通过CAS操作和自旋操作设置尾节点
AbstractQueuedSynchronizer#acquireQueued

尝试获取锁失败,然后将节点添加到队列尾部,接下来要处理的就是阻塞当前线程,这部分实现在AbstractQueuedSynchronizer#acquireQueued方法中

前面说过,这个方法的返回值表示在线程等待获取锁的过程中,是否发生了中断

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 前面说过,这个方法的返回值表示在线程等待获取锁的过程中,是否发生了中断,就是用这个interrupted变量
        boolean interrupted = false;

        // 自旋, 直到获取锁,或者被取消,但什么时候会取消呢?
        for (;;) {
            // p代表当前节点的前置节点
            final Node p = node.predecessor();

            // p == head 并且 尝试获取锁成功,说明此时可以获取锁。为什么要 前置节点 == head 这个条件呢?
            // 我们想一下CLH队列的特点,当我们只向队列中添加了一个节点的时候,此时队列中会有两个节点(初始化的时候创建了一个节点),此时我们添加的那个节点的前置指针指向的就是head,head的后置指针指向的是我们添加的那个节点
            // 这说明在CLF队列中,head节点是不参与竞争的,它的作用类似于一个逻辑节点,head.nex代表的那个节点,就是可以获得锁的那个节点,理解这一点很关键
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为head节点,同时设置 `node.thread = null;node.prev = null;` 设置这个的目的是为了将之前的那个head节点从当前列表的脱离,help GC
                setHead(node);
                // 这个步骤执行之后, head 就完成脱离队列了
                p.next = null; // help GC

                failed = false;
                return interrupted;
            }

            // 当上面的条件不满足,也就是说没有获得锁的情况,需要将当前线程阻塞。shouldParkAfterFailedAcquire方法在前驱线程阻塞的情况下会返回true,如果不返回true,这里将一直自旋;返回true之后,该线程将被阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
             // 获取锁失败,此时需要取消排队,但是我不知道在什么情况下会执行到这里
            cancelAcquire(node);
    }
}

当第一个线程获取锁时,直接通过tryAcquire方法就可以获取锁,此时根本不涉及到CLH队列,即头尾节点都不会初始化,因为此时是可以通过CAS获取锁成功的;然后进来第二个线程的时候, 此时 p == head && !tryAcquire(arg) 所以会先通过执行shouldParkAfterFailedAcquire方法将第二个线程对应节点的前驱节点(即head节点)的 waitStatus属性设置为SIGNAL,然后阻塞第二个线程

代码语言:javascript
复制
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取钱去节点的 waitStatus 值
    int ws = pred.waitStatus;

    // 如果前驱节点正在阻塞中,直接返回true
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        return true;

    // waitStatus 大于0的情况只有一种,就是 CANCELLED,此时需要将该节点从CLF队列中移除
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 如果是其他情况,需要将前驱节点的 waitStatus 设置为 阻塞状态,然后返回false
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法的作用 将当前节点的前驱节点的waitStatus属性设置为阻塞状态或者将无效的前驱节点从队列中移除,这也是CLH队列的特点,当前节点监听的是前驱节点的状态, 即前驱节点的waitStatus == SIGNAL,代表当前节点的对应的线程状态是阻塞状态,此时返回true,然后通过LockSupport阻塞当前线程

  1. 每个节点中有一个waitStatus字段,这个字段的取值有以下可能: SIGNAL(阻塞) CANCELLED(取消) CONDITION(todo) PROPAGATE(todo) 0(其他情况)
  2. 当前线程节点的状态维护在其前驱节点中
  3. 如果前驱节点正在阻塞中,直接返回true,因为释放锁的时候是前驱节点会通知后驱节点,所以只要判断前驱节点是否阻塞就可以了
  4. 如果前驱节点被取消了,需要将前驱节点从队列中删除
  5. 如果是其他情况,需要将前驱节点的waitStatus设置为阻塞状态
代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程
    LockSupport.park(this);
    // 判断当前线程是否发生中断
    return Thread.interrupted();
}
  1. 阻塞当前线程
  2. 返回当前线程是否发生中断

ReentrantLock#unlock

代码语言:javascript
复制
// ReentrantLock#unlock
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  1. tryRelease表示尝试释放锁,模板方法,由AbstractQueuedSynchronizer子类实现
  2. unparkSuccessor解除线程阻塞
Sync#tryRelease

这是抽象类Sync中的方法,ReentrantLock中的公平锁和非公平锁在释放锁的时候都会调用这个方法

代码语言:javascript
复制
protected final boolean tryRelease(int releases) {
    // 释放锁的时候,state - 1
    int c = getState() - releases;

    // 如果 hread.currentThread() != getExclusiveOwnerThread() 说明,当前线程没有获得锁,因此不能释放,此时抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;

    // 如果 c==0 说明此时没有重入了,需要对 `上一个节点对应的线程` 进行解除阻塞操作,此时返回true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }

    // 设置 state 的值,这里不存在竞争关系,所以不需要通过CAS操作
    setState(c);
    return free;
}
  1. 判断当前线程是否是持有锁的线程,如果不是,说明当前线程没有获得锁,因此不能释放,此时抛出异常
  2. 如果当前线程是持有锁的线程, 如果 (sate-1) == 0,说明没有重入锁了,此时需要对 上一个节点对应的线程 进行解除阻塞操作
  3. 如果 (sate-1) != 0 ,说明还有重入锁,此时不需要解除上一个节点的阻塞
AbstractQueuedSynchronizer#unparkSuccessor

如果Sync#tryRelease返回true,说明接下来需要对 上一个节点对应的线程 进行解除阻塞操作。从头节点开始判断,这里有一个判断条件(head != null && head.waitStatus != 0),为啥有这个判断条件?

  1. 当只有一个线程的时候,此时CLH队列都还没初始化,此时head == null && tail == null,所以没必要执行下面的操作
  2. head.waitStatus == 0时,说明该节点的waitStatus还只是默认状态。而根据我们前面的分析,如果在尝试获取锁时返回失败,会将该节点加入到CLH队列尾部,同时阻塞该节点所对应的线程。但是阻塞该线程有一个前置条件,就是该节点的前驱节点的 waitStatus == SIGNAL。同理,我们可以推理出,如果head.waitStatus == 0,说明没有后驱节点,所以此时也没必要继续往下执行了
代码语言:javascript
复制
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  1. 如果head.waitStatus < 0, 则需要先通过CAS操作将 head.waitStatus设置为0
  2. 后驱节点为null的情况在上面已经说明过了,如果head.next == null,则需要通过prev指针反向遍历,找到最前面的那个节点(即head的下一个节点)
  3. s.waitStatus > 0只有一种情况,那就是CANCELLED,代表取消排队。在前面介绍过,在添加节点成功后且给前驱节点设置waitStatus = SIGNAL之前,会对waitStatus CANCELLED的情况进行处理, 即在AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法中,会为当前节点找一个waitStatus == SIGNAL的前驱节点,而跳过那些waitStatus == CANCELLED的前驱节点。所以这种情况也需要通过前驱节点反向遍历
  4. (s == null || s.waitStatus > 0)出现的情况非常少,所以最开始直接通过head.next获取节点算是一个优化
  5. waitStatus == CANCELLED,说明线程在排队获取锁的时候被取消了,此时该线程都没有获得锁,也就没有发生阻塞,自然也不需要对该线程解除阻塞
  6. 根据上一步获取的节点获取线程,然后通过 LockSupport#unpark 方法对该线程解除阻塞

ReentrantLock.NonfairSync

非公平锁,ReentrantLock中的非公平锁,由其内部类NonfairSync实现

有了前面对AQS的了解,非公平锁的实现看起来就特别简单了

代码语言:javascript
复制
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

唯一的区别就是在执行lock方法的时候多了一个CAS操作,即不排队直接通过CAS去枪锁,这就是非公锁。如果这次CAS操作没有抢到锁,接下来还是得排队,其实就是多了一次让它插队得机会,具体能不能插队成功,还得看它得本事了

思考

AbstractQueuedLongSynchronizer#acquireQueued方法,通过自旋操作设置前驱节点的 waitStatus == SIGNAL,在finally块中有一个判断,如果failed == true,就取消排队

代码语言:javascript
复制
final boolean acquireQueued(final Node node, long arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 什么情况下会执行呢?
            cancelAcquire(node);
    }
}

那么什么情况下,会以 failed == true退出呢?那只有在抛异常的情况了?什么时候会抛异常呢?

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ReentrantLock
  • ReentrantLock.FairSync
    • ReentrantLock#lock
      • ReentrantLock#unlock
      • ReentrantLock.NonfairSync
      • 思考
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档