前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java多线程:ReentrantLock源码分析

Java多线程:ReentrantLock源码分析

原创
作者头像
冰寒火
修改2023-02-18 20:10:08
4540
修改2023-02-18 20:10:08
举报
文章被收录于专栏:软件设计软件设计

一、大纲

image.png
image.png

juc中的并发容器都是基于volatile变量和CAS指令实现,ReentrantLock也不例外,其类图如下所示:

AQS关系类图
AQS关系类图
  1. AQS提供了共享变量status和同步队列CLH(head、tail)以及修改这些变量的CAS方法;
  2. Sync继承AQS,并实现了lock和tryRelease方法;
  3. FairSync继承Sync类,以公平的方式实现了tryAcquire和initialTryLock,而NonFairSync则是以不公平的思想实现的;
  4. ReentrantLock组合了FairSync和NonFairSync,默认非公平。

二、Lock流程

1 调用关系

代码语言:txt
复制
//FairSync加锁流程,
ReentrantLock:lock()
		--> Sync:lock()
          	--> FairSync:initialLock()//首次尝试加锁及可重入逻辑
          	--> AbstractQueuedSynchronizer:acquire(int arg)
                  	--> FairSync:tryAcquire(int arg)//如果CLH队列没有线程等待且CAS修改status成功,加锁
                          	--> AbstractQueuedSynchronizer:acquire(...) //线程没拿到锁,进入CLH队列

2 源码分析

第一步,ReentrantLock调用lock方法,首先调用initialTryLock尝试第一次加锁。

  • 如果没有已等待的线程且cas status成功,则加锁成功;
  • 否则,判断本线程是否是持有锁的线程,如果有就重入,更新status。
代码语言:java
复制
//ReentrantLock
public void lock() {
    sync.lock();
}
//Sync
final void lock() {
    if (!initialTryLock())
        acquire(1);
}
//FairSync
final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //如果没有已等待的线程且cas成功
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (getExclusiveOwnerThread() == current) {//可重入逻辑
        if (++c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    }
    return false;
}

第二步,如果既不是第一个线程,又不是可重入线程,则继续往下调用AbstractQueuedSynchronizer.acquire(int arg),该方法会调用子类重写的tryAcquire方法尝试加锁。FairSync类是公平锁,只有当同步队列中没有等待的线程或者本线程是队首线程才会尝试cas status。如果tryAcquire返回false,则调用AbstractQueuedSynchronizer.acquire(Node node,int arg,boolean shared ...)方法将线程入队等待。

代码语言:java
复制
//AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}
// FairSync
protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && !hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
//AbstractQueuedSynchronizer
final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */

    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        //核心代码
        if (node == null) {                 // allocate; retry before enqueue
            //第一次for循环,创建节点
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null) //第二次for循环,初始化同步队列
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;//第三次for,将new node入队
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {//第四次for循环,修改new node状态
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)//第五次for循环,阻塞线程
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}
//AbstractQueuedSynchronizer
private void tryInitializeHead() {
    Node h = new ExclusiveNode();
    if (U.compareAndSetReference(this, HEAD, null, h))
        tail = h;
}

AbstractQueuedSynchronizer.acquire会执行五次循环才能将节点入队并阻塞线程,每次for循环都会尝试tryAcquire。

image.png
image.png
  1. 第一次for循环,创建节点
image.png
image.png
image.png
image.png
  1. 第二次for循环,初始化同步队列,head是虚节点;
image.png
image.png
image.png
image.png
  1. 第三次for,将new node入队,并cas更新tail;
image.png
image.png
image.png
image.png
  1. 第四次for循环,修改new node状态为WAITING;
image.png
image.png
image.png
image.png
  1. 第五次for循环,阻塞线程
image.png
image.png

三、unlock流程

1 调用关系

代码语言:txt
复制
//FairSync和NonFairSync释放锁的流程是一样的
ReentrantLock:unlock()
  	--> AbstractQueuedSynchronizer:release(int arg)
        	--> Sync:tryRelease(int arg)//如果本线程是持有锁的线程,那么修改state
        	--> AbstractQueuedSynchronizer:signalNext(Node h)//唤醒第一个线程
              	--> LockSupport:unpark() //调用Unsafe.unpark

2 源码分析

release流程比较简单,不分公平锁与非公平锁,直接由Sync实现。释放锁会先减少status和去除exclusiveOwnerThread,然后再唤醒同步队列上的线程。

代码语言:java
复制
//ReentrantLock
public void unlock() {
    sync.release(1);
}

//AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}
//Sync
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}
//AbstractQueuedSynchronizer
private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}
//LockSupport
public static void unpark(Thread thread) {
    if (thread != null) {
        if (thread.isVirtual()) {
            VirtualThreads.unpark(thread);
        } else {
            U.unpark(thread); //Unsafe
        }
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、大纲
  • 二、Lock流程
    • 1 调用关系
      • 2 源码分析
      • 三、unlock流程
        • 1 调用关系
          • 2 源码分析
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档