前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ReentrantLock

ReentrantLock

作者头像
虞大大
发布2020-08-26 17:14:50
6240
发布2020-08-26 17:14:50
举报
文章被收录于专栏:码云大作战

ReentrantLock是 java提供代码层面的锁,和synchronized关键字相同。为什么在用提供了 synchronized关键字后,还提供了ReentrantLock等代码层面的锁API,首先在synchronized关键字刚推出是,性能方面差很多,直到后面的版本中推出了锁升级的概念,在性能上有所好转。更重要的也是,JUC的包里面,提供的API更加灵活,符合生产环境各种需求。

分析 ReentrantLock 有如下几个特点: 1、互斥 2、重入 3、等待唤醒 4、存储一系列的等待线程 FIFO 先进先出 5、公平/非公平

  • 使用案例
代码语言:javascript
复制
public class ReenTrantLockDemo extends Thread{

    //模拟优惠卷
    private static List<Integer> array = new ArrayList<>();

    private static Lock lock = new ReentrantLock();

    public Integer get(){
        lock.lock();
        try {
            Integer o=  array.get(0);
            array.remove(o);
            return o;
        }catch (Exception e){
            System.out.println("获取出错");
        }finally {
            lock.unlock();
        }
        return -1;
    }


    @Override
    public void run() {
       Integer a =  get();
        System.out.println("获取到的优惠卷编号为"+a);
    }

    public static void main(String[] args) {

        for(int i=0;i<10;i++){
            array.add(i);
        }
        ReenTrantLockDemo reenTrantLockDemo = new ReenTrantLockDemo();
        ExecutorService service = Executors.newCachedThreadPool();
        ReenTrantLockDemo demo = new ReenTrantLockDemo();
        for (int i = 0; i < 10; i++) {
            service.submit(demo);
        }
        service.shutdown();

    }
}

结果为

代码语言:javascript
复制
获取到的优惠卷编号为3
获取到的优惠卷编号为2
获取到的优惠卷编号为6
获取到的优惠卷编号为5
获取到的优惠卷编号为1
获取到的优惠卷编号为0
获取到的优惠卷编号为4
获取到的优惠卷编号为8
获取到的优惠卷编号为7
获取到的优惠卷编号为9

获取没有出错,每一个都有获取到正确的编号,不重复

本篇分析源码的思路为 构造函数 -> lock/unlock -> condition(await-signal) 尽量简化的方式来介绍

ReentrantLock 构造函数
代码语言:javascript
复制
// 不带参数的,默认创建非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
// 根据参数创建非公平和公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

这里的FairSync 是什么,看一下类关系图:

Sync 继承了AQS,抽象同步队列。在JUC的包里面,AQS可以说是一个爷爷级别的存在,会有很多类,依赖AQS去实现。这里有AQS中,有2个重要关注的点来看 1、java.util.concurrent.locks.AbstractQueuedSynchronizer#state 2、java.util.concurrent.locks.AbstractQueuedSynchronizer.Node 3、java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject

state

要完成互斥的功能,需要一个共同去抢占的值,这里采用了AQS中的state,当线程获取锁时,成功修改state值的线程,获得了锁,可以执行程序,此时也会记录当前线程,在此线程内,需要再次获得这个锁的时候,就避免了再次抢占的操作。故如下

  • 无线程持有时,state为0
  • 当有线程持有时,state增长1,此时其他线程无法持有直到被释放
  • 释放锁时,state减少1 当state恢复为0时,其他线程又可以进行抢占
  • 当某个已经占用锁的线程再次获取到锁时,state再增长,为重入锁 A -----> (+) state (-) ------> A B -----> (+) state (-) ------> B C -----> (+) state (-) ------> C

当然在修改state时,使用了cas修改,保证原子性

Node

当一个线程获取锁时,其他线程处于等待状态,需要这样的容器来存储这些数据,所有有了Node,一个Node代表一个线程(除头结点外) Node属性

  • prev 指向上一节点
  • next 指向下一节点
  • thread 当前线程
  • waitStatus 等待状态

其中waitStatus较为复杂,其他是双向链表的标准属性。

代码语言:javascript
复制
    // 取消
    static final int CANCELLED =  1;
   // 等待被唤醒 也就是等待触发状态,前节点可能是head或者前节点为取消状态CANCELLED
    static final int SIGNAL    = -1;
    // 等待条件状态,在等待队列中
    static final int CONDITION = -2;
    // 向后传播,贡献模式下使用
    static final int PROPAGATE = -3;
lock/unlock

这里使用非公平锁来进行探讨 获取锁的代码如下:

代码语言:javascript
复制
    final void lock() {
        // cas 修改state的值
        if (compareAndSetState(0, 1))
           // 记录当前获取锁的线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //获得锁 当抢夺锁失败后,进入这里
            acquire(1);
    }

cas修改值,修改成功,记录当前的线程,这个都好理解。难以理解的是acquire(1) ,当看这一部分源码时,要带入一个场景,就是锁已经被一个线程抢占了,未抢占的线程才会进入下面的方法中。

代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 执行到此处停止了
    selfInterrupt();
}

分为如下3个部分来看:

代码语言:javascript
复制
// 尝试获取锁
tryAcquire(arg) A

acquireQueued() B
addWaiter(Node.EXCLUSIVE) C
tryAcquire

先从到第一个方法来看 由于案例都是非公平锁,所以源码如下

代码语言:javascript
复制
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            // 此时上一个线程刚好释放
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
            // 同一个线程需要获取锁,重入
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
    }

刚好执行到了此处,上一个线程释放了锁。或者,当前线程再次获取锁,属于重入的情况。只有这2种,情况下,执行到A处,就返回了,不会继续往下。

addWaiter

假如继续往下走,会先加入同步队列节点:

代码语言:javascript
复制
addWaiter(Node.EXCLUSIVE)

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 尾插法
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 把node加入队列中
        enq(node);
        return node;
    }
    
    // 讲node插入队列
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 加入一个空节点,作为头结点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在抢占锁失败后,需要加入同步队列,双向链表,节点是上面提过的Node。在空队列情况下,再插入第一个节点时,会先创建一个空节点作为头节点。 分为2种情况如下,

  • addWaiter单线程的情况 addWaiter 中直接尾插法,插入节点
  • addWaiter 多线程执行,多个线程加入尾结点。只有一个可以在addWaiter中cas插入,其他线程需要enq方法中自旋执行。

可以看到在尾部插入节点时,都是通过,先改变prev指针,再CAS改变tail节点。保证了

代码语言:javascript
复制
node.prev = t;
compareAndSetTail(t, node)
t.next = node;
                  

这里会有疑问,为什么不是先改变next指针,如果先改变了next指针,cas后,还需要再次设置一次next,同时在通过head节点 往next查找时,会寻找到错误的节点。 当然在后面的源码中,会发现经常使用tail和prev指针来寻找。

acquireQueued

获取队列

代码语言:javascript
复制
 // 这里的Node是刚刚addWaiter里面加入的新的node节点
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // 中断标志位
            boolean interrupted = false;
            // 自旋操作 使用自旋不停的去尝试获取锁
            for (;;) {
                // 获取前置结点
                final Node p = node.predecessor();
                // 如果前置节点就是头结点,则尝试获取锁资源
                if (p == head && tryAcquire(arg)) {
                    // 设置头结点 头结点就表示当前正占有锁资源的节点
                    setHead(node);
                    // 把原来的头结点减少引用,帮助进入GC被回收
                    p.next = null; // help GC
                    failed = false;
                    // 向上传递是否中断标志
                    return interrupted;
                }
                // 字面意思,在尝试获取锁失败后,是否应该挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

如上面注释的解释,再加入同步队列后,线程自旋去抢占资源,再适当时挂起。

代码语言:javascript
复制
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
             //如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起
            return true;
        if (ws > 0) {
            //大于0,都是 取消状态
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点 ,把当前节点,放在找出来的节点后面
            pred.next = node;
        } else {
            //根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

改变前一个节点的状态为SIGNAL,然后自己挂起。当确定要挂起时,调用parkAndCheckInterrupt

代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true
        return Thread.interrupted();
    }

这里会传递线程中断标志到最上层,也就是,判断当前线程释放需要中断。

代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 执行到此处中断线程了
    selfInterrupt();
}

如果有异常,则会调用cancelAcquire

代码语言:javascript
复制

自此,整个lock源码流程结束。

unlock

unlock 会简单很多,因为执行unlock是在单线程环境下执行的。如下

代码语言:javascript
复制
    public void unlock() {
        sync.release(1);
    }
    // tryRelease释放锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 队列中有正在等待的节点
            if (h != null && h.waitStatus != 0)
                //唤醒头结点的下一个节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 释放锁,由于重入的特性,当state为0时,才会去释放当前线程持有
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        
        
        private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

至此分析了,lock()和unlock() 方法,也提到了同步队列。后续还会带来锁中的等待队列

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码云大作战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ReentrantLock 构造函数
    • state
      • tryAcquire
      • addWaiter
      • acquireQueued
  • Node
  • lock/unlock
  • unlock
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档