前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >池化设计之CommonPool

池化设计之CommonPool

作者头像
leobhao
发布2022-06-28 18:40:25
3650
发布2022-06-28 18:40:25
举报
文章被收录于专栏:涓流

池化技术 CommonPool

对象池化管理是一个很重要的功能,无论是数据库连接池还是redis连接池,都应该特别关注连接池的使用,重点关注几个关键的指标是否正常,连接池使用不当很有可能导致连接池泄露的问题。

这里是基于 Apache CommonPool 2 来分析具体实现

关键成员

PooledObjectFactory

PooledObjectFactory 是用来管理对象的对象工厂, 定义如下:

代码语言:javascript
复制
public interface PooledObjectFactory<T> {
    //创建一个实例
    PooledObject<T> makeObject();
    void activateObject(PooledObject<T> obj);
    //去初始化,将对象返回到空闲池内
    void passivateObject(PooledObject<T> obj);
    boolean validateObject(PooledObject<T> obj);
    //销毁对象,不再被池需要
    void destroyObject(PooledObject<T> obj);
}

使用中可以继承最简单的一个实现BasePooledObjectFactory, 只需要实现 wrapcreate 方法即可, 例如:

代码语言:javascript
复制
public class StringBufferFactory extends BasePooledObjectFactory<StringBuffer> {

    @Override
    public StringBuffer create() throws Exception {
        return new StringBuffer();
    }

    /**
     * wrap 可用 DefaultPooledObject
     */
    @Override
    public PooledObject<StringBuffer> wrap(StringBuffer stringBuffer) {
        return new DefaultPooledObject<StringBuffer>(stringBuffer);
    }
}

普通的 PooledObjectFactory 只能生产出大批设置完全一致的对象。有时需要通过key来获取不同的对象, 这种时候就可以使用 BaseKeyedPooledObjectFactory 来替代 BasePooledObjectFactory 这个类,实现的是 KeyedPooledObjectFactory 接口 ,定义如下:

代码语言:javascript
复制
// 每个接口都多了一个 key 的定义
public interface KeyedPoolableObjectFactory<K,V> {
    PooledObject<V> makeObject(K key);
    void activateObject(K key, PooledObject<V> obj);
    void passivateObject(K key, PooledObject<V> obj);
    boolean validateObject(K key, PooledObject<V> obj);
    void destroyObject(K key, PooledObject<V> obj);
}
PooledObject

原始的对象类型不能够反应许多额外的信息,所以将原始类型封装起来,增加了其他的信息,用来对原始对象的行为和状态进行观察。封装类型的模板定义为如下:

代码语言:javascript
复制
public interface PooledObject<T>{
    //获取原始对象
     T getObject();
    //获取创建时间
    long getCreateTime();
    //该对象最近一次被使用的时长,如果仍然在使用则这个值会变大
    long getActiveTimeMillis();
    //最近一次空闲的时间
    long getIdleTimeMillis();
    //上次被签出的时间
    long getLastBorrowTime();
    // 上次被归还的时间
    long getLastReturnTime();
    // 上次使用的时间  
    long getLastUsedTime();
    //定义了对象的顺序,空闲时间从小到大排列,在GKOP的空闲清理器中会使用到
    int compareTo(PooledObject<T> other);
    将对象设置为EVICTION状态
    boolean startEvictionTest();
    //告诉对象清理已经结束了
    boolean endEvictionTest(Deque<PooledObject<T>> idleQueue);
    //分配对象
    boolean allocate();
    //回收对象
    boolean deallocate();
    //失效对象
    void invalidate();
    //设置logAbandoned
    void setLogAbandoned(boolean logAbandoned);
    //设置使用时间
    void use();
    //如果borrow对象打印调用栈
    void printStackTrace(PrintWriter writer);
    //获取对象的状态
    PooledObjectState getState();
    //标记对象为废弃的
    void markAbandoned();
    //标记对象为返回中
    void markReturning();
}

封装对象的状态比较多,有十一个状态, 每个状态含义如下:

代码语言:javascript
复制
public enum PooledObjectState {
    //在空闲队列中
    IDLE,
    //使用中
    ALLOCATED,
    //在队列中,可能会被清理
    EVICTION,
    //不在队列中,已经被标记为清除,但同时被借出,这个时候借出不成功,当清理器停止工作以后,该对象要被还回到对象的头部
    EVICTION_RETURN_TO_HEAD,
    //在队列中,校验过
    VALIDATION,
    //不在队列中,处于校验待分配状态,该对象已经被借出但是配置了testOnBorrow,正在进行校验,校验通过以后会变成分配。
    VALIDATION_PREALLOCATED,
    //不在队列中,别清理器移除队列,这个时候借出对象,并且正在校验,这个时候校验成功以后需要还回到队列头部
    VALIDATION_RETURN_TO_HEAD,
    //校验不通过
    INVALID,
    //废弃
    ABANDONED,
    //正在归还到线程池中
    RETURNING
}
ObjectPool

ObjectPool 定义了对象池的操作:

代码语言:javascript
复制
public interface ObjectPool<T> extends Closeable {
    //借出对象要么是通过工厂创建的对象,要么是从空闲队列里面获取,通过工厂方法激活并且校验通过的
    //根据约定,归还对象必须调用returnObject方法和invalidateObject方法
    T borrowObject();
    //返回被borrow出的对象
    void returnObject(T obj);
    //对借出来的对象进行校验处理,像数据库连接就会去查询一次数据,对于redis去查询发出ping命令
    void invalidateObject(T obj);
    //给空闲队列增加一个对象,工厂生成一个对象,去激活对象,然后加入到队列
    void addObject();
    //空闲对象的个数,认为不增加对象,有多少可用
    int getNumIdle();
    //认为是当前正在使用的对象
    int getNumActive();
    //销毁所有空闲对象,释放资源,必须要调用PooledObjectFactory#destroyObject方法
    void clear();
    //关闭池,并且释放资源
    void close();
}

一个对象池的基本使用方式如下:

代码语言:javascript
复制
Object obj = null;//被池管理的对象

   try {
       obj = pool.borrowObject();//从池里获取对象
       try {
           //使用该对象
       } catch(Exception e) {
           //失效该对象
           pool.invalidateObject(obj);
           //失效成功以后将obj变为null
           obj = null;
       } finally {
           //确保对象已经归还到池里面,并且避免两次归还
           if(null != obj) {
               pool.returnObject(obj);
          }
       }
   } catch(Exception e) {
         //获取对象失败
   }

GenericObjectPool

GenericObjectPool 实现了对象的池化管理

构造方法
代码语言:javascript
复制
public GenericObjectPool(final PooledObjectFactory<T> factory,
        final GenericObjectPoolConfig config) {

    super(config, ONAME_BASE, config.getJmxNamePrefix());

    if (factory == null) {
        jmxUnregister(); // tidy up
        throw new IllegalArgumentException("factory may not be null");
    }
    this.factory = factory;
    // 双向阻塞队列
    idleObjects = new LinkedBlockingDeque<>(config.getFairness());

    setConfig(config);
    //清除器定时任务,设置任务定时间隔,时间不设置,清理器是不会启动的
    startEvictor(getTimeBetweenEvictionRunsMillis());
}
borrowObject

borrowObject 从对象池获取对象:

代码语言:javascript
复制
//borrow一个对象的方法,borrow这个词很准确,因为还需要归还
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
    assertOpen();

    final AbandonedConfig ac = this.abandonedConfig;
    //空闲小于两个了 并且这个时候最大活跃的接近最大的了
    //说明这个时候对象是不够用的了,这个时候需要清理出来一批了
    if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
            (getNumIdle() < 2) &&
            (getNumActive() > getMaxTotal() - 3) ) {
        removeAbandoned(ac);
    }

    PooledObject<T> p = null;

    // Get local copy of current config so it is consistent for entire
    // method execution
    final boolean blockWhenExhausted = getBlockWhenExhausted();

    boolean create;
    final long waitTime = System.currentTimeMillis();

    while (p == null) {
        create = false;
        //从双向阻塞队列里面获取
        p = idleObjects.pollFirst();
        if (p == null) {
            //获取不到空闲的则生成一个对象
            p = create();
            if (p != null) {
                create = true;
            }
        }
        //连接池耗尽则阻塞
        if (blockWhenExhausted) {
            if (p == null) {
                if (borrowMaxWaitMillis < 0) {
                    //这就是阻塞队列,不带超时时间
                    p = idleObjects.takeFirst();
                } else {
                    //这是带有超时时间的
                    p = idleObjects.pollFirst(borrowMaxWaitMillis,
                            TimeUnit.MILLISECONDS);
                }
            }
            if (p == null) {
                throw new NoSuchElementException(
                        "Timeout waiting for idle object");
            }
        } else {
            if (p == null) {
                throw new NoSuchElementException("Pool exhausted");
            }
        }
        if (!p.allocate()) {
            p = null;
        }

        if (p != null) {
            try {
                factory.activateObject(p);
            } catch (final Exception e) {
                try {
                    destroy(p);
                } catch (final Exception e1) {
                    // Ignore - activation failure is more important
                }
                p = null;
                if (create) {
                    final NoSuchElementException nsee = new NoSuchElementException(
                            "Unable to activate object");
                    nsee.initCause(e);
                    throw nsee;
                }
            }
            //会影响性能
            if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                boolean validate = false;
                Throwable validationThrowable = null;
                try {
                    validate = factory.validateObject(p);
                } catch (final Throwable t) {
                    PoolUtils.checkRethrow(t);
                    validationThrowable = t;
                }
                if (!validate) {
                    try {
                        destroy(p);
                        destroyedByBorrowValidationCount.incrementAndGet();
                    } catch (final Exception e) {
                        // Ignore - validation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to validate object");
                        nsee.initCause(validationThrowable);
                        throw nsee;
                    }
                }
            }
        }
    }

    updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

    return p.getObject();
}
returnObject 流程
代码语言:javascript
复制
public void returnObject(final T obj) {
    final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));

    if (p == null) {
        if (!isAbandonedConfig()) {
            throw new IllegalStateException(
                    "Returned object not currently part of this pool");
        }
        return; // Object was abandoned and removed
    }

    synchronized(p) {
        final PooledObjectState state = p.getState();
        if (state != PooledObjectState.ALLOCATED) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }
        p.markReturning(); // Keep from being marked abandoned
    }

    final long activeTime = p.getActiveTimeMillis();

    if (getTestOnReturn()) {
        if (!factory.validateObject(p)) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }
    }

    try {
        factory.passivateObject(p);
    } catch (final Exception e1) {
        swallowException(e1);
        try {
            destroy(p);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }

    if (!p.deallocate()) {
        throw new IllegalStateException(
                "Object has already been returned to this pool or is invalid");
    }

    final int maxIdleSave = getMaxIdle();
    if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
        try {
            destroy(p);
        } catch (final Exception e) {
            swallowException(e);
        }
    } else {
        if (getLifo()) {
            //后进先出,增加到队列首部
            idleObjects.addFirst(p);
        } else {
            //先进先出 增加到队列尾部
            idleObjects.addLast(p);
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            //池关掉以后,空闲队列里面的对象也要销毁
            clear();
        }
    }
    updateStatsReturn(activeTime);
}
evict

设置定时清除器去清理超过 minIdle 的对象:

代码语言:javascript
复制
public void evict() throws Exception {
    assertOpen();

    if (idleObjects.size() > 0) {

        PooledObject<T> underTest = null;
        final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();

        synchronized (evictionLock) {
            final EvictionConfig evictionConfig = new EvictionConfig(
                    getMinEvictableIdleTimeMillis(),
                    getSoftMinEvictableIdleTimeMillis(),
                    getMinIdle());

            final boolean testWhileIdle = getTestWhileIdle();

            for (int i = 0, m = getNumTests(); i < m; i++) {
                if (evictionIterator == null || !evictionIterator.hasNext()) {
                    evictionIterator = new EvictionIterator(idleObjects);
                }
                if (!evictionIterator.hasNext()) {
                    // Pool exhausted, nothing to do here
                    return;
                }

                try {
                    underTest = evictionIterator.next();
                } catch (final NoSuchElementException nsee) {
                    // Object was borrowed in another thread
                    // Don't count this as an eviction test so reduce i;
                    i--;
                    evictionIterator = null;
                    continue;
                }

                if (!underTest.startEvictionTest()) {
                    // Object was borrowed in another thread
                    // Don't count this as an eviction test so reduce i;
                    i--;
                    continue;
                }

                // User provided eviction policy could throw all sorts of
                // crazy exceptions. Protect against such an exception
                // killing the eviction thread.
                boolean evict;
                try {
                    evict = evictionPolicy.evict(evictionConfig, underTest,
                            idleObjects.size());
                } catch (final Throwable t) {
                    // Slightly convoluted as SwallowedExceptionListener
                    // uses Exception rather than Throwable
                    PoolUtils.checkRethrow(t);
                    swallowException(new Exception(t));
                    // Don't evict on error conditions
                    evict = false;
                }

                if (evict) {
                    destroy(underTest);
                    destroyedByEvictorCount.incrementAndGet();
                } else {
                    //定时任务校验对象
                    if (testWhileIdle) {
                        boolean active = false;
                        try {
                            factory.activateObject(underTest);
                            active = true;
                        } catch (final Exception e) {
                            destroy(underTest);
                            destroyedByEvictorCount.incrementAndGet();
                        }
                        if (active) {
                            if (!factory.validateObject(underTest)) {
                                destroy(underTest);
                                destroyedByEvictorCount.incrementAndGet();
                            } else {
                                try {
                                    factory.passivateObject(underTest);
                                } catch (final Exception e) {
                                    destroy(underTest);
                                    destroyedByEvictorCount.incrementAndGet();
                                }
                            }
                        }
                    }
                    if (!underTest.endEvictionTest(idleObjects)) {
                        // TODO - May need to add code here once additional
                        // states are used
                    }
                }
            }
        }
    }
    final AbandonedConfig ac = this.abandonedConfig;
    if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
        //同时会清除废弃对象
        removeAbandoned(ac);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 池化技术 CommonPool
  • 关键成员
    • PooledObjectFactory
      • PooledObject
        • ObjectPool
        • GenericObjectPool
          • 构造方法
            • borrowObject
              • returnObject 流程
                • evict
                相关产品与服务
                云数据库 Redis
                腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档