专栏首页疾风先生基于AQS原理实现的锁

基于AQS原理实现的锁

点击上方疾风先生可以订阅哦

AQS基础概念与作用

  • AQS基础概念
    • AQS: 即抽象队列同步器,完整名称为AbstractQueuedSynchronizer
    • AQS之shared mode: 即共享锁/读锁,用于线程读取加锁,不能进行写操作,可以读读共享
    • AQS之exclusive mode: 即独占锁/排他锁/写锁,用于线程原子写操作时加锁,只能一个线程持有,其他线程处于等待状态
    • AQS不同mode的线程共享相同的等待队列wait queue,也就是在同一个阻塞队列中,线程持有的mode可能会不同
    • state属性: 作为AQS的同步状态信息属性,state具备线程安全特性(valatile & CAS分别保证可见性和原子性)
  • AQS主要作用
    • 提供一个基于FIFO等待队列的阻塞锁和相关同步器的模板框架
    • 对于阻塞锁和同步器的实现子类,必须定义一个非对外访问的helper class来继承AQS,利用AQS中受保护的方法来为阻塞锁和同步器对外暴露的方法提供服务
    • 继承AQS的同步器子类将通过模板框架提供的CAS操作state方式来保证原子性,以及volatile修饰保证可见性,这样能够实时知道当前对象获取锁或者释放锁所处的状态信息
    • 一般情况下,子类只会实现上述两种mode之一,但是对于ReadWriteLock具备上述两种mode,这也就是ReadWriteLock具备读写锁的特征
    • AQS内部定义一个实现Condition接口的实现内部类ConditionObject,主要作用是结合独占模式下的方法一同使用,也就是说在并发线程持有相同的独占锁情况下,独占资源下的方法可以结合Condition下的唤醒与挂起线程的方式完成线程之间的通信(独占资源方法有, 比如isHeldExclusively判断当前线程是否持有独占锁, release释放独占锁, acquire获取独占锁)

AQS核心组件

  • AQS类图明细
  • AQS的使用,也就是子类继承AQS之后,根据需要重写AQS中定义的protected方法,从上述类图可以看出:
    • tryAcquire: 获取独占锁
    • tryRelease: 释放独占锁
    • isHeldExclusively: 判断当前线程是否持有独占锁
    • tryAcquireShared: 获取共享锁
    • tryReleaseShared: 释放共享锁
    • 操作和获取属性state的getState()以及CAS修改state的方法
    • 操作线程独占方法get&set方法
    • 如果锁需要Condition,则可以直接使用ConditionObject来实现对应的分类锁逻辑实现线程通信(可选)
  • Node: 自定义双端链表,实现同步队列等待池,其包含的核心要素有
    • 指向前一个节点Node的prev
    • 指向后一个节点Node的next
    • 节点所处状态信息,即waiter status
    • 指向下一个处于condition的等待队列的节点nextWaiter
    • 保存当前执行的线程实例
// Node 部分代码,在AQS内部中定义
static final class Node{
    // 锁的mode
static final Node SHARED = new Node(); 	// 表示持有共享锁
static final Node EXCLUSIVE = null;		// 持有独占锁
	
// waiter status,即节点所处的阻塞状态列表如下
static final int CANCELLED =  1;		// 被取消,意味着放弃竞争锁资源,移出阻塞队列
static final int SIGNAL    = -1;		// 持有锁状态
static final int CONDITION = -2;		// 仅用于条件变量,线程处于条件等待队列中,也就是condition.await让线程挂起
static final int PROPAGATE = -3;		// 仅用于共享锁,表示已经释放锁并已唤醒下一个阻塞节点的线程状态
	
volatile int waitStatus;	// 当前节点的状态,初始化为0,不属于上述任何一种状态,属于可竞争获取锁的状态
	
// 实现双端链表
volatile Node prev;
volatile Node next;
	
// 当前节点的线程
volatile Thread thread;
	
// 标志当前节点是共享锁还是独占锁,用节点指针引用指向对应的mode
	Node nextWaiter;
	
// 独占锁:nextWaiter = null & thread = Thread.currentThread;
// 共享锁:nextWaiter = SHARED & thread = Thread.currentThread;
// 不具备上述条件属于正常对象,不持有锁状态
	 Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
     }

     Node(Thread thread, int waitStatus) { // Used by Condition
         this.waitStatus = waitStatus;
         this.thread = thread;
     }
}
  • ConditionObject: 自定义接口Condition的实现,业务线程可以创建对应的锁条件来完成线程之间的通信.即线程唤醒与等待,其核心要素如下:
    • 自定义条件队列,即使拥有属性firstWaiter与lastWaiter
    • 实现接口await/signal/signalAll等方法
    • ConditionObject内部实现的核心方法
// ConditionObject部分代码,在AQS内部中定义
public class ConditionObject implements Condition,Serializable {
// 定义队列的首尾节点,内部自定义队列实现wait queue的操作
    private transient Node firstWaiter;
    private transient Node lastWaiter;
	
    private static final int REINTERRUPT =  1;		//  AQS内部调用Thread.interrupt()中断当前线程
    private static final int THROW_IE    = -1;		// 直接向当前线程抛出中断异常
	
// implements Condition methods ...
// 调用await,当前线程挂起
	await(){}	
// 调用signal,当前线程被唤醒
	signal(){}		
	signalAll(){}
// 挂起/唤醒超时方法 ...
	
    // 内部核心方法
    // 添加一个新的节点到当前条件阻塞队列中
    private Node addConditionWaiter() {
        // ...
    }
	
    // 对于已经cancelled的waiter移出队列
    private void unlinkCancelledWaiters() {
        // ...
    }

    // 唤醒持有相同的Condition的线程去争抢资源锁,获取到锁并通过CAS设置对应节点的waiter status为SIGNAL
    private void doSignalAll(Node first) {
    // ...
    }
}
  • AQS属性及其依赖
    • state,即同步状态,在对应的实现类表达含义不同,AQS中称为一个同步状态值信息
    • exclusiveOwnerThread,即当前线程持有独占锁,独占线程
    • Unsafe,即借助CAS技术来完成同步状态以及等待池队列节点的更新操作等
    • LockSupport工具类,借助LockSupport来完成线程的加锁与解锁操作
// AQS.java
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	
// 自定义实现双向FIFO的队列, 定义队列的head & tail
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;				// 当前AQS同步状态信息,具体实现类含义可能不一样
// CAS操作上述的state,head&tail, waitstuts
private static final Unsafe unsafe = Unsafe.getUnsafe();	
	
// 除了获取/释放锁之外,内部定义核心方法
private Node enq(final Node node) {}	// 	将Node插入队列
private Node addWaiter(Node mode) {}	//  创建想要持有mode的锁方式的Node并将其添加到阻塞队列中
	
// 内部实现的独占锁方式
final boolean acquireQueued(final Node node, int arg) {}	// 获取独占锁,失败则挂起
private void unparkSuccessor(Node node) {}	// 唤醒当前Node的下一个节点的线程
	
// 内部实现共享锁
private void doReleaseShared() {}			// 唤醒下一个节点的线程并通知其他节点的线程已经释放锁
private void doAcquireShared(int arg) {}	// 如果获取锁失败则挂起线程,成功则持有锁
}

AQS工作原理

  • AQS核心属性
    • state: AQS中表达为一个同步状态信息,具体实现类含义不一样,比如CountDownLatch的state表示计数器
    • exclusiveOwnerThread:独占线程对象
    • 双向队列等待池(Node head & Node tail): 基于内部类属性成员的链表head以及链表tail
  • AQS核心方法

独占锁资源接口

  • 模板方法: acquire/release
  • 具体实现方法: tryAcquire/tryRelease

共享锁资源接口

  • 模板方法: acquireShared/releaseShared
  • 具体实现方法: tryAcquireShared/tryReleaseShared

获取锁

  • acquire/acquireShared定义资源争抢锁逻辑,没有拿到则加入队列等待池中等待
  • tryAcquire/tryAcquireShared实际执行占用资源的操作,交由具体实现的AQS完成

释放锁

  • release/releaseShared定义释放资源的逻辑,释放后唤醒等待池下一个的节点线程
  • tryRelease/tryReleaseShared实际释放资源的操作,由具体实现的AQS完成

  • 共享锁加锁流程
  • 共享锁解锁流程
  • 独占锁加锁流程
  • 独占锁解锁流程
  • 根据上述流程总结如下
    • AQS加锁以及解锁的过程中是根据wait status来判断是否进行加锁和释放锁,wait status可理解为AQS中的“锁”,通过CAS更新wait status的状态来实现加锁和解锁,同时为了防止并发多线程再次修改,针对已经恢复正常的节点信息,通过PROPAGATE来控制,也就是拥有这个状态的wait status说明已经释放锁并唤醒其他线程争抢锁
    • AQS中通过链表的方式实现双向队列的操作,同时为了避免并发造成存储在链表中的节点顺序错乱,于是通过CAS的方式来设置链表的head和tail,保证修改的安全性
    • AQS中区分mode在于存储节点的nextWaiter的指向,在AQS中,共享锁和独占锁在加锁过程中,共享锁加锁后需要唤醒下一个mode为SHARED的阻塞节点线程,而独占锁不需要;在共享锁和独占锁解锁过程中,共享锁为了避免并发线程多次更新,于是通过PROPAGATE来控制,告知节点已经释放锁并通知其他线程
    • 可以看到,加锁和解锁可以通过走“捷径”的方式来完成,也就是对应的AQS实现子类的加锁和解锁处理逻辑
    • 加锁流程:当前节点先加入阻塞队列,CAS自旋获取锁,成功则删除当前节点(重置为head);解锁流程:就是从阻塞队列中获取下一个阻塞节点,并唤醒当前节点的线程

自定义AQS

  • 基于AQS原理核心要素有
    • 具备线程安全的双向阻塞队列
    • 具备线程安全的独占线程
    • 具备线程安全的状态state属性
  • 基于上述的组成部分自定义AQS伪代码实现
// DefineAQS.java
public abstract class DefineAQS {
final static class AQSNode{
        final static int SHARED = 9999;
        final static int EXCLUSIVE = -9999;

        private int mode;
        private volatile Thread thread;

        public AQSNode(int mode){
            this.thread = Thread.currentThread();
            this.mode = mode;
        }

        public Thread getThread() {
            return thread;
        }

        public int getMode() {
            return mode;
        }
    }

    private AtomicInteger state = null;
    private AtomicReference<Thread> exclusiveOwnerThread = new AtomicReference<>();
    private LinkedBlockingQueue<AQSNode> waiters = new LinkedBlockingQueue<>();


    public AtomicInteger getState() {
        return state;
    }

    public void setState(int state) {
        this.state = new AtomicInteger(state);
    }

    public void compareAndSetState(int expect, int update){
        this.state.compareAndSet(expect, update);
    }

    public void acquire(int arg){
        // 加入阻塞队列中
        AQSNode node = new AQSNode(AQSNode.EXCLUSIVE);
        waiters.offer(node);
        while (!tryAcquire(arg)){
            LockSupport.park(node.getThread());
        }
        // 当前线程已经获取到锁,移出阻塞队列,通知后续节点
        waiters.remove(node);
    }

    public void release(int arg){
        if (tryRelease(arg)) {
            while (true){
                AQSNode node = waiters.peek();
                if (node.getMode() == AQSNode.EXCLUSIVE){
                    LockSupport.unpark(node.getThread());
                    break;
                }
            }
        }
    }

    public void acquireShared(int arg){
        AQSNode node = new AQSNode(AQSNode.SHARED);
        waiters.offer(node);
        while (tryAcquireShared(arg) < 0){
            LockSupport.park(node.getThread());
        }
        waiters.remove(node);
    }

    public void releaseShared(int arg){
        if (tryReleaseShared(arg) > 0){
            while (true){
                AQSNode node = waiters.peek();
                if (node.getMode() == AQSNode.SHARED){
                    LockSupport.unpark(node.getThread());
                    break;
                }
            }
        }
    }
    // abstract method :

你好,我是疾风先生,先后从事外企和互联网大厂的java和python工作, 记录并分享个人技术栈,欢迎关注我的公众号,致力于做一个有深度,有广度,有故事的工程师,欢迎成长的路上有你陪伴,关注后回复greek可添加私人微信,欢迎技术互动和交流,谢谢!

本文分享自微信公众号 - 疾风先生(Gale2Writing),作者:疾风先生

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-02-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【深入AQS原理】我画了35张图就是为了让你深入 AQS

    谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知...

    一枝花算不算浪漫
  • JAVA-LOCK之底层实现原理(源码分析)

    首先和Synchronized(可以参考) 的不同之处,Lock完全用Java写成,在java这个层面是无关JVM实现的。其实现都依赖java.util.con...

    海涛
  • AbstractQueuedSynchronizer超详细原理解析

     今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供...

    remcarpediem
  • AQS原理分析

    Lock 在 J.U.C 中是最核心的组件,前面学习 synchronized 的时候说过,锁最重要的特性就是解决并发安全问题。为什么要以 Lock 作为切入点...

    会说话的丶猫
  • 高并发编程-CyclicBarrier深入解析

    CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法...

    JavaQ
  • 公平锁/非公平锁/可重入锁/自旋锁

    在JAVA中我们知道有很多加锁的方式,比如常见的 通过synchronized关键字,还有Lock,还有之前说原子CAS操作时有看到过的死循环方式的自旋锁。

    YanL
  • Java中的锁原理、锁优化、CAS、AQS详解!

    Java编程语言允许线程访问共享变量, 为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量。Java语言提供了volatile,在某些...

    良月柒

扫码关注云+社区

领取腾讯云代金券