前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JUC-ReentrantLock

JUC-ReentrantLock

作者头像
才疏学浅的木子
发布2023-10-17 08:33:01
1420
发布2023-10-17 08:33:01
举报
文章被收录于专栏:CSDN文章

AQS

学习ReentrantLock就不得不知道AQS,因为ReentrantLock就是基于了AQS对象的

特点
  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
  • getState - 获取 state 状态
  • setState - 设置 state 状态
  • compareAndSetState - cas 机制设置 state 状态
  • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

ReentrantLock

相对于 synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待 ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒

原理

非公平锁实现原理

加锁流程

先看构造器,默认为非公平锁

代码语言:javascript
复制
    public ReentrantLock() {
        sync = new NonfairSync();
    }

没有竞争时

在这里插入图片描述
在这里插入图片描述

第一个竞争出现时

在这里插入图片描述
在这里插入图片描述

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列

图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态 Node 的创建是懒惰的 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

在这里插入图片描述
在这里插入图片描述

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
在这里插入图片描述
在这里插入图片描述
  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
  3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
在这里插入图片描述
在这里插入图片描述

再次有多个线程经历上述过程竞争失败,变成这个样子

在这里插入图片描述
在这里插入图片描述

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0
在这里插入图片描述
在这里插入图片描述

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程

在这里插入图片描述
在这里插入图片描述

如果加锁成功(没有竞争),会设置 exclusiveOwnerThread 为 Thread-1,state = 1 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

在这里插入图片描述
在这里插入图片描述

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
代码语言:javascript
复制
 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        // 加锁实现
        final void lock() {
        	// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            	//如果失败
                acquire(1);
        }
代码语言:javascript
复制
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && //进入tryAcquire
        	//当tryAcquire返回false时候,先调用addWaiter,再调用acquireQueued
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire中的逻辑

代码语言:javascript
复制
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

 		final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
			
			// 如果还没有获得锁
            if (c == 0) {
				//这里直接cas体现了非公平不去检查AQS
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果已经获得锁并且还是自己那么发送了锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //获取失败
            return false;
        }

addWaiter逻辑

代码语言:javascript
复制
    private Node addWaiter(Node mode) {
        //将当前线程关联到一个Node对象,模式为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        //如果tail不为空cas尝试将Node对象加入AQS队列尾部
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //队列为空
        enq(node);
        return node;
    }
	
	private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
            	//还没有设置head 为哨兵节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	//cas将Node加入AQS尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued中的流程

代码语言:javascript
复制
	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//判断这个节点的上一个节点
                final Node p = node.predecessor();
				//上一个节点是head,去尝试一次
                if (p == head && tryAcquire(arg)) {
                    //获取成功设置自己为head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //返回中断标记
                    return interrupted;
                }
                //尝试失败 / 上一个节点不是head
                if (shouldParkAfterFailedAcquire(p, node) &&  // 判断是否需要等待
                    parkAndCheckInterrupt()) 
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //上一个节点的状态
        int ws = pred.waitStatus;
        //上一个节点在阻塞
        if (ws == Node.SIGNAL)
            return true;
        
        //表示取消状态
        if (ws > 0) {
            //重构前面所有取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
	        // 需要设置上一个节点的值为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

注意:是否需要unpark是由当前节点的前驱节点的waitStatus == Node.SIGNAL来决定,而不是本节点的waitStatus

解锁源码
代码语言:javascript
复制
    public void unlock() {
        sync.release(1);
    }
    
	public final boolean release(int arg) {
		//尝试释放锁
        if (tryRelease(arg)) {
			//队头节点
            Node h = head;
           
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease

代码语言:javascript
复制
        protected final boolean tryRelease(int releases) {
        	//state减去释放的
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
			//如果state为01则设置owner为null
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

unparkSuccessor

代码语言:javascript
复制
private void unparkSuccessor(Node node) {
		
		
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 不考虑已取消的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从队尾先前找需要unpark的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        	//启动线程
            LockSupport.unpark(s.thread);
    }
可重入原理
代码语言:javascript
复制
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            
			// 原理
			
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
可打断原理
代码语言:javascript
复制
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
					// 直接抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
公平锁实现原理
代码语言:javascript
复制
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            	//判断是否有前驱节点没有才去竞争
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
条件变量实现原理

每个条件变量对应一个等待队列,其实先类是ConditionObject

await

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

在这里插入图片描述
在这里插入图片描述

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

在这里插入图片描述
在这里插入图片描述

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

在这里插入图片描述
在这里插入图片描述

park 阻塞 Thread-0

在这里插入图片描述
在这里插入图片描述
signal 流程

假设 Thread-1 要来唤醒 Thread-0

在这里插入图片描述
在这里插入图片描述

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

在这里插入图片描述
在这里插入图片描述

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1

在这里插入图片描述
在这里插入图片描述
addConditionWaiter
代码语言:javascript
复制
private Node addConditionWaiter() {
            Node t = lastWaiter;
            
            if (t != null && t.waitStatus != Node.CONDITION) {
            	//从队列中删除
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //创建一个节点放入
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
doSignal
代码语言:javascript
复制
		//将没取消的第一个节点转移到AQS队列
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&  //将队列中的Node转移到AQS队列,不成功就继续循环
                     (first = firstWaiter) != null);
        }
代码语言:javascript
复制
final boolean transferForSignal(Node node) {
		  
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        
        //加入AQS队列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
		
		
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            
            LockSupport.unpark(node.thread);
            
        return true;
    }
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-05-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AQS
    • 特点
    • ReentrantLock
      • 可重入
        • 条件变量
        • 原理
          • 非公平锁实现原理
            • 加锁源码
            • 解锁源码
          • 可重入原理
            • 可打断原理
              • 公平锁实现原理
                • 条件变量实现原理
                  • await
                  • signal 流程
                  • addConditionWaiter
                  • doSignal
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档