专栏首页老男孩成长之路【源码分析】SpringBoot2中取代Druid的超级连接池:HikariCP之ConcurrentBag

【源码分析】SpringBoot2中取代Druid的超级连接池:HikariCP之ConcurrentBag

HiKariCP是数据库连接池的一个后起之秀,号称性能最好,可以完美地PK掉其他连接池。

以前无意间搜资料了解到 HikariCP,一下子就被它的简洁代码和卓越性能吸引住了。以前也有翻过它的代码,但是不是很系统,最近再次翻阅,正好做些笔记,方便以后学习。

最近在学习 Java 并发知识。那就从 HikariCP 自定义的并发集合 ConcurrentBag 开始学习。

在 HikariCP 的 Wiki 中,有 Down the Rabbit Hole · ConcurrentBag 的章节来专门介绍 ConcurrentBag

ConcurrentBag 的灵感借鉴自 C# .NET 的 ConcurrentBag 类。但是实现却是完全不同的。这里的 ConcurrentBag 有如下特性:

  • A lock-free design
  • ThreadLocal caching
  • Queue-stealing
  • Direct hand-off optimizations

下面,通过代码来对此做个说明。

ConcurrentBag 类的定义中,声明了集合元素必须是 IConcurrentBagEntry 的子类。先来看看这个接口的定义:

public interface IConcurrentBagEntry
{
    int STATE_NOT_IN_USE = 0;
    int STATE_IN_USE = 1;
    int STATE_REMOVED = -1;
    int STATE_RESERVED = -2;

    boolean compareAndSet(int expectState, int newState);
    void setState(int newState);
    int getState();
}

接下来,看一下成员变量:

// 存放共享元素
private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals;

// 在 ThreadLocal 缓存线程本地元素,避免线程争用
private final ThreadLocal<List<Object>> threadList;
private final IBagStateListener listener;
//
private final AtomicInteger waiters;
private volatile boolean closed;

// 接力队列
private final SynchronousQueue<T> handoffQueue;

ConcurrentBag 开头的 JavaDoc 中就做了明确说明:

Note that items that are "borrowed" from the bag are not actually removed from any collection, so garbage collection will not occur even if the reference is abandoned. Thus care must be taken to "requite" borrowed objects otherwise a memory leak will result. Only the "remove" method can completely remove an object from the bag.

翻译一下就是:注意,从 ConcurrentBag 中“借用”(borrow)对象,实际上并未从任何集合中删除(只是将其状态设置为 STATE_IN_USE),因此即使删除引用也不会进行垃圾收集。因此必须注意归还(requite)借用的对象(将元素状态设置为 STATE_NOT_IN_USE),否则将导致内存泄漏。 只有“删除”(remove)方法才能从袋子中完全删除一个对象。具体看代码:

/**
* The method will borrow a BagEntry from the bag, blocking for the
* specified timeout if none are available.
*
* @param timeout how long to wait before giving up, in units of unit
* @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
    // 1\. 尝试从 ThreadLocal 中查找目标值
    // Try the thread-local list first
    final List<Object> list = threadList.get();
    for (int i = list.size() - 1; i >= 0; i--) {
        final Object entry = list.remove(i);
        @SuppressWarnings("unchecked")
        final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
        if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
        }
    }

    // Otherwise, scan the shared list ... then poll the handoff queue
    final int waiting = waiters.incrementAndGet();
    try {
        // 2\. 如果 ThreadLocal 中没有目标元素:没有元素 或者 修改元素状态失败,则从 `sharedList` 中获取目标元素。
        //    这里可以看出,只是将目标元素的状态从 `STATE_NOT_IN_USE` 修改为 `STATE_IN_USE`,并没有删除。
        //    换句话说,在 `sharedList` 变量中,保存着集合中所有的元素。
        for (T bagEntry : sharedList) {
        if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // If we may have stolen another waiter's connection, request another bag add.
            if (waiting > 1) {
                listener.addBagItem(waiting - 1);
            }
            return bagEntry;
        }
        }

        listener.addBagItem(waiting);

        // 3\. 如果 `sharedList` 也没有目标元素,则在接力队列 handoffQueue 中获取,直到超时
        timeout = timeUnit.toNanos(timeout);
        do {
        final long start = currentTime();
        final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
        if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
        }

        timeout -= elapsedNanos(start);
        } while (timeout > 10_000);

        return null;
    }
    finally {
        waiters.decrementAndGet();
    }
}

/**
* This method will return a borrowed object to the bag.  Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the bagEntry was not borrowed from the bag
*/
public void requite(final T bagEntry)
{
    // 将归还元素的状态设置成 `STATE_NOT_IN_USE`
    bagEntry.setState(STATE_NOT_IN_USE);

    // 如果等待大于零,则先尝试将元素交给接力队列 handoffQueue,这样更快地交给消费方。
    for (int i = 0; waiters.get() > 0; i++) {
        if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
        }
        else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
        }
        else {
            Thread.yield();
        }
    }

    // 如果没有等待,则将元素放入到 ThreadLocal 中,方便方便下次使用。
    final List<Object> threadLocalList = threadList.get();
    if (threadLocalList.size() < 50) {
        threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
    }
}

集合元素的添加和删除是通过 addremove 方法来实现的。代码如下:

/**
* Add a new object to the bag for others to borrow.
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry)
{
    if (closed) {
        LOGGER.info("ConcurrentBag has been closed, ignoring add()");
        throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
    }

    // 从这里可以看出,添加的元素都会添加到 sharedList 变量中。
    sharedList.add(bagEntry);

    // spin until a thread takes it or none are waiting
    while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
        Thread.yield();
    }
}

/**
* Remove a value from the bag.  This method should only be called
* with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
*
* @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object
*         from the bag that was not borrowed or reserved first
*/
public boolean remove(final T bagEntry)
{
    // 删除元素之前,需要确保可以将状态设置为 STATE_REMOVED
    if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
        LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
        return false;
    }

    // 从 sharedList 删除元素
    final boolean removed = sharedList.remove(bagEntry);
    if (!removed && !closed) {
        LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
    }

    // 从 ThreadLocal 中也要删除。
    // 在上面 borrow 方法借用元素时,从 ThreadLocal 中获得的元素要从本地 List 中删除的。
    // 这样就不需要但是因为 ThreadLocal 中的元素没有删除导致的内存泄露问题了。
    threadList.get().remove(bagEntry);

    return removed;
}

这里有一个疑问:只处理了状态是 STATE_IN_USESTATE_RESERVED 的元素。那么,状态是 STATE_NOT_IN_USE 的元素,为什么不能删除?

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spring Boot 2.0选择HikariCP作为默认数据库连接池的五大理由

    摘要: 本文非原创,是「工匠小猪猪的技术世界」搜集了一些HikariCP相关的资料整理给大家的介绍,主要讲解了为什么sb2选择了HikariCP以及Hikari...

    程序猿DD
  • Java 数据持久化系列之 HikariCP (一)

    在上一篇《Java 数据持久化系列之池化技术》中,我们了解了池化技术,并使用 Apache-common-Pool2 实现了一个简单连接池,实验对比了它和 Hi...

    程序员历小冰
  • Java 数据持久化系列之 HikariCP (一)

    在上一篇《Java 数据持久化系列之池化技术》中,我们了解了池化技术,并使用 Apache-common-Pool2 实现了一个简单连接池,实验对比了它和 Hi...

    程序员历小冰
  • SpringBoot官方为什么采用这个数据库连接池?史上最快?

    现在已经有很多公司在使用HikariCP了,HikariCP还成为了SpringBoot默认的连接池,伴随着SpringBoot和微服务,HikariCP 必将...

    macrozheng
  • 主流Java数据库连接池比较及前瞻

    常用的主流开源数据库连接池有C3P0、DBCP、Tomcat Jdbc Pool、BoneCP、Druid等

    程序猿DD
  • 数据库连接池性能比对(hikari druid c3p0 dbcp jdbc)

    对现有的数据库连接池做调研对比,综合性能,可靠性,稳定性,扩展性等因素选出推荐出最优的数据库连接池 。     

    allsmallpig
  • 对比各大数据库连接池技术-Jdbc-Dbcp-C3p0-Druid-Hikaricp

    连接池是一种用于提高具有动态数据库驱动内容的应用程序性能的技术。打开和关闭数据库连接可能看起来不是昂贵的费用,但它可以相当快地加起来。假设建立连接需要5ms,执...

    杨校
  • 【追光者系列】HikariCP 源码分析之 allowPoolSuspension

    摘要: 原创出处 https://mp.weixin.qq.com/s/-WGg22lUQU41c_8lx6kyQA 「渣渣王子」欢迎转载,保留摘要,

    芋道源码
  • 大话数据库连接池简史,你都用过几个?

    数据库连接池在Java数据库相关中间件产品群中,应该算是底层最基础的一类产品,作为企业应用开发必不可少的组件,无数天才们为我们贡献了一个又一个的优秀产品,它们有...

    芋道源码
  • 芋道 Spring Boot 数据库连接池入门

    在我们的项目中,数据库连接池基本是必不可少的组件。在目前数据库连接池的选型中,主要是

    芋道源码
  • java应用最好的数据源 Hikari?

    DBCP是Apache推出的数据库连接池(Database Connection Pool)。

    冯杰宁
  • 深入Spring Boot (十六):从源码分析自动配置原理

    在分析SpringBoot自动配置实现原理之前,先来看一下在使用SpringBoot开发的项目代码中如何将数据库连接池切换成Druid。

    JavaQ
  • 顶级Javaer,常用的 14 个类库

    昨天下载下来Java16尝尝鲜。一看,好家伙,足足有176MB大。即使把jmc和jvisualvm给搞了出去,依然还是这么大,真的是让人震惊不已。

    xjjdog
  • SpringBoot 报 No operations allowed after connection closed 异常解决办法

      MySQL 5.0 以后针对超长时间数据库连接做了一个处理,即一个数据库连接在无任何操作情况下过了 8 个小时后(MySQL 服务器默认的超时时间是 8 小...

    Demo_Null
  • ApiBoot DataSource Switch 使用文档

    ApiBoot是一款基于SpringBoot1.x,2.x的接口服务集成基础框架, 内部提供了框架的封装集成、使用扩展、自动化完成配置,让接口开...

    恒宇少年
  • 一个比 c3p0 快200倍的数据库连接池,这么牛?

    连接池是一种常用的技术,为什么需要连接池呢?这个需要从TCP说起。假如我们的服务器跟数据库没有部署在同一台机器,那么,服务器每次查询数据库都要先建立连接,一般都...

    用户5224393
  • Java中数据库连接池

    今天继续Java的课题,两天没有做任何事情,过了个自在的周末,但是不知道为什么总是有点淡淡的忧桑。

    香菜聊游戏
  • springboot(2)--数据源

    springboot简化了我们构建应用的难度,把很多功能帮我们打包,然后我们

    叔牙
  • Java生态中性能最强数据库连接池HikariCP

    字节码精简:优化代码,直到编译后的字节码最少,这样,CPU缓存可以加载更多的程序代码; 优化代理和拦截器:减少代码,例如HikariCP的Statement ...

    JavaEdge

扫码关注云+社区

领取腾讯云代金券