前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码解析(一)

AQS源码解析(一)

作者头像
黑洞代码
发布2021-01-14 15:13:03
4580
发布2021-01-14 15:13:03
举报

AQS源码解析(1)——独占模式

概述 目录


1.AQS相关类图

2.AQS的父类AOS源码解析

3.AQS的独占锁的加锁源码解析

4.AQS的独占锁的解锁源码解析

第1节 AQS相关类图


二话不说,先来一张图,说明一下AbstractQueuedSynchronizer(简称AQS或者同步器)在并发编程中的重要性。

从类图中可以看出,ThreadPoolExecutor(线程池)的工作线程Work类,ReentrantLock(重入锁)的内部类Sync的两个子类FairSync(公平锁)和NonfairSync(非公平锁),ReentrantReadWriteLock(重入读写锁),Semaphore(信号量),CountDownLatch(闭锁)等多种并发组件都用到了AbstractQueuedSynchronizer类。

第2节 AQS的父类AOS源码解析


AbstractOwnableSynchronizer简称(AOS):此类为创建可能需要所有权概念的锁和相关同步器提供了基础。用于记录当前资源的拥有者——保存线程对象的引用。

代码语言:javascript
复制
/**
 * A synchronizer that may be exclusively owned by a thread.  This
 * class provides a basis for creating locks and related synchronizers
 * that may entail a notion of ownership.  The
 * {@code AbstractOwnableSynchronizer} class itself does not manage or
 * use this information. However, subclasses and tools may use
 * appropriately maintained values to help control and monitor access
 * and provide diagnostics.
 *
 * @since 1.6
 * @author Doug Lea
 */
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizer() { }

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

第3节 AQS的独占锁的加锁源码解析


AQS的执行流程图如下。

没有成功获取到独占资源的线程将会进入队列中进行排队。AQS作为独占锁使用时维护的了一个CLH(Craig, Landin, and Hagersten)双向队列。

队列的基本组成组成单位是结点即Node类。

代码语言:javascript
复制
/**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks.  We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node.  A
     * "status" field in each node keeps track of whether a thread
     * should block.  A node is signalled when its predecessor
     * releases.  Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though.  A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend.  So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks), are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is.  Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors.  This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms.  Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor, unless we can identify an uncancelled
     * predecessor who will carry this responsibility.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held.  Upon await, a node is
     * inserted into a condition queue.  Upon signal, the node is
     * transferred to the main queue.  A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

Node类的各个属性的含义如下。

属性名称

描述

int waitStatus

表示节点的状态。其中包含的状态有:1.CANCELLED,值为1,表示当前的线程被取消;2.SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;3.CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;4.PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;5.值为0,表示当前节点在sync队列中,等待着获取锁。

Node prev

前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。

Node next

后继节点。

Node nextWaiter

存储condition队列中的后继节点。

Thread thread

入队列时的当前线程。

独占锁加锁过程调用acquire()方法。

代码语言:javascript
复制
/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上述逻辑主要包括: 1. 尝试获取(调用tryAcquire更改状态,需要保证原子性); 在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队。 2. 如果获取不到,将当前线程构造成节点Node并加入sync队列; 进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。 3. 再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

tryAcuqire()方法的源码如下。

代码语言:javascript
复制
/**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

从这里可以看出,这里tryAcuqire()没有实现任何逻辑,因此这个方法是要留给AQS子类实现的,用于表示对资源的获取。

如果tryAcuqire()返回false,即获取资源失败,则会执行addWaiter()方法。

代码语言:javascript
复制
/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

上述逻辑主要包括:

1. 使用当前线程构造Node;

对于一个节点需要做的是将当节点前驱节点指向尾节点(current.prev = tail),尾节点指向它(tail = current),原有的尾节点的后继节点指向它(t.next = current)而这些操作要求是原子的。上面的操作是利用尾节点的设置来保证的,也就是compareAndSetTail来完成的。

2. 先行尝试在队尾添加;

如果尾节点已经有了,然后做如下操作:

(1)分配引用T指向尾节点;

(2)将节点的前驱节点更新为尾节点(current.prev = tail);

(3)如果尾节点是T,那么将当尾节点设置为该节点(tail = current,原子更新);

(4)T的后继节点指向当前节点(T.next = current)。

注意第3点是要求原子的。

这样可以以最短路径O(1)的效果来完成线程入队,是最大化减少开销的一种方式。

3. 如果队尾添加失败或者是第一个入队的节点。

如果是第1个节点,也就是sync队列没有初始化,那么会进入到enq这个方法,进入的线程可能有多个,或者说在addWaiter中没有成功入队的线程都将进入enq这个方法。

可以看到enq的逻辑是确保进入的Node都会有机会顺序的添加到sync队列中,而加入的步骤如下:

(1)如果尾节点为空,那么原子化的分配一个头节点,并将尾节点指向头节点,这一步是初始化;

(2)然后是重复在addWaiter中做的工作,但是在一个while(true)的循环中,直到当前节点入队为止。

进入队列之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。而每个线程都是一个独立的个体,它们自省的观察,当条件满足的时候(自己的前驱是头结点并且原子性的获取了状态),那么这个线程能够继续运行。

addWaiter()执行完成后,将执行acquireQueued()方法。

代码语言:javascript
复制
/*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes.  Each is mostly the same, but annoyingly
     * different.  Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上述逻辑主要包括:

1. 获取当前节点的前驱节点;

需要获取当前节点的前驱节点,而头结点所对应的含义是当前站有锁且正在运行。

2. 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;

如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。

3. 否则进入等待状态。

如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。

这里针对acquire做一下总结:

1. 状态的维护;

需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并且利用compareAndSet来确保原子性的修改。

2. 状态的获取;

一旦成功的修改了状态,当前线程或者说节点,就被设置为头节点。

3. 队列的维护。

在获取资源未果的过程中条件不符合的情况下(不该自己,前驱节点不是头节点或者没有获取到资源)进入睡眠状态,停止线程调度器对当前节点线程的调度。

这时引入的一个释放的问题,也就是说使睡眠中的Node或者说线程获得通知的关键,就是前驱节点的通知,而这一个过程就是释放,释放会通知它的后继节点从睡眠中返回准备运行。

第4节 AQS的独占锁的解锁源码解析


独占锁解锁过程调用release()方法。

代码语言:javascript
复制
/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

上述逻辑主要包括:

1. 尝试释放状态;

tryRelease能够保证原子化的将状态设置回去,如果释放状态成功过之后,将会进入后继节点的唤醒过程。

2. 唤醒当前节点的后继节点所包含的线程。

通过LockSupport的unpark方法将休眠中的线程唤醒,让其继续acquire状态。

代码语言:javascript
复制
/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

上述逻辑主要包括,该方法取出了当前节点的next引用,然后对其线程(Node)进行了唤醒,这时就只有一个或合理个数的线程被唤醒,被唤醒的线程继续进行对资源的获取与争夺。

回顾整个资源的获取和释放过程:

在获取时,维护了一个sync队列,每个节点都是一个线程在进行自旋,而依据就是自己是否是首节点的后继并且能够获取资源;

在释放时,仅仅需要将资源还回去,然后通知一下后继节点并将其唤醒。

这里需要注意,队列的维护(首节点的更换)是依靠消费者(获取时)来完成的,也就是说在满足了自旋退出的条件时的一刻,这个节点就会被设置成为首节点。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 落叶飞翔的蜗牛 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档