专栏首页后端技术学习ReentranLock源码学习

ReentranLock源码学习

首先回答一个问题?线程的三大特性?什么时候我们需要锁?java中已经提供了synchronized,为什么还要使用ReentrantLock?AQS原理。

线程的三大特性:原子性、可见性、有序性。也就是说满足这个三个特性的操作都是可以保证安全的,如Atomic包、volatile、通过happensBefore原则可以进行线程的安全的判断,这个依据通常是为了避免jvm指令重排。比如通常我们知道的配置信息,如果有多个线程去进行配置信息的修改,则需要进行上锁。或者多个线程修改一个变量时,此时就需要进行上锁了,或者读写分离时,可以考虑ReentrantReadWriteLock等。其本质是解决并行中的问题,将并行转成串行问题进行解决。那怎么上锁才有用呢?锁的状态大部分情况下是互斥的。当然也有特例:ReentrantReadWriteLock的读读是不会互斥的,其读写,写写实互斥的,当然可重入锁执行一个线程调用另外一个线程也不会互斥。之所以使用RenntranLock,是因为它适用于并发场景较为激烈的情况,同时其是经过优化了的。当然synchronized自JDK1.6之后也进行了优化,将其分为了偏向锁、轻量级锁、重量级锁。

同时ReentrantLock是基于AQS(AbstractQueuedSynchronizer)实现的,其目前也是唯一实现lock接口的可重入锁。其优点在于将锁进行细化,将锁分为两种锁,公平锁和非公平锁,也即独占锁与抢占锁。当进入公平锁时,是直接返回获取锁成功的,而没有获取锁时,首先会将其封装成node,放入到addWaiter中,进行阻塞,等待上一个线程完成,在进行请求,如果上一个线程完成了,则进行状态的waitStatus的变化,将其变成可执行状态,进行操作。再进行锁的获取。同时Condition采用await和singnal的方式,当然也是将其封装到队列中,进行唤醒队列。调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了 Condition 相关联的锁。调用 Condition 的 signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

1.AQS数据结构及变量

//Node数据结构
//FIFO的双向链表,每个数据结构都有两个指针,
//分别指向后继节点和前驱节点。
//每个node都是由线程封装的,当线程抢占锁失败
//后会封装成node加入到AQS队列中去,当获取锁的线程释放锁以后,会从
//队列中唤醒一个阻塞的节点(线程)
static final class Node {
//waitStatus的5种状态:CANCELLED=1、
//SIGNAL=-1、CONDITION=-2、
//PROPAGATE=-3、0:默认状态

//CANCELLED=1,结束状态,进入该状态后的节点将不会再变化
static final int CANCELLED =  1;
//SIGNAL=-1,只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
//一个线程通信工具类似于synchronized的wait/notify
//可以使某些线程一起等待某个条件(condition),
//只有满足条件时,线程才会被唤醒
//主要有两个值得关注的:await、signal
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//共享模式下,PROPAGATE状态的线程处于可运行状态
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev; //前驱节点
volatile Node next;  //后驱节点
volatile Thread thread;  //当前线程
Node nextWaiter; //存储在condition队列中的后继节点
  //是否为共享锁
  final boolean isShared() {
       return nextWaiter == SHARED;
   }
   final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marke
    }

    //addWaiter中的信息
    Node(Thread thread, Node mode) {     // Used by addWaite
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //通常condition中包含的信息
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

//头结点
private transient volatile Node head;
//尾节点
private transient volatile Node tail;
//CAS中的属性,取0或者大于0,其中0表示无锁状态,
//>0表示已经有线程获得锁,state可以递增,也即重入的次数
private volatile int state;

2.方法

获取锁

static final class NonfairSync extends Sync {
 
    //锁分为公平锁fairSync和非公平锁NonFairSync,我们可以
    //知道synchronized是公平锁,也就是fairSync
    //而公平锁是独占锁,因此可以知道synchronized是独占锁
    //而非公平锁为抢占锁。不管有没有线程排队,上来cas去抢占一下锁
    //cas成功,则表示成功获取锁,进行成功返回
    //否者cas失败,调用acquire(1)走锁竞争逻辑
    //其中cas调用底层的unsafe.compareAndSwapInt(this,stateOffset, expect, update);
    //进行更新操作,同时由于操作是原子性操作,因此不会出现线程安全问题
    //state=0,表示无锁状态
    //state>0时,也就是为1时,说明有线程获得了锁。
    //由于ReentrantLock允许重入,因此同一个线程多次获取同步锁的时候,state会递增,比如重入5次
    //namstate为5,同时需要释放5次,其他线程才可以获取锁。
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //进入非公平锁逻辑,重点关注
            acquire(1);
    }

    //尝试获取锁,如果成功返回true,
    // 不成功返回false,它是重写AQS队列
    //类中的tryAcquire方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

//执行cas操作,调用unsafe下的compareAndSwapInt:当前的值,偏移量、期望值、更新值
//同时注意偏移量是2的次幂
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);


//进入非公平所,抢占锁逻辑
//传入1是为了通过tryAcquire获取抢占锁,
//如果成功返回true,否则返回false
//如果tryAcquire失败,则会通过
//addWaiter方法将当前线程封装成Node添加 到AQS队列队尾
//acquireQueued,将node作为参数,通过自旋去尝试获取锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

  //通过addWriter方法会把线程添加到链表中,
    //接着会把node作为参数传递给acquireQueued方法,去竞争锁
    //将node作为参数,通过自旋去尝试获取锁
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //不进行中断
            boolean interrupted = false;
            //进行自旋
            for (;;) {
                //获取当前节点的prev节点
                final Node p = node.predecessor();
                //如果是head节点,说明有资格去争抢锁
                if (p == head && tryAcquire(arg)) {
                    //获取锁成功,说明前一个线程已经释放锁,
                    //然后设置head为当前线程执行权限
                    setHead(node);
                    //把原来head节点从链表中移除
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //前一个线程还没有释放锁,
                // 使得当前线程在执行tryAcquire时返回false
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //通过 cancelAcquire 取消获得锁的操作
            if (failed)
                cancelAcquire(node);
        }
    }

 final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

 //如果前一个线程还没有释放,此时当前线程和下一个线程都会来争抢锁会失败
    //那么失败以后会调用shouldParkAfterFailedAcquire方法
    //node中waitStatus有5种状态CANCELLED
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //拿到前置节点的等待状态
        int ws = pred.waitStatus;
        //如果状态等于SIGNAL,
        // 只要前置节点释放锁,
        //就会通知标识为SIGNAL状态的后续节点的线程
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
         //如果ws>0,则说明处于CANCELLED(1)的状态,说明CANCELLLED
        //在同步队列中等待的线程等待超时或被中断,
        //需要从同步队列中取消该Node的节点,处于结束状态
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do { //采用循环从队列中移除CANCELLED的节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {  //否者只有两种状态默认状态0或者PROPAGATE
            //也即初始化状态或者处于可执行状态,利用 cas
            // 设置 prev 节点的状态为 SIGNAL(-1)
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


private final boolean parkAndCheckInterrupt() {
        //挂起当前线程变成WATING状态
        //park方法等待许可,unpark方法为线程提供许可
        LockSupport.park(this);
        //返回当前线程是否被其它线程触发
        // 过中断请求,如果有触发过中断请求,
        // 则返回当前的中断标识true
        //并且对中断标识进行复位标识已经响应过了中断请求
        //如果返回true,则意味着在acquire方法中会执行selfInterrput()
        //因为线程在调用acquireQueued方法的时候是不会响应中断请求的
        return Thread.interrupted();
    }

   public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();//获取当前节点的 prev 节点
      if (p == head && tryAcquire(arg)) {//如果是 head 节点,说明有资格去争抢锁
        setHead(node);//获取锁成功,也就是ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获得执行权限
        p.next = null; //把原 head 节点从链表中移除
        failed = false;
        return interrupted;
      }
      //ThreadA 可能还没释放锁,使得 ThreadB 在执行 tryAcquire 时会返回 false
      if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
        interrupted = true; //并且返回当前线程在等待过程中有没有中断过。
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

释放锁

public void unlock() {
    sync.release(1);
}

 //进行unlock会调用release方法释放锁
    public final boolean release(int arg) {
        //如果释放锁成功
        if (tryRelease(arg)) {
            //拿到AQS的head节点
            Node h = head;
            //如果头结点不为空,同时等待状态不为0
            //调用unparkSUcessor方法唤醒后续节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

 //唤醒节点的后继节点(如果存在)。
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //获取head节点的状态
        int ws = node.waitStatus;
        //如果等待状态<0,则设置head节点的状态为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //得到head节点的下一个节点
        Node s = node.next;
        //如果下一个节点为null或者status>0表示canacelled状态
        //通过从尾部节点开始扫描,找到距离head最近的一个waitStatus<=0的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //next节点不为空,直接唤醒这个线程即可
        if (s != null)
            LockSupport.unpark(s.thread);
    }

private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

Condition

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁

释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程

await:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    Node node = addConditionWaiter(); //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
    int savedState = fullyRelease(node); //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
    int interruptMode = 0;
    //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
    while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
      LockSupport.park(this); // 第一次总是 park 自己,开始阻塞等待
      // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上.
      // isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了,就继续阻塞.      // isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.
      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
    }
    // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
    // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
    // 将这个变量设置成 REINTERRUPT.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
    // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
    // 如果是 null ,就没有什么好清理的了.
    if (node.nextWaiter != null) // clean up if cancelled
      unlinkCancelledWaiters();
    // 如果线程被中断了,需要抛出异常.或者什么都不做
    if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);
}

signal

public final void signal() {
  if (!isHeldExclusively()) //先判断当前线程是否获得了锁
    throw new IllegalMonitorStateException();
  Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
  if (first != null)
    doSignal(first);
}

private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)// 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null.
      lastWaiter = null; // 将 next 节点设置成 null
      first.nextWaiter = null;
  } while (!transferForSignal(first) && (first = firstWaiter) != null);
  }

final boolean transferForSignal(Node node) {
  /*
  * If cannot change waitStatus, the node has been cancelled.
  */
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  Node p = enq(node);
  int ws = p.waitStatus;
  // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞),
  if (ws > 0 || !compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread); // 唤醒输入节点上的线程.
  return true;
}

final boolean transferForSignal(Node node) {
  /*
  * If cannot change waitStatus, the node has been cancelled.*/
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  Node p = enq(node);
  int ws = p.waitStatus;
  // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞),
  if (ws > 0 || !compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread); // 唤醒输入节点上的线程.
  return true;
} 

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ThreadPoolExecutor源码学习

    但点进去看newSingleThreadExecutor可以看到其会调用ThreadPoolExecutor里面的线程。因此有必要研究ThreadPoolExe...

    路行的亚洲
  • Netty学习一

    前面我们已经学习了NIO的简单知识,三大组件:ByteBuffer、Channel、Selector。知道ByteBufffer是数据,而Channel是数据的...

    路行的亚洲
  • pmq学习五-pmq启动学习

    前面我们看到了pmq从端到端的调用,但是没有看到的还有很多细节的东西。比如我们看到在学习RocketMQ时,可以看到很多启动都是在broker中启动,那pmq中...

    路行的亚洲
  • 一篇文章让你掌握 AQS 核心知识

    感谢原作者,本文转载自 https://my.oschina.net/xiongying0214/blog/1944627

    好好学java
  • 分门别类总结Java中的各种锁

    非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。

    李红
  • Java源码阅读之ReentrantLock - lock和unLock方法

    如果需要使用或者了解ReentrantLock,证明已经步入并发编程领域了,这里理论基础不多提,需要的自行查阅资料。

    格子Lin
  • 多线程基础(十四):AbstractQueuedSynchronizer源码分析

    类AbstractQueuedSynchronizer是java并发包中的核心,是实现大部分并发工具类的底层工具类,现在对这个类的源码进行分析。

    冬天里的懒猫
  • ReentrantLock 源码分析以及 AQS (一)

    JDK1.5 之后发布了JUC(java.util.concurrent),用于解决多线程并发问题。AQS 是一个特别重要的同步框架,很多同步类都借助于 AQS...

    烟雨星空
  • ReentrantLock

    ReentrantLock是 java提供代码层面的锁,和synchronized关键字相同。为什么在用提供了 synchronized关键字后,还提供了Ree...

    虞大大
  • ReentrantLock 源码解析(JDK1.8)

    java404

扫码关注云+社区

领取腾讯云代金券