前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程(四)-AQS图解源码解析

并发编程(四)-AQS图解源码解析

作者头像
小土豆Yuki
发布2021-10-14 16:53:38
2500
发布2021-10-14 16:53:38
举报
文章被收录于专栏:洁癖是一只狗

AQS能干什么

抢占资源的线程直接执行处理业务,但是没有抢到的资源的进入就如排队等待机制,抢占失败的资源继续等待,但是等待线程仍然能保持获取锁的可能.

其中排序等待队列,如果共享资源占用,就需要一定的阻塞等待唤醒机制来保证锁分配,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现,他将请求线程封装成一个Node节点,通过CAS,自旋,LockSupport.park的方式维护state变量的状态.

源码解析AQS

我们拿ReentratLock当做入口,ReentratLock有分为公平锁和非公平锁,默认是非公平锁,看看其中其中lock底层的源码

其中公平锁和非公平锁获取锁的唯一区别就是红色部分,公平锁多了一个限制条件,hasQueuedPredecessots(),这个是公平锁加锁时候判断等待队列是否存放有效的节点方法

代码语言:javascript
复制
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

我们按照非公平锁进行讲解AQS流程

初始阶段,队列为空,state=0表示没有人占用资源

线程A处理业务,此时state更新为1,表示持有锁

代码语言:javascript
复制
final void lock() {
   //线程A进入这个分支,设置state=1,
    if (compareAndSetState(0, 1))
        //然后设置处理的线程为线程A
         setExclusiveOwnerThread(Thread.currentThread());
   else
        acquire(1); 
}

线程B开始执行,发现此时state=1,所以就会进入acquire,发现有人在持有锁,且不是自己

代码语言:javascript
复制
 //线程B进入acquire
 public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
 }

//线程B尝试获取锁
protected final boolean tryAcquire(int acquires) {
     return nonfairTryAcquire(acquires);
}
//线程B尝试获取锁具体逻辑
final boolean nonfairTryAcquire(int acquires) {
    //此时的线程是线程B
    final Thread current = Thread.currentThread();
    //此时的state=1
    int c = getState();
   // 不满足不进入
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
   //判断处理业务的线程是否是线程B
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
//线程B最后返回false
    return false;
}

线程B在进入addWaiter方法,先建立默认的哨兵节点,头节点和尾节点都指向哨兵节点

代码语言:javascript
复制
//线程B进入这里
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
   //此时的pred节点为null,不符合
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//线程B进入这里
    enq(node);
    return node;
}

private Node enq(final Node node) {
//自旋循环处理
    for (;;) {
        Node t = tail;
       //节点是空,进入这里
        if (t == null) { // Must initialize
            //这里建立哨兵节点,设置为头节点
             if (compareAndSetHead(new Node()))
               //尾节点也指向指向头节点也就是哨兵节点 
               tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

然后在第二次循环处理,线程B节点前节点指哨兵节点,队列的尾节点指向线程B节点,哨兵节点下一个节点为线程B节点

代码语言:javascript
复制
private Node enq(final Node node) {
//自旋循环处理
    for (;;) {
        Node t = tail;
       //节点是空,进入这里
        if (t == null) { // Must initialize
            //这里建立哨兵节点,设置为头节点
             if (compareAndSetHead(new Node()))
               //尾节点也指向指向头节点也就是哨兵节点 
               tail = head;
        } else {//第二次循环进入这里
            //线程B节点指向哨兵节点
            node.prev = t;
           //队列的尾节点指向线程B节点
            if (compareAndSetTail(t, node)) {
               //哨兵节点下一个节点指向线程B节点
                t.next = node;
                return t;
            }
        }
    }
}

此时线程C,和线程B一样不能抢到资源,进入addWaiter,线程C的前节点指向线程B,然后队列尾节点指向线程C节点,线程B节点下一个节点指向线程C节点

代码语言:javascript
复制
//线程C进入这里
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
   //此时这个tail节点就是上一个线程B节点
    Node pred = tail;
   //此时pred不为null,即线程B节点
    if (pred != null) {
        //线程C节点的前一个节点指向pred即线程B节点
        node.prev = pred;
       //队列尾节点指向线程C
        if (compareAndSetTail(pred, node)) {
            //线程B节点下一个节点指向线程C
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

当线程B和线程C进入队列中后,然后进入acquireQueued,比如此时是线程B进入这里,这里第一次循环处理的时候,只会把前驱节点waitStatus设置成-1

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
       //循环处理线程B
        for (;;) {
            //这里获取线程B的上一个节点即哨兵节点
            final Node p = node.predecessor();
            //哨兵节点即头结点,此时线程B还想获取锁资源,但是此时资源还是被线程A占有
           //所以此时不符合
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
         // 第一次循环进入这里 
         //获取锁失败之后的线程进入这里
         //线程B进入这里判断哨兵节点的waitstatus,第一次进入更新为Node.Signal=-1
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

然后进入第二次处理,此时重点看shouldParkAfterFailedAcquire,和parkAndCheckInterrupt,最终线程都会阻塞,记住这里后面我们还会回来

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
       //循环处理线程B
        for (;;) {
            //这里获取线程B的上一个节点即哨兵节点
            final Node p = node.predecessor();
            //哨兵节点即头结点,此时线程B还想获取锁资源,但是此时资源还是被线程A占有
           //所以此时不符合
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
         // 第二次循环进入这里 
         //获取锁失败之后的线程进入这里
         //线程B进入这里判断哨兵节点的waitstatus,然后返回true
            if (shouldParkAfterFailedAcquire(p, node) &&
               //然后第二次线程B进入这里
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//
private final boolean parkAndCheckInterrupt() {
        线程B进入阻塞,后面的其他的线程也都会进入这里进行阻塞
        LockSupport.park(this);
        return Thread.interrupted();
}

此时线程A已经执行完成了,开始释放锁了,调用release,开始释放资源,设置state=0,持有线程为null

代码语言:javascript
复制
//线程A释放锁
public final boolean release(int arg) {
    //尝试释放锁,这里释放成功,返回true
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
   //计算int c=0-1=0
    int c = getState() - releases;
   //此时当前线程是A,持有资源也是线程A 
   if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
 //此时c=0.符合进入
    if (c == 0) {
        free = true;
        //设置资源为的线程为null
        setExclusiveOwnerThread(null);
    }
  //设置资源的状态为0
    setState(c);
    return free;
}

release后面继续执行,然后唤醒正在阻塞的线程B,恢复哨兵节点的状态为0

代码语言:javascript
复制
//线程A释放锁
public final boolean release(int arg) {
    //尝试释放锁,这里释放成功,返回true
    if (tryRelease(arg)) {
        //这里返回哨兵节点
        Node h = head;
       //哨兵节点不为null,且waitStatus=-1,符合进入
        if (h != null && h.waitStatus != 0)
          //唤醒线程B,同时设置哨兵waitStatus=0
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
   //哨兵节点是waitStatus=-1
    int ws = node.waitStatus;
    if (ws < 0)
       //进入这里 哨兵节点又会变成waitStatus=0
        compareAndSetWaitStatus(node, ws, 0);

   // 这里获取到下一个节点就是线程B
    Node s = node.next;
  // 此时线程B的状态是-1不符合跳过
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
  // 进入这里,唤醒线程B,
    if (s != null)
        LockSupport.unpark(s.thread);
}

然后继续看看我们前面遗留的问题,就是此时还在阻塞的线程B和C,我们看看线程B,继续执行之前的循环逻辑,让线程B出队列持有资源,队列中的线程C也是同理最终获取到锁

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) 
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//线程B唤醒,继续循环
           //线程B获取哨兵节点
            final Node p = node.predecessor();
           //p就是头节点,线程B再次尝试获取锁,因为此时资源是无人占用,所以此时线程B获取到了锁
            if (p == head && tryAcquire(arg)) {
               // 头结点指向线程B节点,同时是指thread=null,prev=null
                setHead(node);
             //释放之前的哨兵节点,设置为null
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) 
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 洁癖是一只狗 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档