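Elasticsearch ships with many well-designed utility classes. This article analyzes KeyedLock, the lock manager Elasticsearch uses to control concurrent access to resources. KeyedLock is built on ReentrantLock, so ReentrantLock is analyzed here as well.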
public final class KeyedLock<T> {
    /**
     * Container that holds the locks.
     *
     * A KeyLock stays in this map only while it is in use (held or waited on).
     */
    private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

    /**
     * Whether the locks are fair or non-fair.
     */
    private final boolean fair;

    /**
     * Creates a new lock
     * @param fair Use fair locking, ie threads get the lock in the order they requested it
     */
    public KeyedLock(boolean fair) {
        this.fair = fair;
    }

    /**
     * Creates a non-fair lock
     */
    public KeyedLock() {
        this(false);
    }
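The map field of KeyedLock stores resource keys together with their KeyLock objects; in other words, it is one large lock container. KeyLock is the lock object associated with a single resource key, and it extends ReentrantLock: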
private static final class KeyLock extends ReentrantLock {
    KeyLock(boolean fair) {
        super(fair);
    }

    /**
     * Counter of threads that hold or are waiting for this lock; starts at 1 for the creator.
     */
    private final AtomicInteger count = new AtomicInteger(1);
}
We start with the lock and unlock methods of ReentrantLock, then use them to analyze the locking and unlocking methods of KeyedLock.
public void lock() { sync.acquire(1); }
Internally this calls java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire:
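public final void acquire(int arg) {
    // If tryAcquire fails, the thread is added to the wait queue and blocks in acquireQueued.
    // acquireQueued returns true only if the thread was interrupted while waiting,
    // in which case selfInterrupt() restores the interrupt flag.
    // Note the short-circuit &&: acquireQueued is evaluated only when tryAcquire(arg) returns false,
    // and acquireQueued itself calls tryAcquire again. If the first tryAcquire returns true,
    // !tryAcquire is false and the second call is skipped, so the lock is never acquired twice.
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}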
We take java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire as the example here:
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // Fair mode: only proceed if no other thread is queued ahead of us.
        // CAS state from 0 to acquires; if the CAS fails we fall through and return false,
        // if it succeeds the current thread becomes the exclusive owner.
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The lock is already held and the current thread is the owner: increase the hold count.
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
/** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
Node.EXCLUSIVE marks a node that is waiting in exclusive mode.
The java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter method:
private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) { // the queue has already been initialized
            // Point the new node's prev at the current tail
            node.setPrevRelaxed(oldTail);
            // CAS the tail from the old tail to the new node
            if (compareAndSetTail(oldTail, node)) {
                // Link the old tail's next to the new node
                oldTail.next = node;
                // Return the new node
                return node;
            }
        } else {
            // First contention: initialize the head and tail of the queue, then loop again
            initializeSyncQueue();
        }
    }
}
/** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); }
The java.util.concurrent.locks.AbstractQueuedSynchronizer#initializeSyncQueue method:
/**
 * Initializes head and tail fields on first contention.
 */
private final void initializeSyncQueue() {
    Node h;
    // Initialize head and tail so that both point to one newly created dummy Node
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}
The java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued method:
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // Only the node right behind head may try to acquire
            if (p == head && tryAcquire(arg)) {
                // The current node becomes the new head
                setHead(node);
                p.next = null; // help GC
                // The lock is now held; the return value reports whether the thread
                // was interrupted while waiting (false if it never was)
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
tryLock internally calls java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire:
/**
 * Performs non-fair tryLock. tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // The initial state value is 0 (unlocked)
    int c = getState();
    if (c == 0) {
        // CAS state from 0 to acquires; if the CAS fails we fall through and return false,
        // if it succeeds the current thread becomes the exclusive owner
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The lock is already held and the current thread is the owner: increase the hold count
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
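A plain ReentrantLock is acquired successfully in one of two cases: 1. state is 0, the CAS on state succeeds, and the current thread is recorded as the exclusive owner; 2. state is not 0, and the thread asking for the lock is the thread that already owns it. In other words, the owning thread can acquire the lock again (reentrancy), and each reacquisition increases the hold count.
The java.util.concurrent.locks.ReentrantLock#unlock method: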
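The two cases can be observed from the public ReentrantLock API. The following is a minimal sketch, not part of Elasticsearch; the class name ReentrantAcquireDemo and its method names are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantAcquireDemo {
    /** Returns the hold count after the same thread acquires the lock twice. */
    static int holdCountAfterReentry() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                     // case 1: state goes from 0 to 1
        boolean again = lock.tryLock();  // case 2: the owner re-enters, state goes from 1 to 2
        try {
            return again ? lock.getHoldCount() : -1;
        } finally {
            if (again) {
                lock.unlock();
            }
            lock.unlock();
        }
    }

    /** Returns whether a second thread can tryLock while the caller holds the lock. */
    static boolean otherThreadCanLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread t = new Thread(() -> {
                acquired[0] = lock.tryLock(); // state != 0 and this thread is not the owner
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            t.start();
            t.join(); // join makes the write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfterReentry()); // prints 2
        System.out.println(otherThreadCanLock());    // prints false
    }
}
```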
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
Internally this calls java.util.concurrent.locks.AbstractQueuedSynchronizer#release:
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
Next, look at java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    // The owning thread must release the lock as many times as it acquired it,
    // and only the owning thread is allowed to release it
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
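The release rule is therefore: the owning thread must unlock as many times as it locked before the lock becomes free, and a lock can only be released by the thread that holds it.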
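Both halves of this rule can be checked with a small program. This is an illustrative sketch only; ReentrantReleaseDemo and its methods are hypothetical names, not JDK or Elasticsearch code.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantReleaseDemo {
    /** Locks twice and checks that the lock is free only after the second unlock. */
    static boolean freedOnlyAfterSecondUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                          // state 2 -> 1, tryRelease returns false
        boolean heldAfterFirst = lock.isLocked();
        lock.unlock();                          // state 1 -> 0, the lock is released
        boolean heldAfterSecond = lock.isLocked();
        return heldAfterFirst && !heldAfterSecond;
    }

    /** A thread that does not own the lock gets IllegalMonitorStateException from unlock(). */
    static boolean nonOwnerUnlockThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread never acquired the lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(freedOnlyAfterSecondUnlock()); // prints true
        System.out.println(nonOwnerUnlockThrows());       // prints true
    }
}
```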
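/**
 * Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired
 * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
 * Because the returned object implements Releasable, it can be closed with try-with-resources.
 */
public Releasable acquire(T key) {
    while (true) {
        // First try to find an existing lock in the container
        KeyLock perNodeLock = map.get(key);
        // No lock exists for this key yet
        if (perNodeLock == null) {
            // Create a new lock
            ReleasableLock newLock = tryCreateNewLock(key);
            if (newLock != null) {
                return newLock;
            }
            // newLock == null means another thread has just put a lock for this key into the map;
            // on the next iteration perNodeLock will be non-null and the else branch is taken
        } else {
            // A lock for this key already exists (held by this thread or by another one)
            assert perNodeLock != null;
            // Read the counter; its initial value is 1
            int i = perNodeLock.count.get();
            // Register interest by CAS-incrementing the counter. If another thread changed the counter
            // in the meantime, or it has dropped to 0 (the lock is being removed), retry the loop
            if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
                // The CAS comes first and lock() second, so the counter can change while a concurrent
                // tryAcquire already holds the lock; lock() then blocks until the lock becomes available
                perNodeLock.lock();
                return new ReleasableLock(key, perNodeLock);
            }
        }
    }
}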
/**
 * Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned.
 */
public Releasable tryAcquire(T key) {
    final KeyLock perNodeLock = map.get(key);
    if (perNodeLock == null) {
        // A new lock has to be created; return its result directly
        return tryCreateNewLock(key);
    }
    // Use tryLock so that the current thread owns the lock; it is released later.
    // Case 1 (different threads): tryLock may succeed at the very moment the previous owner unlocks,
    // see the unlock in org.elasticsearch.common.util.concurrent.KeyedLock.release. If release has
    // already decremented count to 0, the while condition below filters this case out.
    // Case 2: the same thread acquires the lock a second time; tryLock also returns true.
    if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again
        int i;
        while ((i = perNodeLock.count.get()) > 0) {
            // A concurrent acquire() may increment count before it obtains the lock,
            // which makes the CAS below fail and sends us back into the loop.
            // we have to do this in a loop here since even if the count is > 0
            // there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got
            // the lock we should retry and see if we can still get it or if the count is 0. If that is the case and we give up.
            if (perNodeLock.count.compareAndSet(i, i + 1)) {
                return new ReleasableLock(key, perNodeLock);
            }
        }
        // We hold the lock through tryLock but count is 0, so the lock is being removed from the map:
        // undo the tryLock before giving up
        perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state
    }
    return null;
}
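There are many CAS operations here; some of them exist to avoid races among acquire, tryAcquire, and release. The comments in the code explain the details, so only tryCreateNewLock is discussed further: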
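/**
 * The first time a lock is acquired for a key, count is 1. After close, count drops to 0 and the lock is
 * removed from the map. If another thread concurrently acquires the lock during close, it changes count.
 * @param key
 * @return
 */
private ReleasableLock tryCreateNewLock(T key) {
    // Create the lock
    KeyLock newLock = new KeyLock(fair);
    // Lock it before publishing it
    newLock.lock();
    // putIfAbsent returns null if no lock existed for the key (newLock is now in the map),
    // otherwise it returns the lock that was already there
    KeyLock keyLock = map.putIfAbsent(key, newLock);
    if (keyLock == null) {
        // Wrap it in a ReleasableLock
        return new ReleasableLock(key, newLock);
    }
    // Another thread won the race; the caller retries with the existing lock
    return null;
}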
ReleasableLock implements the Releasable interface.
Releasable in turn extends Closeable, so a ReleasableLock can be used with try-with-resources, which releases the resource automatically.
Next, the code of ReleasableLock:
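As a rough illustration of the try-with-resources behavior, the sketch below defines a local stand-in interface that mirrors the shape of Releasable (extends Closeable, close() without a checked exception). The class ReleasableDemo and its run method are hypothetical and exist only for this example.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

class ReleasableDemo {
    /** Local stand-in for Elasticsearch's Releasable; an assumption made for this sketch. */
    interface Releasable extends Closeable {
        @Override
        void close(); // narrowed so that callers need no catch block for IOException
    }

    /** Records the order in which the body and close() run. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        try (Releasable ignored = () -> events.add("released")) {
            events.add("working");
        } // close() is invoked here automatically
        events.add("after");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [working, released, after]
    }
}
```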
private final class ReleasableLock implements Releasable { final T key; final KeyLock lock; final AtomicBoolean closed = new AtomicBoolean();
private ReleasableLock(T key, KeyLock lock) { this.key = key; this.lock = lock; }
@Override public void close() { if (closed.compareAndSet(false, true)) { release(key, lock); } } }
Now for releasing the lock.
The lock is released through the close method of ReleasableLock, which calls org.elasticsearch.common.util.concurrent.KeyedLock#release:
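/**
 * Releases the lock. Until unlock is executed, the calling thread still owns the lock.
 * @param key
 * @param lock
 */
private void release(T key, KeyLock lock) {
    // Sanity check first
    assert lock == map.get(key);
    // Decrement the lock's counter
    final int decrementAndGet = lock.count.decrementAndGet();
    // Unlock
    lock.unlock();
    // When the counter reaches zero, remove the lock from the map. Since count starts at 1,
    // a lock that no other thread acquired or waited on is removed after a single use
    if (decrementAndGet == 0) {
        map.remove(key, lock);
    }
    assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't";
}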
For a usage example, see org.elasticsearch.discovery.zen.UnicastZenPing.PingingRound#getOrConnect:
public Connection getOrConnect(DiscoveryNode node) throws IOException { Connection result; try (Releasable ignore = connectionLock.acquire(node.getAddress())) { ------------------------
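KeyedLock is a container that manages KeyLock instances: it keeps a map from a unique key to its KeyLock, so that many resources can be locked independently. Each lock is a ReentrantLock underneath. The acquire and tryAcquire methods wrap ReentrantLock's locking operations, and the release method wraps its unlock operation.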
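To see how the pieces fit together, here is a simplified, self-contained sketch of the same idea. It is not the Elasticsearch implementation: SimpleKeyedLock, Handle, and demo are hypothetical names, a plain ConcurrentHashMap replaces ConcurrentCollections, and tryAcquire and fairness are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class SimpleKeyedLock<T> {
    /** Per-key lock; count tracks threads that hold or wait for it. */
    private static final class KeyLock extends ReentrantLock {
        final AtomicInteger count = new AtomicInteger(1);
    }

    /** Hypothetical handle type; close() releases the lock and must run on the acquiring thread. */
    public interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    private final ConcurrentMap<T, KeyLock> map = new ConcurrentHashMap<>();

    public Handle acquire(T key) {
        while (true) {
            KeyLock existing = map.get(key);
            if (existing == null) {
                KeyLock fresh = new KeyLock();
                fresh.lock(); // lock before publishing
                if (map.putIfAbsent(key, fresh) == null) {
                    return handle(key, fresh);
                }
                // lost the race: drop the unpublished lock and retry
            } else {
                int i = existing.count.get();
                // count == 0 means the lock is being removed; spin until it is gone
                if (i > 0 && existing.count.compareAndSet(i, i + 1)) {
                    existing.lock();
                    return handle(key, existing);
                }
            }
        }
    }

    private Handle handle(T key, KeyLock lock) {
        AtomicBoolean closed = new AtomicBoolean();
        return () -> {
            if (closed.compareAndSet(false, true)) {
                int remaining = lock.count.decrementAndGet();
                lock.unlock();
                if (remaining == 0) {
                    map.remove(key, lock);
                }
            }
        };
    }

    /** Four threads increment a plain int under the same key; returns -1 if a lock leaked. */
    static int demo() {
        SimpleKeyedLock<String> locks = new SimpleKeyedLock<>();
        int[] counter = new int[1];
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    try (Handle ignored = locks.acquire("node-1")) {
                        counter[0]++; // protected by the per-key lock
                    }
                }
            });
            threads[t].start();
        }
        try {
            for (Thread th : threads) {
                th.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return locks.map.isEmpty() ? counter[0] : -1;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 4000
    }
}
```

The counter is a plain int on purpose: the total is only correct if the per-key lock provides mutual exclusion, and the map being empty at the end shows that unused locks are removed.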