前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ReentranLock源码浅析

ReentranLock源码浅析

原创
作者头像
小金狗子
修改2020-03-23 14:38:00
3860
修改2020-03-23 14:38:00
举报

本文为作者个人理解难免有误,欢迎指正

ReentranLock 的基本构成

代码语言:txt
复制
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer{...}

static final class NonfairSync extends Sync{...}

static final class FairSync extends Sync{...}

常用的lock方法具体实现是在FairSync和NonfairSync。Nonfaire体现在lock时尝试直接获取锁,如果获取失败,则使用和公平锁一样通过AbstractQueuedSynchronizer的acquire去加锁。

加锁

代码语言:txt
复制
final void lock() {

      if (compareAndSetState(0, 1))

        setExclusiveOwnerThread(Thread.currentThread());

      else

        acquire(1);

}

对于锁的获取实质上是改变AbstractQueuedSynchronizer中state属性的值。 改变的过程大致是将state属性在的java内存中的偏移映射为本地内存地址,再通过Atomic::cmpxchg

改值。这些都是native方法,在java层面上可以不关心。简单点讲,本例中我们只需要记住所谓的加锁就是将state值从0改为1。

AbstractQueuedSynchronizer是核心!!!!

到这里,我们需要先看下AbstractQueuedSynchronizer的基本结构。

代码语言:txt
复制
    private transient volatile Node head;

    /**

    * Tail of the wait queue, lazily initialized.  Modified only via

    * method enq to add new wait node.

    */

    private transient volatile Node tail;

    /**

    * The synchronization state.

    */

    private volatile int state;

Node组成双向链表,在源码中的称呼为CLH队列。

代码语言:txt
复制
        +------+  prev +-----+       +-----+
        | head | <---- |     | <---- | tail|  
        |      | ----> |     | ----> |     |
        +------+  next +-----+       +-----+

Node的结构

代码语言:txt
复制
/** Marker to indicate a node is waiting in shared mode */

        static final Node SHARED = new Node();

        /** Marker to indicate a node is waiting in exclusive mode */

        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */

        static final int CANCELLED =  1;

        /** waitStatus value to indicate successor's thread needs unparking */

        static final int SIGNAL    = -1;

        /** waitStatus value to indicate thread is waiting on condition */

        static final int CONDITION = -2;

        /**

        * waitStatus value to indicate the next acquireShared should

        * unconditionally propagate

        */

        static final int PROPAGATE = -3;

        volatile int waitStatus;



        volatile Node prev;

        volatile Node next;

        volatile Thread thread;



        Node nextWaiter;

因为是个双向链表,所以自然有前驱和后继。thread表示尝试加锁的线程。waitStatus的值从-3到1,默认值0(下文用DEFAULT表示)

在本例中我们聚焦SIGNAL。它表示被标记为SIGNAL的节点的后继结点(的线程)需要被唤起!

acquire方法的实现

代码语言:txt
复制
if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

首先看下tryAcquire方法.以FairSync为例,对于NonfairSync,对比代码发现就少了!hasQueuedPredecessors()这个逻辑

代码语言:txt
复制
final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

先抛开hasQueuedPredecessors(),从代码来看 如果没获得锁,就做加锁操作,已经获得锁,就对当前state 加acquires(本例中为1,即+1)

为了略微全面一些,我们以公平锁为例.

没有人持有锁时,head和tail都为null,所以也不会有Queued Predecessors,tryAcquire结果为true,acquire直接返回,加锁成功结束。

相同线程执行lock,即锁重入时,对state做递增操作(+1)并state赋值。

如果是不同线程的话,就需要继续执行acquireQueued

先看addWaiter方法

代码语言:txt
复制
private Node addWaiter(Node mode) {

        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        enq(node);

        return node;

    }



private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            if (t == null) { // Must initialize

                if (compareAndSetHead(new Node()))

                    tail = head;

            } else {

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

        }

    }   

创建了一个Node,Node是加锁线程的代,我们取名N1。由于head和tail都为null,进入enq方法。首先创建一个Node(取名N0)初始化head和tail,初始化后如下图,

代码语言:txt
复制
        +------+

        | head |

        | tail |

        +------+

继续循环,走入else后,N1是tail,N0是head

代码语言:txt
复制
        +------+        +------+

        |  N0  | <----  |  N1  |

        | head | ---->  | tail |

        +------+        +------+

加入成功后就会执行

代码语言:txt
复制
final boolean acquireQueued(final Node node, int arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

从上图的结构看,node的前驱就是head,但是tryAcquire是不成功的,因为已经被别的线程加锁了,所以走shouldParkAfterFailedAcquire()

代码语言:txt
复制
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)

            /*

            * This node has already set status asking a release

            * to signal it, so it can safely park.

            */

            return true;

        if (ws > 0) {

            /*

            * Predecessor was cancelled. Skip over predecessors and

            * indicate retry.

            */

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            /*

            * waitStatus must be 0 or PROPAGATE.  Indicate that we

            * need a signal, but don't park yet.  Caller will need to

            * retry to make sure it cannot acquire before parking.

            */

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

        }

        return false;

    }

刚开始,我们创建的Node和默认新增的都是没有设置waitStatus的,所以N0(pred)和N1(node)都默认值0。

经过这段代码,实质上更改waitStatus的值

代码语言:txt
复制
        +------+        +------+

        |  N0  | <----  |  N1  |

        |SIGNAL| ---->  |DEFAULT|

        +------+        +------+

此时acquireQueued继续循环,shouldParkAfterFailedAcquire走入第一个if返回true,执行parkAndCheckInterrupt,线程就会park在LockSupport.park(this);

代码语言:txt
复制
    private final boolean parkAndCheckInterrupt() {

        LockSupport.park(this);

        return Thread.interrupted();

    }

如果别的线程继续加锁

代码语言:txt
复制
        +------+        +------+      +------+        +------+

        |  N0  | <----  |  N1  | <---- | Nn-1 | <----  |  Nn  |

        |SIGNAL| ---->  |SIGNAL| ----> |SIGNAL| ---->  |DEFAULT|

        +------+        +------+      +------+        +------+

锁释放

代码语言:txt
复制
    public final boolean release(int arg) {

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h);

            return true;

        }

        return false;

    }



    protected final boolean tryRelease(int releases) {

                int c = getState() - releases;

                if (Thread.currentThread() != getExclusiveOwnerThread())

                    throw new IllegalMonitorStateException();

                boolean free = false;

                if (c == 0) {

                    free = true;

                    setExclusiveOwnerThread(null);

                }

                setState(c);

                return free;

    }

首先对在tryRelease中清除当前持有锁的线程信息,更改state的值。

代码语言:txt
复制
    private void unparkSuccessor(Node node) {

        /*

        * If status is negative (i.e., possibly needing signal) try

        * to clear in anticipation of signalling.  It is OK if this

        * fails or if status is changed by waiting thread.

        */

        int ws = node.waitStatus;

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

        /*

        * Thread to unpark is held in successor, which is normally

        * just the next node.  But if cancelled or apparently null,

        * traverse backwards from tail to find the actual

        * non-cancelled successor.

        */

        Node s = node.next;

        if (s == null || s.waitStatus > 0) {

            s = null;

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            LockSupport.unpark(s.thread);

    }

unparkSuccessor主要是将当前节点的值归0,然后唤醒下一个节点。

代码语言:txt
复制
        +------+        +------+

        |  N0  | <----  |  N1  |

        |DEFAULT| ----> |DEFAULT|

        +------+        +------+

当下一节点的被唤醒时,原来part在LockSupport.park(this)上的线程就开始继续执行acquireQueued中的for循环了。重点看一下

代码语言:txt
复制
final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

N1的前驱还是head,并且因为锁以释放,在公平锁的情况下,能尝试加锁成功。那么原本的N0节点就被移除了,N1就变成了HEAD,同时也是tail。Java

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本文为作者个人理解难免有误,欢迎指正
  • ReentranLock 的基本构成
  • 加锁
    • AbstractQueuedSynchronizer是核心!!!!
      • Node的结构
        • acquire方法的实现
    • 锁释放
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档