前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch源码分析八之锁管理工具KeyLock

Elasticsearch源码分析八之锁管理工具KeyLock

作者头像
山行AI
发布2020-03-25 09:29:51
5910
发布2020-03-25 09:29:51
举报
文章被收录于专栏:山行AI山行AI

Elasticsearch中有很多优秀的工具类,这里要分析的是Elasticsearch中用于资源并发控制的锁管理工具:KeyedLock,它基于ReentrantLock实现,这里也会对ReentrantLock进行一定的分析。

组成

代码语言:javascript
复制
public final class KeyedLock<T> {    /**     * 一个用于存放锁的容器     *     * 里面的KeyLock都是处于锁定状态的     */    private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
    /**     * 是公平锁还是非公平锁     */    private final boolean fair;
    /**     * Creates a new lock     * @param fair Use fair locking, ie threads get the lock in the order they requested it     */    public KeyedLock(boolean fair) {        this.fair = fair;    }
    /**     * Creates a non-fair lock     */    public KeyedLock() {        this(false);    }

KeyedLock的map属性是存放资源标识和KeyLock的容器,也就是一个大的锁容器。KeyLock为每一个资源标识对应的锁对象,它继承自ReentrantLock:

代码语言:javascript
复制
 private static final class KeyLock extends ReentrantLock {        KeyLock(boolean fair) {            super(fair);        }
        /**         * 计数器         */        private final AtomicInteger count = new AtomicInteger(1);    }

接下来先从ReentrantLock的加锁和释放锁的方法为切入点来分析KeyedLock中的加锁和释放锁的方法。

ReentrantLock的加锁和释放

加锁

  • ReentrantLock的java.util.concurrent.locks.ReentrantLock#lock方法:
代码语言:javascript
复制
  public void lock() {        sync.acquire(1);    }

内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法:

代码语言:javascript
复制
  public final void acquire(int arg) {  // 如果申请失败,并且入队列成功,则中断当前线程  //  注意,这里&&的用法很妙,因为当tryAcquire(arg)返回false时才会判断acquireQueued方法,而在acquireQueue方法内部还会调用tryAcquire方法。如果第一个tryAcquire返回true,!tryAcquire就为false,就不会进入第二个方法,这样避免了重复加锁        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

这里我们以java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire方法为例:

代码语言:javascript
复制
@ReservedStackAccess        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                // CAS设置state值为传入的值,设置失败的返回为false;设置成功的则设置当前线程为排他线程                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            //如果已经获取过锁了,并且当前线程就是那个拥有锁的线程,则添加许可            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }
代码语言:javascript
复制
 /** Marker to indicate a node is waiting in exclusive mode */        static final Node EXCLUSIVE = null;

Node.EXCLUSIVE用于标识一个节点正处于排他模式,在等待状态。

java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter方法:

代码语言:javascript
复制
private Node addWaiter(Node mode) {        Node node = new Node(mode);
        for (;;) {            Node oldTail = tail;            if (oldTail != null) {// 如果之前节点不为null                // 设置当前的prev节点为之前的old节点                node.setPrevRelaxed(oldTail);                // 并通过CAS将之前的tail节点位置设置上当前节点                if (compareAndSetTail(oldTail, node)) {                    // 将之前tail节点的next节点指向当前节点                    oldTail.next = node;                    // 返回当前节点                    return node;                }            } else {            // 如果第一次添加进入这个方法进行队列的head节点和tail节点的初始化,然后继续for循环                initializeSyncQueue();            }        }    }
       /** Constructor used by addWaiter. */        Node(Node nextWaiter) {            this.nextWaiter = nextWaiter;            THREAD.set(this, Thread.currentThread());        }

java.util.concurrent.locks.AbstractQueuedSynchronizer#initializeSyncQueue方法:

代码语言:javascript
复制
 /**     * Initializes head and tail fields on first contention.     */    private final void initializeSyncQueue() {        Node h;        // 初始化head和tail 节点,设置一个新创建的Node对象        if (HEAD.compareAndSet(this, null, (h = new Node())))            tail = h;    }

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法:

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {        boolean interrupted = false;        try {            for (;;) {            // 取到当前节点的先前节点                final Node p = node.predecessor();                // 先前节点与head节点是同一个的时候                if (p == head && tryAcquire(arg)) {                // 将当前节点设置为头节点                    setHead(node);                    p.next = null; // help GC                    // 这时代表不需要入队列,可以获取锁,返回false                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node))                    interrupted |= parkAndCheckInterrupt();            }        } catch (Throwable t) {            cancelAcquire(node);            if (interrupted)                selfInterrupt();            throw t;        }    }
  • ReentrantLock的java.util.concurrent.locks.ReentrantLock#tryLock()方法:
代码语言:javascript
复制
    public boolean tryLock() {        return sync.nonfairTryAcquire(1);    }

在tryLock内部调用的是java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire方法:

代码语言:javascript
复制
 /**         * Performs non-fair tryLock.  tryAcquire is implemented in         * subclasses, but both need nonfair try for trylock method.         */        @ReservedStackAccess        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 初始state值为0            int c = getState();            if (c == 0) {            // CAS设置state值为传入的值,设置失败的返回为false;设置成功的则设置当前线程为排他线程                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            //如果已经获取过锁了,并且当前线程就是那个拥有锁的线程,则添加许可            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }

普通的ReentrantLock加锁成功的条件有两种:1. 初始state为0,然后通过CAS设置状态成功了,并将当前线程设置为排他锁拥有者,加锁成功;2. 初始状态不为0,再次尝试获取锁的线程是当前拥有锁的线程。也就是说拥有锁的线程可以再次获取锁。

解锁

java.util.concurrent.locks.ReentrantLock#unlock方法:

代码语言:javascript
复制
   /**     * Attempts to release this lock.     *     * <p>If the current thread is the holder of this lock then the hold     * count is decremented.  If the hold count is now zero then the lock     * is released.  If the current thread is not the holder of this     * lock then {@link IllegalMonitorStateException} is thrown.     *     * @throws IllegalMonitorStateException if the current thread does not     *         hold this lock     */    public void unlock() {        sync.release(1);    }

内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#release方法:

代码语言:javascript
复制
/**     * Releases in exclusive mode.  Implemented by unblocking one or     * more threads if {@link #tryRelease} returns true.     * This method can be used to implement method {@link Lock#unlock}.     *     * @param arg the release argument.  This value is conveyed to     *        {@link #tryRelease} but is otherwise uninterpreted and     *        can represent anything you like.     * @return the value returned from {@link #tryRelease}     */    public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }

继续往下看看java.util.concurrent.locks.ReentrantLock.Sync#tryRelease方法:

代码语言:javascript
复制
 @ReservedStackAccess        protected final boolean tryRelease(int releases) {        // 从这里可以看出,对持有锁的线程加了几次锁则释放几次锁。当前线程加的锁必须当前线程来释放            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }

对于锁释放的规则为:对持有锁的线程加了几次锁就需要释放几次锁。当前线程加的锁必须当前线程来释放。

KeyedLock加锁和释放锁

加锁

  • org.elasticsearch.common.util.concurrent.KeyedLock#acquire方法:
代码语言:javascript
复制
  /**     * 一个锁能够被同一个线程获取无数次,lock实现了Releasable接口,可以进行try...with...resource进行关闭     * Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired     * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.     */    public Releasable acquire(T key) {        while (true) {            // 先尝试从容器中获取锁            KeyLock perNodeLock = map.get(key);            // 如果不存在            if (perNodeLock == null) {                // 创建新的锁                ReleasableLock newLock = tryCreateNewLock(key);                if (newLock != null) {                    return newLock;                }                // 如果newLock为null,则代表该key对应的lock已经存在于map中,下一次循环时上面的perNodeLock就不为null,会进入下一个分支            } else {//同一线程再次获取锁时                assert perNodeLock != null;                // 获取锁中计数器的值,计数器的初始值为1                int i = perNodeLock.count.get();                // 利用cas来对锁进行lock锁定,如果在这期间计数器的值被其他线程修改,则这次无法进行锁定,需要进入下一次循环                if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {//先CAS后执行lock,这里会与tryAcquire方法产生并发,CAS成功但是lock失败                    perNodeLock.lock();                    return new ReleasableLock(key, perNodeLock);                }            }        }    }
  • org.elasticsearch.common.util.concurrent.KeyedLock#tryAcquire方法:
代码语言:javascript
复制
 /**     * Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned.     */    public Releasable tryAcquire(T key) {        final KeyLock perNodeLock = map.get(key);        if (perNodeLock == null) {            // 如果需要创建新的锁,在这里就直接返回了            return tryCreateNewLock(key);        }        // 利用try lock尝试获取锁,让当前线程拥有锁。到最后再释放它。        // 情况一:不同线程中时,这里可能是在上一个线程unlock的时候恰巧获取到锁,见org.elasticsearch.common.util.concurrent.KeyedLock.release,unlock的时候。这种情况会丰while判断count的值 的时候会过滤掉,因为count被release修改成0了        // 情况二:如果是同一个线程第二次获取锁,这里也会返回true        if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again            int i;            //同一线程再次 获取到锁            while ((i = perNodeLock.count.get()) > 0) {                //这里可能会与acquire方法产生并发问题,如果另一个线程并发调用了acquire方法,acquire方法会修改count的值成功但加锁失败。然后会导致当前cas失败,会进入while重试                // we have to do this in a loop here since even if the count is > 0                // there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got                // the lock we should retry and see if we can still get it or if the count is 0. If that is the case and we give up.                //这里进行cas是为了防止其他地方在并发修改count的值,因为当前已经有了锁的占有权所以进行重试尝试,如果count为0则放弃。                if (perNodeLock.count.compareAndSet(i, i + 1)) {                    return new ReleasableLock(key, perNodeLock);                }            }            // 对应上面的tryLock,如果没有获取到锁的占有权,执行到了这里则需要释放当前线程对锁的占有权            perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state        }        return null;    }

这里有很多CAS操作,有些是为了避免acquire方法和tryAcquire以及release方法之间的并发问题,具体代码细节在注释中已经相当清楚了,这里提一下tryCreateNewLock方法:

代码语言:javascript
复制
  /**     * 每次第一次获取锁时,count值都为1;close之后,count变为0,然后会从map中清除。如果在close的时候有一个线程并发地获取到了锁,会对count进行修改     * @param key     * @return     */    private ReleasableLock tryCreateNewLock(T key) {        // 创建锁        KeyLock newLock = new KeyLock(fair);        // 锁定        newLock.lock();        // 如果该key对应的lock不存在,则将key和lock放入map中然后返回null        // 如果该key对应的lock存在,则返回之前的lock        KeyLock keyLock = map.putIfAbsent(key, newLock);        if (keyLock == null) {            // 包装成一个ReleasableLock            return new ReleasableLock(key, newLock);        }        return null;    }

对于ReleasableLock,它实现了Releasable接口,而ReleasbleLock的结构如下:

它实现了Closeable接口,可以通try...with...resource特性来使用,会自动进行资源的释放。

我们接着来看下ReleasableLock的代码:

代码语言:javascript
复制
private final class ReleasableLock implements Releasable {        final T key;        final KeyLock lock;        final AtomicBoolean closed = new AtomicBoolean();
        private ReleasableLock(T key, KeyLock lock) {            this.key = key;            this.lock = lock;        }
        @Override        public void close() {            if (closed.compareAndSet(false, true)) {                release(key, lock);            }        }    }

紧接着看下释放锁的操作。

释放锁

释放的操作是通过ReleasableLock的close方法来执行关闭,在其中调用了org.elasticsearch.common.util.concurrent.KeyedLock#release方法:

代码语言:javascript
复制
  /**     * 释放锁,在执行unlock之前该线程还拥有锁的占有权     * @param key     * @param lock     */    private void release(T key, KeyLock lock) {        // 先进行校验        assert lock == map.get(key);        // 减少锁的计数次数        final int decrementAndGet = lock.count.decrementAndGet();        // 解锁        lock.unlock();        // 当锁计数器减到零时,从map容器中将锁移除。因为count的初始值为1,所以在使用一次之后就会被移除        if (decrementAndGet == 0) {            map.remove(key, lock);        }        assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't";    }

使用方法

可以看下org.elasticsearch.discovery.zen.UnicastZenPing.PingingRound#getOrConnect方法:

代码语言:javascript
复制
 public Connection getOrConnect(DiscoveryNode node) throws IOException {            Connection result;            try (Releasable ignore = connectionLock.acquire(node.getAddress())) {            ------------------------

总结

KeyedLock是一个用于管理KeyLock的容器,通过维护唯一标识符与KeyLock的map来维护多个资源的锁,它的锁底层基于ReentrantLock,通过acquire和tryAcquire方法对ReentrantLock的锁定操作进行包装,通过release方法对ReentrantLock的解锁操作进行包装。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 组成
  • ReentrantLock的加锁和释放
    • 加锁
      • 解锁
        • KeyedLock加锁和释放锁
          • 加锁
          • 释放锁
        • 使用方法
          • 总结
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档