前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >硬核的AQS

硬核的AQS

作者头像
shysh95
发布2021-04-07 11:33:02
2850
发布2021-04-07 11:33:02
举报
文章被收录于专栏:shysh95

Hi~朋友,不点蓝字关注,我们哪来故事?

摘要

  1. 如何保证线程安全
  2. JDK并发包中的同步控制
  3. JDK并发包中锁的特点
  4. 为什么要讲AQS
  5. AQS的核心数据结构
  6. AQS排他锁如何申请
  7. AQS排他锁如何释放

1. 如何保证线程安全

Java多线程在对共享资源进行访问时,如果不加以控制会存在线程安全问题,当我们使用多线程对共享资源访问时,通常会线程共享资源的进行访问线程数的控制:

  • 共享锁:我们会对共享资源分派许可,只有拿到许可的线程才可以访问到共享资源,否则就需要等待有许可可用(当然这里也可以放弃等待)。拿到许可的线程在离开共享区间(不再访问共享资源)时必须要归还许可,这样处于等待的线程才有可能拿到许可执行。(信号量采用的就是这种机制)
  • 排他锁:只有一个线程可以访问到共享资源,其他线程在未拿到排他锁时只能在共享区间外等待。当持有锁的线程释放锁以后,其他等待的线程才有可能获取到锁执行

2. JDK并发包中的同步控制

Java提供了大量的工具让我们对共享资源进行多线程访问控制,其中很多人最熟悉的是synchronized关键字,除了该关键字,JDK的并发包中也提供了大量的类来进行同步控制,主要有:

  • ReentrantLock
  • Semaphore
  • ReadWriteLock
  • CountDownLatch
  • CyclicBarrier
  • LockSupport
  • Condition

3. JDK包中的锁的特点

3.1 ReentrantLock

ReentrantLock的作用和synchronized的作用基本一致,但是ReentrantLock应该需要手动lock(如果锁已被占用则需要等待)和unlock,并且支持线程中断,支持公平锁(按照请求锁的顺序获得锁),尝试获得锁的时候可以指定最大等待时间(如果不指定时间则不进行等待,拿到锁返回true,否则返回false)。

ReentrantLock搭配Condition的await()以及signal()相当于synchronized搭配Object.wait()以及notify()。

3.2 Semaphore

Semaphore(信号量)允许多个线程同时访问一个共享资源,在构造信号量时必须指定允许的最大线程数,在使用信号量时,我们会尝试获取一个许可,如果获取失败,则需要等待,直到有线程释放许可或者当前线程被中断。

3.3 ReadWriteLock

ReadWriteLock(读写锁)是读写分离的锁,读与读之间不需要阻塞互斥,但读写、写写之间需要阻塞互斥。ReadWriteLock中可以获取一个读锁和一个写锁,通过读写锁可以提高读多写少的程序的并发运行效率。

3.4 CountDownLatch

CountDownLatch是一个倒计数器,在构造CountDownLatch时我们需要传一个数字,通过调用CountDownLatch的await方法,我们可以阻塞当前线程,直至倒计数器减到0,通过CountDownLatch的countdown()方法,我们可以将倒计数器的值减1。

CountDownLatch的作用就好比乘坐飞机,我们必须等所有的旅客检票完成以后,然后飞机才能起飞。

3.4 CyclicBarrier

CyclicBarrier(循环栅栏),循环栅栏的作用类似CountDownLatch,但是他会在将计数器减到0以后重新恢复原始计数器的值,循环的概念也就是这么来的,而且循环栅栏支持在计数器减到0以后触发一个动作(Runnable接口的实现类)。

3.5 LockSupport

LockSupport是一个线程阻塞工具,机制类似于信号量,通过park()可以消费一个许可,如果当前没有许可,线程会被阻塞,unpark()释放一个许可。LockSupport中的许可最多只有一个。

LockSupport不会造成死锁,假设我们的线程先执行了unpark,然后再调用park也不会阻塞(因为unpark已经释放了一个许可可被获得使用)。

Thread.resume()假设先于Thread.suspend()方法先执行,则会导致线程死锁。

4. 为什么要讲AQS

通过上图我们可以看出,前文我们所提到的锁在其内部都有一个Sync类(采用了组合的方式),而这个Sync类都继承自AbstractQueuedSynchronizer类。

4.1 AQS核心数据结构

在AQS中有两个内部类:

  • ConditionObject:保存着条件变量等待队列(由Condition.await()引起的等待),最终实现也是Node
  • Node:同步等待队列的具体实现类(保存在等待在这个锁上的线程)

4.2 Node

Node的数据结构如下:

  • volatile Node prev:等待链表中的上一个元素
  • volatile Node next:等待链表中的下一个元素
  • volatile Thread thread:当前线程对象
  • Node nextWaiter:下一个等待在条件变量队列中的节点

除了上述4个属性外,还有一个重要的属性就是:

  • volatile int waitStatus

该属性表示的是节点在队列中的状态,括号中的为状态的int值:

  • 初始状态(0)
  • CANCELLED(1):线程取消了等待,如果在取得锁的过程中发生了一些异常,则可能出现取消的情况,比如等待过程中出现了中断异常或者timeout
  • SIGNAL(-1):表示节点需要被唤醒
  • CONDITION(-2):表示线程等待在条件变量中
  • PROPAGATE(-3)

4.3 ConditionObject

由于我们这篇文章我们只专注分析普通的锁,不涉及条件变量等待,所以读者朋友们可自行阅读源码。

5. 排他锁申请

下面这张图是ReentrantLock使用排他锁的流程图:

下面我们分析一下排他锁的使用过程,我们可以以ReentrantLock为切入点,分析一下AQS的实现:

代码语言:javascript
复制
public void lock() {
    sync.lock();
}

我们在调用ReentrantLock的lock方法时,将会调用Sync的lock方法(),这里的Sync是一个抽象类,我们这里主要看非公平锁的实现方式,也就是NonfairSync的lock方法,如下:

代码语言:javascript
复制
final void lock() {
    // 1
    if (compareAndSetState(0, 1))
    // 2
        setExclusiveOwnerThread(Thread.currentThread());
    else
    // 3
        acquire(1);
}
  1. 通过CAS尝试性获得锁(改变AQS中的state属性值为1),如果尝试获得锁成功,执行第2步骤,否则执行第三步骤
  2. CAS获取锁成功以后,设置持有锁的线程为当前线程
  3. 如果没有获取到锁,则调用acquire尝试性获得许可,由于这里是排他锁,有且只有一个线程可以持有该锁

5.1 acquire

acquire方法是AQS中的一个方法,如下:

代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先调用tryAcquire再次尝试获取锁,该方法在AQS中的实现是抛出UnsupportedOperationException异常,也就是说子类必须要实现这个方法,下面我们看一下tryAcquire在ReentrantLockSync中的nonfairTryAcquire(int acquires)方法如下:

代码语言:javascript
复制
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

这里的主要逻辑就是先获取一下锁的状态,如果锁的状态是未占有(state值为0),则再次使用CAS尝试去占有锁,如果尝试成功,则返回true。

通过CAS尝试获取锁失败以后,判断锁是否被当前线程持有,如果是,调用setState(nextc)将AQS中的state的值+1,然后返回true(这里解决了重入的问题)。

上述两种情况如果都不成立,则返回false,表示当前线程获取锁失败。

5.2 acquireQueued方法

代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

当我们尝试获取锁失败以后,会调用acquireQueued该线程添加至等待队列,那么这个等待队列中的Node是怎么构建出来的呢?答案就是addWaiter方法。

代码语言:javascript
复制
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter方法中首先构建了一个Node节点,Node节点中的线程为当前线程,然后尝试将该节点添加到AQS的等待队列中,如果原来等待队列的尾节点不为NULL的会采用CAS快速失败的方法进行添加,如果成功添加,则返回这个新节点,否则调用enq添加新节点到等待队列中,方法如下:

代码语言:javascript
复制
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果队列中还没有节点,CAS初始化一个空的头结点,并把尾节点设置为头结点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // CAS设置当前节点node为尾节点,并把原来的尾节点的next指向当前节点node
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果队列中暂时还没有节点的话,就初始化一个空的头节点,并将尾节点设置为头节点,然后将当前节点node的前一个节点改成tail节点,并通过CAS直至成功将node改为tail节点,并且原先的tail节点的next修改为当前节点note,然后返回当前节点node的前置节点(也就是原来的tail节点)。

当节点添加成功以后,我们会主动调用acquireQueued尝试获取锁,方法如下:

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 开启自旋
        for (;;) {
            // 获取当前节点node的前继节点
            final Node p = node.predecessor();
            // 如果前继节点是头结点(虚节点),那么意味着当前节点是第一个数据节点,那么久尝试获取锁,如果获取锁成功,将当前节点设置为头结点,setHead方法会将节点中的线程和prev节点都置为空
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果node的前继节点不是头结点或者获取锁失败,那么需要判断当前节点是否需要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 获取如果发生异常,则取消获取锁申请
        if (failed)
            cancelAcquire(node);
    }
}

如果判断当前节点线程是否需要挂起,主要依靠shouldParkAfterFailedAcquire来判断,如下:

代码语言:javascript
复制
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

通过上述源码,我们可以看到,只有当前节点的前继节点的状态是SINGAL(待唤醒状态)时,当前节点线程可以被挂起(这里主要是为了防止无限循环资源浪费)。上述代码还处理前置节点被取消了特殊情况,将当前节点的前置节点修改为队列中最后一个waitSatus不是取消状态的节点。

将当前节点的线程挂起方法如下,实现采用的LockSupport。

代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

取消获取所申请的方法是cancelAcquire,如下:

代码语言:javascript
复制
private void cancelAcquire(Node node) {
    // 过滤无效节点
    if (node == null)
        return;

    node.thread = null;

    // 跳过所有处于取消的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 找到最后一个有效节点的后继节点
    Node predNext = pred.next;

    // 将当前节点的状态设置为取消
    node.waitStatus = Node.CANCELLED;

    // 如果当前节点是尾节点,设置尾节点为最后一个有效节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 将最后一个有效节点的next设置为null
        compareAndSetNext(pred, predNext, null);
    } else {
        // 如果最后一个有效节点不是头结点并且(最后一个有效节点是SINGAL状态或者可以被设置为SGINAL状态),并且最后一个有效节点的线程不为null
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // 如果当前节点后的后置节点是有效节点(存在且状态不为取消状态),设置最后一个有效节点的next为当前节点的后继节点
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 上述条件如果均不成立,唤醒当前节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

6. 排他锁释放

代码语言:javascript
复制
public void unlock() {
    sync.release(1);
}

ReentrantLock通过unlock方法来解锁,该方法会调用AQS中的release方法,如下:

代码语言:javascript
复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease是由具体的AQS的子类来实现,下面看一下ReentrantLock中的Sync中的实现,如下:

代码语言:javascript
复制
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 只有持有锁的线程才可以执行unlock操作
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 解锁成功,设置独占线程为空
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

下面再回到AQS的release方法,如下:

代码语言:javascript
复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

如果解锁成功,并且当前节点状态不是初始状态,然后就调用unparkSuccessor方法唤醒head后继节点的线程,方法如下:

代码语言:javascript
复制
private void unparkSuccessor(Node node) {

    // 获取当前节点状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 获取节点的后继节点
    Node s = node.next;
    // 如果节点的后继节点为null或者已经被取消,则从尾节点找到一个未取消的节点进行唤醒
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

解锁的过程还是相对比较简单,当一个线程在释放ReentrantLock的时候,需要从等待队列中唤醒其他需要唤醒的节点。

本期的Java AQS介绍到这,我是shysh95,关注+赞你就是最靓的仔,我们下期再见!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员修炼笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档