前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ReentrantLock

ReentrantLock

原创
作者头像
hhss
修改2021-02-14 15:32:31
3920
修改2021-02-14 15:32:31
举报

https://zhuanlan.zhihu.com/p/249147493

ReentrantLock底层使用了CAS+AQS队列实现

一、CAS

1、CAS(Compare and Swap)

CAS是一种无锁算法。有3个操作数:内存值V、旧的预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

代码语言:javascript
复制
do{
  备份旧数据;
  基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))

该操作是一个原子操作,被广泛的应用在Java的底层实现中。

在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现。

2、CAS的开销

CAS速度非常快:

  1. CAS是CPU指令级的操作,只有一步原子操作;
  2. CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了

CAS仍然可能有消耗:可能出现cache miss的情况,会有更大的cpu时间消耗。

二、AQS

AQS是一个用于构建锁和同步容器的框架。

AQS使用一个FIFO的队列(也叫CLH队列,是CLH锁的一种变形),表示排队等待锁的线程。队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。结构如下图所示:

三、ReentrantLock的流程

首先ReentrantLock默认是非公平锁,也可以指定为公平锁

ReentrantLock的2个构造函数

代码语言:javascript
复制
public ReentrantLock() {
    sync = new NonfairSync(); //默认,非公平
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync(); //根据参数创建
}

①ReentrantLock先通过CAS尝试获取锁,

②如果此时锁已经被占用,该线程加入AQS队列并wait();

③当前驱线程的锁被释放,挂在CLH队列为首的线程就会被notify(),然后继续CAS尝试获取锁,

此时:

非公平锁,如果有其他线程尝试lock(),有可能被其他刚好申请锁的线程抢占

公平锁,只有在CLH队列头的线程才可以获取锁,新来的线程只能插入到队尾。

四、 lock() 和 unlock() 的实现

4.1 lock()函数

ReentrantLock中的公平锁和非公平锁的实现,其中tryAcquire是公平锁和非公平锁实现的区别,下面的两种类型的锁的tryAcquire的实现,从中我们可以看出在公平锁中,每一次的tryAcquire都会检查CLH队列中是否仍有前驱的元素,如果仍然有那么继续等待,通过这种方式来保证先来先服务的原则;而非公平锁,首先是检查并设置锁的状态,这种方式会出现即使队列中有等待的线程,但是新的线程仍然会与排队线程中的对头线程竞争(但是排队的线程是先来先服务的),所以新的线程可能会抢占已经在排队的线程的锁,这样就无法保证先来先服务,但是已经等待的线程们是仍然保证先来先服务的,所以总结一下公平锁和非公平锁的区别:

1、公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。 2、非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁

4.1.1 非公平锁:

直接调用CAS。如果成功通过CAS修改了state,指定当前线程为该锁的独占线程,标志自己成功获取锁。

如果CAS失败的话,调用acquire();

代码语言:javascript
复制
    final void lock() { //非公平锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    final void lock() { //公平锁
            acquire(1);
    }

4.1.2 公平锁:

调用acquire()函数:

首先,调用tryAcquire(),会尝试再次通过CAS修改state为1。

如果失败而且发现锁是被当前线程占用的,就执行重入(state++);

如果锁是被其他线程占有,那么当前线程执行tryAcquire返回失败,并且执行addWaiter()进入等待队列,并挂起自己interrupt()。

代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

4.1.3 两者区别

这里需要注意,公平锁和非公平锁的区别所在:调用lock()函数的时候,

非公平直接采用cas去获取锁,如果失败再去调用acquire(1),acquire(1)再调用tryAcquire();

公平锁则会先调用acquire(1),acquire(1)再调用tryAcquire()。

公平锁的tryAcquire

代码语言:javascript
复制
 /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // !hasQueuedPredecessors()保证了不论是新的线程还是已经排队的线程都顺序使用锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

非公平锁的

代码语言:javascript
复制
/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 新的线程可能抢占已经排队的线程的锁的使用权
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

4.2 tryAcquire()

检查state字段?(state字段看做是每一个ReentrantLock的状态,若为0则不自由(被线程持有);否则自由)

若为0,表示锁未被占用。然后通过 !hasQueuedPredecessors()先判断自己是不是队头的节点(或者说判断自己是否需要排队)?如果不需要排队,再调用compareAndSetState(0, acquires)来通过cas去获取锁;

若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数

如果以上两点都没有成功,则获取锁失败,返回false。

代码语言:javascript
复制
 protected final boolean tryAcquire(int acquires) { //注意:这是公平的tryAcquire()
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //需要先判断自己是不是队头的节点
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

final boolean tryAcquire(int acquires) { //注意:这是非公平的tryAcquire()
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取state变量值
    int c = getState();
    if (c == 0) { //没有线程占用锁
        //没有 !hasQueuedPredecessors() 判断,不考虑自己是不是在队头,直接申请锁
        if (compareAndSetState(0, acquires)) {
            //占用锁成功,设置独占线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { //当前线程已经占用该锁
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新state值为新的重入次数
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;

这里需要注意,公平锁和非公平锁的区别:

虽然他们的acquire(1)都调用tryAcquire(),但是公平锁的tryAcquire()与非公平锁的tryAcquire()不同,非公平锁没有 !hasQueuedPredecessors() 判断,不考虑自己是不是在队头,直接申请锁。

4.3 addWaiter()

当前线程加入AQS双向链表队列。

写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾;

如果队列为空(尾结点为null)或者出现并发写入失败的话,就需要调用 enq(node) 来写入了。

代码语言:javascript
复制
/**
 * 将新节点和当前线程关联并且入队列
 * @param mode 独占/共享
 * @return 新节点
 */
private Node addWaiter(Node mode) {
    //初始化节点,设置关联线程和模式(独占 or 共享)
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点引用
    Node pred = tail;
    // 尾节点不为空,说明队列已经初始化过
    if (pred != null) {
        node.prev = pred;
        // 设置新节点为尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq()的处理逻辑就相当于自旋加上CAS保证一定能写入队列。

4.4 acquireQueued()

写入队列后,需要挂起当前线程

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。

如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():

代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

他是利用 LockSupportpart 方法来挂起当前线程的,直到被唤醒。

4.5 unlock()

释放时候,state--,通过state==0判断锁是否完全被释放。

成功释放锁的话,唤起一个被挂起的线程

代码语言:javascript
复制
public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	   //唤醒被挂起的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

五、总结

  1. 每一个ReentrantLock自身维护一个AQS队列记录申请锁的线程信息;
  2. 通过大量CAS保证多个线程竞争锁的时候的并发安全;
  3. 可重入的功能是通过维护state变量来记录重入次数实现的。
  4. 公平锁需要维护队列,通过AQS队列的先后顺序获取锁,缺点是会造成大量线程上下文切换;
  5. 非公平锁可以直接抢占,所以效率更高;

六、AQS(AbstractQueuedSynchronizer)源码

参考:https://blog.csdn.net/java_lyvee/article/details/98966684

队列存储内容:Node节点

代码语言:javascript
复制
static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        
        volatile int waitStatus;

        
        volatile Node prev;

        
        volatile Node next;

       
        volatile Thread thread;

       
        Node nextWaiter;
}

交替执行:单个线程在获取锁--其实和队列无关,因为通过jdk级别(cas)就可以解决同步问题

竞争执行:多个线程再竞争同一把锁--需要用到aqs队列。

说到AQS联想到:{

自旋

park与unpark

cas

}.

t1占用锁还未释放后,t2的竞争流程:

注意点:AQS的对头head只想的节点Node,其属性Thread一定为空。

调用addWaiter方法

维护链表时,初始化时先初始化一个Thread属性为空的Node,设置为AQS的头和尾部;

然后通过自旋+CAS将t2插入队尾,修改tail指向为t2。

然后调用acquireQueued方法。

首先判断自己的上一个节点是不是头部节点,如果是再调用tryAcquire去尝试获取锁。

如果获取成功就返回并设置当前线程为持有锁的线程。

如果失败,就调用shouldParkAfterFailedAcquire(p, node)方法,前一个节点的waitStatus设为-1,并且返回false;

然后循环调用,再次自旋获取一个锁,然后继续调用shouldParkAfterFailedAcquire(p, node)方法,如果前一个节点为-1则返回true。(这里需要注意的是,在入队过程中还是会自旋两次去拿锁,分别是addWaiter和acquireQueued中的自旋获取锁

然后调用parkAndCheckInterrupt方法。

利用 LockSupportpart 方法来挂起当前线程的,直到被唤醒。

扩充:reentrantlock.lockInterruptbly()

这个锁方法,比原来的lock方法多了一个用处,就是当t1拿到锁微释放时,t2在队列等待挂起,在主线程中可以使用t2.interrupt()方法去打断t2,不让其一直挂起,做出响应。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、CAS
    • 1、CAS(Compare and Swap)
      • 2、CAS的开销
      • 二、AQS
      • 三、ReentrantLock的流程
      • 四、 lock() 和 unlock() 的实现
        • 4.1 lock()函数
          • 4.1.1 非公平锁:
          • 4.1.2 公平锁:
          • 4.1.3 两者区别
        • 4.2 tryAcquire()
          • 4.3 addWaiter()
            • 4.4 acquireQueued()
              • 4.5 unlock()
              • 五、总结
              • 六、AQS(AbstractQueuedSynchronizer)源码
              相关产品与服务
              容器服务
              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档