A look at connection pool validation in spring-data-redis

code4it
Published 2018-09-17 17:26:36
Collected in the column: 码匠的流水账

This article examines how spring-data-redis validates the connections held in its connection pools, for both lettuce and jedis.

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
    }

    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (StatefulConnection)connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider)this.connectionProvider).getRedisClient();
        } else {
            throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
        }
    }

    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
        if (pool == null) {
            throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
        } else {
            pool.returnObject(connection);
        }
    }

    public void destroy() throws Exception {
        if (!this.poolRef.isEmpty()) {
            log.warn("LettucePoolingConnectionProvider contains unreleased connections");
            this.poolRef.forEach((connection, pool) -> {
                pool.returnObject(connection);
            });
            this.poolRef.clear();
        }

        this.pools.forEach((type, pool) -> {
            pool.close();
        });
        this.pools.clear();
    }
}
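  • getConnection calls ConnectionPoolSupport.createGenericObjectPool to create the connection pool, lazily and once per connection type (via computeIfAbsent). Each borrowed connection is recorded in poolRef so that release can hand it back to the pool it came from.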
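The bookkeeping in getConnection and release can be illustrated without any Redis dependency. The following is a minimal sketch, not the library's implementation: SimplePool and PoolingProviderSketch are hypothetical names, and SimplePool is a bare stand-in for GenericObjectPool that performs no validation or size limiting.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Supplier;

// Hypothetical stand-in for GenericObjectPool: reuses an idle object, otherwise creates one.
final class SimplePool {
    private final ConcurrentLinkedDeque<Object> idle = new ConcurrentLinkedDeque<>();
    private final Supplier<Object> factory;

    SimplePool(Supplier<Object> factory) {
        this.factory = factory;
    }

    Object borrow() {
        Object obj = idle.pollFirst();
        return obj != null ? obj : factory.get();
    }

    void giveBack(Object obj) {
        idle.addFirst(obj);
    }

    int idleCount() {
        return idle.size();
    }
}

// Illustrative mirror of the two maps kept by LettucePoolingConnectionProvider.
final class PoolingProviderSketch {
    // One pool per requested connection type, created on first use.
    private final Map<Class<?>, SimplePool> pools = new ConcurrentHashMap<>();
    // Borrowed connection -> the pool it must be returned to.
    private final Map<Object, SimplePool> poolRef = new ConcurrentHashMap<>();
    private final Supplier<Object> connectionSupplier;

    PoolingProviderSketch(Supplier<Object> connectionSupplier) {
        this.connectionSupplier = connectionSupplier;
    }

    <T> T getConnection(Class<T> type) {
        SimplePool pool = pools.computeIfAbsent(type, t -> new SimplePool(connectionSupplier));
        Object connection = pool.borrow();
        poolRef.put(connection, pool);
        return type.cast(connection);
    }

    void release(Object connection) {
        SimplePool pool = poolRef.remove(connection);
        if (pool == null) {
            // Same situation the real provider reports with a PoolException.
            throw new IllegalStateException("Connection was already returned or is not from this provider");
        }
        pool.giveBack(connection);
    }

    int idleCount(Class<?> type) {
        SimplePool pool = pools.get(type);
        return pool == null ? 0 : pool.idleCount();
    }
}
```

Releasing the same connection twice fails in this sketch for the same reason as in the original code: the first release removes the entry from poolRef, so the second lookup finds no owning pool.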

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {

                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }
  • The pool is created with a RedisPooledObjectFactory as its object factory.

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

        private final Supplier<T> connectionSupplier;

        RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
            this.connectionSupplier = connectionSupplier;
        }

        @Override
        public T create() throws Exception {
            return connectionSupplier.get();
        }

        @Override
        public void destroyObject(PooledObject<T> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public PooledObject<T> wrap(T obj) {
            return new DefaultPooledObject<>(obj);
        }

        @Override
        public boolean validateObject(PooledObject<T> p) {
            return p.getObject().isOpen();
        }
    }
  • RedisPooledObjectFactory extends BasePooledObjectFactory and overrides validateObject, among other methods; validation relies solely on isOpen().

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java

public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

    private Duration timeout;
    private CloseEvents closeEvents = new CloseEvents();

    private final RedisChannelWriter channelWriter;
    private final boolean debugEnabled = logger.isDebugEnabled();

    private volatile boolean closed;
    private volatile boolean active = true;
    private volatile ClientOptions clientOptions;

    //......

    /**
     * Notification when the connection becomes active (connected).
     */
    public void activated() {
        active = true;
        closed = false;
    }

    /**
     * Notification when the connection becomes inactive (disconnected).
     */
    public void deactivated() {
        active = false;
    }

    /**
     *
     * @return true if the connection is active and not closed.
     */
    public boolean isOpen() {
        return active;
    }

    @Override
    public synchronized void close() {

        if (debugEnabled) {
            logger.debug("close()");
        }

        if (closed) {
            logger.warn("Connection is already closed");
            return;
        }

        if (!closed) {
            active = false;
            closed = true;
            channelWriter.close();
            closeEvents.fireEventClosed(this);
            closeEvents = new CloseEvents();
        }
    }
}
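  • isOpen() simply returns the active field. active is set to false in deactivated() and close(); it is true initially and is set back to true in activated().
  • Consequently, a timeout caused by something like docker pause cannot be detected this way: a paused server does not close the connection, so neither deactivated() nor close() is invoked and active stays true.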
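To make the limitation concrete, here is a minimal sketch that models only the two flags from RedisChannelHandler. FlagConnectionSketch, FlagValidationDemo and the peerFrozen field are hypothetical; peerFrozen stands in for a server frozen by docker pause, which is an assumption about how the pause looks from the client side (no disconnect event is delivered).

```java
// Models only the state flags shown in RedisChannelHandler; there is no real network here.
final class FlagConnectionSketch {
    private volatile boolean closed;
    private volatile boolean active = true;
    // Hypothetical: true while the remote server is frozen (for example by docker pause).
    volatile boolean peerFrozen;

    void activated() {
        active = true;
        closed = false;
    }

    void deactivated() {
        active = false;
    }

    boolean isOpen() {
        // Note that peerFrozen plays no part in this check.
        return active;
    }

    synchronized void close() {
        if (closed) {
            return;
        }
        active = false;
        closed = true;
    }
}

final class FlagValidationDemo {
    // Same check that RedisPooledObjectFactory.validateObject performs.
    static boolean validate(FlagConnectionSketch connection) {
        return connection.isOpen();
    }
}
```

In this model, setting peerFrozen to true leaves validate returning true, while calling deactivated() or close() makes it return false. The flag only reflects events the client has already observed.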

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
        private final RedisClient client;
        private int dbIndex;

        public LettuceFactory(RedisClient client, int dbIndex) {
            this.client = client;
            this.dbIndex = dbIndex;
        }

        public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
            if (pooledObject.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
            }

        }

        public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
            try {
                ((StatefulConnection)obj.getObject()).close();
            } catch (Exception var3) {
                ;
            }

        }

        public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
            try {
                if (obj.getObject() instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection)obj.getObject()).sync().ping();
                }

                return true;
            } catch (Exception var3) {
                return false;
            }
        }

        public StatefulConnection<byte[], byte[]> create() throws Exception {
            return this.client.connect(LettuceConnection.CODEC);
        }

        public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
            return new DefaultPooledObject(obj);
        }
    }
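  • The deprecated DefaultLettucePool contains a LettuceFactory whose validateObject sends a PING and returns false if that call throws, which makes it a more accurate check than the active flag.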
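The idea behind a PING-based check can be sketched with the standard library alone. This is an illustration under assumptions, not the LettuceFactory code: PingValidatorSketch is a hypothetical class, the ping is passed in as a plain Supplier, and the timeout is enforced here with a CompletableFuture, whereas the real factory relies on the command timeout of the synchronous lettuce API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical validator: a connection is valid only if a ping round trip completes in time.
final class PingValidatorSketch {
    private final long timeoutMillis;

    PingValidatorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    boolean validate(Supplier<String> ping) {
        // Run the ping on another thread so the caller can give up after the timeout.
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(ping);
        try {
            reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            // No answer in time, or the ping itself failed: treat the connection as broken.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A frozen server never answers, so the call ends in the timeout branch and validation fails, which is exactly the case the active flag misses.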

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    //......
    private Pool<Jedis> createPool() {

        if (isRedisSentinelAware()) {
            return createRedisSentinelPool(this.sentinelConfig);
        }
        return createRedisPool();
    }

    /**
     * Creates {@link JedisSentinelPool}.
     *
     * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    /**
     * Creates {@link JedisPool}.
     *
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisPool() {

        return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
                getPassword(), getDatabase(), getClientName(), isUseSsl(),
                clientConfiguration.getSslSocketFactory().orElse(null), //
                clientConfiguration.getSslParameters().orElse(null), //
                clientConfiguration.getHostnameVerifier().orElse(null));
    }
    //......
}
  • Both JedisPool and JedisSentinelPool use JedisFactory internally.

JedisFactory.validateObject

jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java

class JedisFactory implements PooledObjectFactory<Jedis> {
  private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
  private final int connectionTimeout;
  private final int soTimeout;
  private final String password;
  private final int database;
  private final String clientName;
  private final boolean ssl;
  private final SSLSocketFactory sslSocketFactory;
  private SSLParameters sslParameters;
  private HostnameVerifier hostnameVerifier;

  //......

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}
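  • JedisFactory implements the PooledObjectFactory interface. Its validateObject confirms that the host and port still match, and then checks not only isConnected() but also that ping() returns PONG.
  • ping() throws an exception as soon as it times out, so validation fails. The timeout caused by docker pause is therefore detected and the connection is evicted from the pool.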
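How a failed validation leads to eviction can be shown with a small stand-alone pool. This is a simplified sketch, not commons-pool2: ValidatingPoolSketch is a hypothetical class that always validates on borrow, whereas in commons-pool2 the factory's validateObject is only consulted when an option such as testOnBorrow or testWhileIdle is enabled on the pool configuration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical pool that validates idle objects on borrow and drops the ones that fail.
final class ValidatingPoolSketch<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final Predicate<T> validator;
    private int destroyed;

    ValidatingPoolSketch(Supplier<T> factory, Predicate<T> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    synchronized T borrow() {
        T candidate;
        while ((candidate = idle.pollFirst()) != null) {
            if (validator.test(candidate)) {
                return candidate;
            }
            // Validation failed: the object is not handed out and not put back.
            destroyed++;
        }
        // No usable idle object left, so create a fresh one (not validated in this sketch).
        return factory.get();
    }

    synchronized void giveBack(T obj) {
        idle.addFirst(obj);
    }

    synchronized int destroyedCount() {
        return destroyed;
    }
}
```

With a validator that pings the server, a connection to a paused instance fails the test on its next borrow and is replaced by a new one, which matches the behavior described for the Jedis pool.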

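Summary

  • spring-data-redis 2.0 and later deprecate the old LettucePool in favor of LettucePoolingClientConfiguration.
  • One problem: the old implementation validated connections with PING, while the new one relies on the active flag, which cannot detect a docker pause.
  • With lettuce, the connection pool is not used by default for the async connection: after the first borrow, the underlying native connection is reused and never returned. To actually use the pool, set shareNativeConnection to false.
  • The jedis pool's validateObject checks both isConnected() and ping(), so it notices the timeout caused by docker pause and evicts the connection from the pool.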

doc

  • Connection-Pooling
  • redis:connectors:lettuce
Originally published 2018-09-14 on the WeChat official account 码匠的流水账.
