专栏首页开发架构二三事Elasticsearch源码分析八之锁管理工具KeyLock

Elasticsearch源码分析八之锁管理工具KeyLock

Elasticsearch中有很多优秀的工具类,这里要分析的是Elasticsearch中用于资源并发控制的锁管理工具:KeyedLock,它基于ReentrantLock实现,这里也会对ReentrantLock进行一定的分析。

组成

public final class KeyedLock<T> {    /**     * 一个用于存放锁的容器     *     * 里面的KeyLock都是处于锁定状态的     */    private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
    /**     * 是公平锁还是非公平锁     */    private final boolean fair;
    /**     * Creates a new lock     * @param fair Use fair locking, ie threads get the lock in the order they requested it     */    public KeyedLock(boolean fair) {        this.fair = fair;    }
    /**     * Creates a non-fair lock     */    public KeyedLock() {        this(false);    }

KeyedLock的map属性是存放资源标识和KeyLock的容器,也就是一个大的锁容器。KeyLock为每一个资源标识对应的锁对象,它继承自ReentrantLock:

 private static final class KeyLock extends ReentrantLock {        KeyLock(boolean fair) {            super(fair);        }
        /**         * 计数器         */        private final AtomicInteger count = new AtomicInteger(1);    }

接下来先从ReentrantLock的加锁和释放锁的方法为切入点来分析KeyedLock中的加锁和释放锁的方法。

ReentrantLock的加锁和释放

加锁

  • ReentrantLock的java.util.concurrent.locks.ReentrantLock#lock方法:
  public void lock() {        sync.acquire(1);    }

内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法:

  public final void acquire(int arg) {  // 如果申请失败,并且入队列成功,则中断当前线程  //  注意,这里&&的用法很妙,因为当tryAcquire(arg)返回false时才会判断acquireQueued方法,而在acquireQueue方法内部还会调用tryAcquire方法。如果第一个tryAcquire返回true,!tryAcquire就为false,就不会进入第二个方法,这样避免了重复加锁        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

这里我们以java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire方法为例:

@ReservedStackAccess        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                // CAS设置state值为传入的值,设置失败的返回为false;设置成功的则设置当前线程为排他线程                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            //如果已经获取过锁了,并且当前线程就是那个拥有锁的线程,则添加许可            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }
 /** Marker to indicate a node is waiting in exclusive mode */        static final Node EXCLUSIVE = null;

Node.EXCLUSIVE用于标识一个节点正处于排他模式,在等待状态。

java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter方法:

private Node addWaiter(Node mode) {        Node node = new Node(mode);
        for (;;) {            Node oldTail = tail;            if (oldTail != null) {// 如果之前节点不为null                // 设置当前的prev节点为之前的old节点                node.setPrevRelaxed(oldTail);                // 并通过CAS将之前的tail节点位置设置上当前节点                if (compareAndSetTail(oldTail, node)) {                    // 将之前tail节点的next节点指向当前节点                    oldTail.next = node;                    // 返回当前节点                    return node;                }            } else {            // 如果第一次添加进入这个方法进行队列的head节点和tail节点的初始化,然后继续for循环                initializeSyncQueue();            }        }    }
       /** Constructor used by addWaiter. */        Node(Node nextWaiter) {            this.nextWaiter = nextWaiter;            THREAD.set(this, Thread.currentThread());        }

java.util.concurrent.locks.AbstractQueuedSynchronizer#initializeSyncQueue方法:

 /**     * Initializes head and tail fields on first contention.     */    private final void initializeSyncQueue() {        Node h;        // 初始化head和tail 节点,设置一个新创建的Node对象        if (HEAD.compareAndSet(this, null, (h = new Node())))            tail = h;    }

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {        boolean interrupted = false;        try {            for (;;) {            // 取到当前节点的先前节点                final Node p = node.predecessor();                // 先前节点与head节点是同一个的时候                if (p == head && tryAcquire(arg)) {                // 将当前节点设置为头节点                    setHead(node);                    p.next = null; // help GC                    // 这时代表不需要入队列,可以获取锁,返回false                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node))                    interrupted |= parkAndCheckInterrupt();            }        } catch (Throwable t) {            cancelAcquire(node);            if (interrupted)                selfInterrupt();            throw t;        }    }
  • ReentrantLock的java.util.concurrent.locks.ReentrantLock#tryLock()方法:
    public boolean tryLock() {        return sync.nonfairTryAcquire(1);    }

在tryLock内部调用的是java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire方法:

 /**         * Performs non-fair tryLock.  tryAcquire is implemented in         * subclasses, but both need nonfair try for trylock method.         */        @ReservedStackAccess        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 初始state值为0            int c = getState();            if (c == 0) {            // CAS设置state值为传入的值,设置失败的返回为false;设置成功的则设置当前线程为排他线程                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            //如果已经获取过锁了,并且当前线程就是那个拥有锁的线程,则添加许可            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }

普通的ReentrantLock加锁成功的条件有两种:1. 初始state为0,然后通过CAS设置状态成功了,并将当前线程设置为排他锁拥有者,加锁成功;2. 初始状态不为0,再次尝试获取锁的线程是当前拥有锁的线程。也就是说拥有锁的线程可以再次获取锁。

解锁

java.util.concurrent.locks.ReentrantLock#unlock方法:

   /**     * Attempts to release this lock.     *     * <p>If the current thread is the holder of this lock then the hold     * count is decremented.  If the hold count is now zero then the lock     * is released.  If the current thread is not the holder of this     * lock then {@link IllegalMonitorStateException} is thrown.     *     * @throws IllegalMonitorStateException if the current thread does not     *         hold this lock     */    public void unlock() {        sync.release(1);    }

内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#release方法:

/**     * Releases in exclusive mode.  Implemented by unblocking one or     * more threads if {@link #tryRelease} returns true.     * This method can be used to implement method {@link Lock#unlock}.     *     * @param arg the release argument.  This value is conveyed to     *        {@link #tryRelease} but is otherwise uninterpreted and     *        can represent anything you like.     * @return the value returned from {@link #tryRelease}     */    public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }

继续往下看看java.util.concurrent.locks.ReentrantLock.Sync#tryRelease方法:

 @ReservedStackAccess        protected final boolean tryRelease(int releases) {        // 从这里可以看出,对持有锁的线程加了几次锁则释放几次锁。当前线程加的锁必须当前线程来释放            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }

对于锁释放的规则为:对持有锁的线程加了几次锁就需要释放几次锁。当前线程加的锁必须当前线程来释放。

KeyedLock加锁和释放锁

加锁

  • org.elasticsearch.common.util.concurrent.KeyedLock#acquire方法:
  /**     * 一个锁能够被同一个线程获取无数次,lock实现了Releasable接口,可以进行try...with...resource进行关闭     * Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired     * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.     */    public Releasable acquire(T key) {        while (true) {            // 先尝试从容器中获取锁            KeyLock perNodeLock = map.get(key);            // 如果不存在            if (perNodeLock == null) {                // 创建新的锁                ReleasableLock newLock = tryCreateNewLock(key);                if (newLock != null) {                    return newLock;                }                // 如果newLock为null,则代表该key对应的lock已经存在于map中,下一次循环时上面的perNodeLock就不为null,会进入下一个分支            } else {//同一线程再次获取锁时                assert perNodeLock != null;                // 获取锁中计数器的值,计数器的初始值为1                int i = perNodeLock.count.get();                // 利用cas来对锁进行lock锁定,如果在这期间计数器的值被其他线程修改,则这次无法进行锁定,需要进入下一次循环                if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {//先CAS后执行lock,这里会与tryAcquire方法产生并发,CAS成功但是lock失败                    perNodeLock.lock();                    return new ReleasableLock(key, perNodeLock);                }            }        }    }
  • org.elasticsearch.common.util.concurrent.KeyedLock#tryAcquire方法:
 /**     * Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned.     */    public Releasable tryAcquire(T key) {        final KeyLock perNodeLock = map.get(key);        if (perNodeLock == null) {            // 如果需要创建新的锁,在这里就直接返回了            return tryCreateNewLock(key);        }        // 利用try lock尝试获取锁,让当前线程拥有锁。到最后再释放它。        // 情况一:不同线程中时,这里可能是在上一个线程unlock的时候恰巧获取到锁,见org.elasticsearch.common.util.concurrent.KeyedLock.release,unlock的时候。这种情况会丰while判断count的值 的时候会过滤掉,因为count被release修改成0了        // 情况二:如果是同一个线程第二次获取锁,这里也会返回true        if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again            int i;            //同一线程再次 获取到锁            while ((i = perNodeLock.count.get()) > 0) {                //这里可能会与acquire方法产生并发问题,如果另一个线程并发调用了acquire方法,acquire方法会修改count的值成功但加锁失败。然后会导致当前cas失败,会进入while重试                // we have to do this in a loop here since even if the count is > 0                // there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got                // the lock we should retry and see if we can still get it or if the count is 0. If that is the case and we give up.                //这里进行cas是为了防止其他地方在并发修改count的值,因为当前已经有了锁的占有权所以进行重试尝试,如果count为0则放弃。                if (perNodeLock.count.compareAndSet(i, i + 1)) {                    return new ReleasableLock(key, perNodeLock);                }            }            // 对应上面的tryLock,如果没有获取到锁的占有权,执行到了这里则需要释放当前线程对锁的占有权            perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state        }        return null;    }

这里有很多CAS操作,有些是为了避免acquire方法和tryAcquire以及release方法之间的并发问题,具体代码细节在注释中已经相当清楚了,这里提一下tryCreateNewLock方法:

  /**     * 每次第一次获取锁时,count值都为1;close之后,count变为0,然后会从map中清除。如果在close的时候有一个线程并发地获取到了锁,会对count进行修改     * @param key     * @return     */    private ReleasableLock tryCreateNewLock(T key) {        // 创建锁        KeyLock newLock = new KeyLock(fair);        // 锁定        newLock.lock();        // 如果该key对应的lock不存在,则将key和lock放入map中然后返回null        // 如果该key对应的lock存在,则返回之前的lock        KeyLock keyLock = map.putIfAbsent(key, newLock);        if (keyLock == null) {            // 包装成一个ReleasableLock            return new ReleasableLock(key, newLock);        }        return null;    }

对于ReleasableLock,它实现了Releasable接口,而ReleasbleLock的结构如下:

它实现了Closeable接口,可以通try...with...resource特性来使用,会自动进行资源的释放。

我们接着来看下ReleasableLock的代码:

private final class ReleasableLock implements Releasable {        final T key;        final KeyLock lock;        final AtomicBoolean closed = new AtomicBoolean();
        private ReleasableLock(T key, KeyLock lock) {            this.key = key;            this.lock = lock;        }
        @Override        public void close() {            if (closed.compareAndSet(false, true)) {                release(key, lock);            }        }    }

紧接着看下释放锁的操作。

释放锁

释放的操作是通过ReleasableLock的close方法来执行关闭,在其中调用了org.elasticsearch.common.util.concurrent.KeyedLock#release方法:

  /**     * 释放锁,在执行unlock之前该线程还拥有锁的占有权     * @param key     * @param lock     */    private void release(T key, KeyLock lock) {        // 先进行校验        assert lock == map.get(key);        // 减少锁的计数次数        final int decrementAndGet = lock.count.decrementAndGet();        // 解锁        lock.unlock();        // 当锁计数器减到零时,从map容器中将锁移除。因为count的初始值为1,所以在使用一次之后就会被移除        if (decrementAndGet == 0) {            map.remove(key, lock);        }        assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't";    }

使用方法

可以看下org.elasticsearch.discovery.zen.UnicastZenPing.PingingRound#getOrConnect方法:

 public Connection getOrConnect(DiscoveryNode node) throws IOException {            Connection result;            try (Releasable ignore = connectionLock.acquire(node.getAddress())) {            ------------------------

总结

KeyedLock是一个用于管理KeyLock的容器,通过维护唯一标识符与KeyLock的map来维护多个资源的锁,它的锁底层基于ReentrantLock,通过acquire和tryAcquire方法对ReentrantLock的锁定操作进行包装,通过release方法对ReentrantLock的解锁操作进行包装。

本文分享自微信公众号 - 开发架构二三事(gh_d6f166e26398),作者:两个小灰象

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 实战之防止表单重复提交

    对于防止重复提交,最简单也最不安全的做法相信大家也都经历过,前端在一个请求发送后立即禁用掉按钮,这里咱们来讨论一下后端对防止重复提交的处理方式。 主要针对非分布...

    开发架构二三事
  • java8中skiplist的实现及源码分析

    这个类是实现了一个类似于树的二维连接的跳表,它的索引级别是放在分割开的节点里面的,基础节点拥有所有的数据。用这个便利的数据结构代替数组结构的原因主要有两点:

    开发架构二三事
  • netty源码分析四之客户端接入与数据写出

    在上文Bootstrap初始化流程分析中我们知道,在NioServerSocketChannel进行register时,会调用eventLoop.execute...

    开发架构二三事
  • JDK1.8源码(七)——java.util.HashMap 类

      本篇博客我们来介绍在 JDK1.8 中 HashMap 的源码实现,这也是最常用的一个集合。但是在介绍 HashMap 之前,我们先介绍什么是 Hash表。...

    IT可乐
  • 散列表

    是根据键 (Key) 而直接访问在内存存储位置的数据结构。也就是说,它通过计算一个关于键值的函数,将所需查询的数据映射到表中一个位置来访问记录,这加快了查找速度...

    周三不加班
  • 18届学长Java面经分享:应届生找工作,基础为主,项目在精

    先说一下我的个人情况,18届应届毕业生,去年9月份开始在上海一家软件公司实习,直到今年的4月底离开公司,6月中旬开始找工作,现已经拿到较为满意的offer(坐标...

    牛客网
  • Redis实现用户登录错误次数限制

    系统登录的时候经常会有这种场景,如果密码连续N次输入错误,则要等N分钟之后才能重试。实现的方式有多种,比如在内存中维护一个数据结构来存储这些信息,但实现起来比较...

    会跳舞的机器人
  • 常见面试第四题之requestLayout, invalidate和postInvalidate的异同

    requestLayout, invalidate和postInvalidate的异同 ? 今天我们来讲讲在面试当中最常见的,最常常被问到的第四题,近期由于小...

    非著名程序员
  • 使用Redis作为分布式锁的一些注意点

    最简单的方法是使用setnx命令。key是锁的唯一标识,按业务来决定命名,value为当前线程的线程ID。

    小勇DW3
  • redis做分布式锁可能不那么简单

    在计算机世界里,对于锁大家并不陌生,在现代所有的语言中几乎都提供了语言级别锁的实现,为什么我们的程序有时候会这么依赖锁呢?这个问题还是要从计算机的发展说起,随着...

    用户5546570

扫码关注云+社区

领取腾讯云代金券