A look at connection pool validation in spring-data-redis

This post looks at how spring-data-redis validates pooled connections.

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
    }

    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (StatefulConnection)connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider)this.connectionProvider).getRedisClient();
        } else {
            throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
        }
    }

    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
        if (pool == null) {
            throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
        } else {
            pool.returnObject(connection);
        }
    }

    public void destroy() throws Exception {
        if (!this.poolRef.isEmpty()) {
            log.warn("LettucePoolingConnectionProvider contains unreleased connections");
            this.poolRef.forEach((connection, pool) -> {
                pool.returnObject(connection);
            });
            this.poolRef.clear();
        }

        this.pools.forEach((type, pool) -> {
            pool.close();
        });
        this.pools.clear();
    }
}
  • ConnectionPoolSupport.createGenericObjectPool is called here to create the connection pool

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {

                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }
  • The pool is built on RedisPooledObjectFactory

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

        private final Supplier<T> connectionSupplier;

        RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
            this.connectionSupplier = connectionSupplier;
        }

        @Override
        public T create() throws Exception {
            return connectionSupplier.get();
        }

        @Override
        public void destroyObject(PooledObject<T> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public PooledObject<T> wrap(T obj) {
            return new DefaultPooledObject<>(obj);
        }

        @Override
        public boolean validateObject(PooledObject<T> p) {
            return p.getObject().isOpen();
        }
    }
  • It extends BasePooledObjectFactory and overrides validateObject among other methods; validation here is decided by isOpen

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java

public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

    private Duration timeout;
    private CloseEvents closeEvents = new CloseEvents();

    private final RedisChannelWriter channelWriter;
    private final boolean debugEnabled = logger.isDebugEnabled();

    private volatile boolean closed;
    private volatile boolean active = true;
    private volatile ClientOptions clientOptions;

    //......

    /**
     * Notification when the connection becomes active (connected).
     */
    public void activated() {
        active = true;
        closed = false;
    }

    /**
     * Notification when the connection becomes inactive (disconnected).
     */
    public void deactivated() {
        active = false;
    }

    /**
     *
     * @return true if the connection is active and not closed.
     */
    public boolean isOpen() {
        return active;
    }

    @Override
    public synchronized void close() {

        if (debugEnabled) {
            logger.debug("close()");
        }

        if (closed) {
            logger.warn("Connection is already closed");
            return;
        }

        if (!closed) {
            active = false;
            closed = true;
            channelWriter.close();
            closeEvents.fireEventClosed(this);
            closeEvents = new CloseEvents();
        }
    }
}
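  • isOpen is decided by the active field: active becomes false on deactivated or close, and is true on initialization and again on activated
  • A timeout caused by something like docker pause therefore goes undetected: the server process is frozen but the TCP connection is not closed, so deactivated is never called and active stays true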
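
The difference between a flag-based check and a round-trip check can be sketched with a small simulation. Everything here is hypothetical: FakeConnection, freezeServer and ValidationSketch are illustration-only names, not Lettuce classes, and the "frozen server" is just a boolean that makes the simulated ping fail the way a timed-out command would.

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for a pooled connection; not a Lettuce class. */
final class FakeConnection {
    private volatile boolean active = true;         // mirrors the `active` flag shown above
    private volatile boolean serverFrozen = false;  // models `docker pause` on the server side

    void deactivated() { active = false; }          // only invoked when a disconnect is observed
    void freezeServer() { serverFrozen = true; }    // no disconnect event reaches the client

    boolean isOpen() { return active; }

    /** Simulated round trip: fails when the server is frozen, as a timed-out PING would. */
    String ping() {
        if (!active || serverFrozen) {
            throw new IllegalStateException("simulated command timeout");
        }
        return "PONG";
    }
}

final class ValidationSketch {
    /** Flag-based check, in the spirit of the isOpen-based validateObject above. */
    static final Predicate<FakeConnection> FLAG_BASED = FakeConnection::isOpen;

    /** Round-trip check: the connection is valid only if a ping actually comes back. */
    static final Predicate<FakeConnection> PING_BASED = c -> {
        try {
            return "PONG".equals(c.ping());
        } catch (RuntimeException e) {
            return false;
        }
    };

    public static void main(String[] args) {
        FakeConnection c = new FakeConnection();
        c.freezeServer();
        System.out.println("flag-based valid: " + FLAG_BASED.test(c)); // prints "flag-based valid: true"
        System.out.println("ping-based valid: " + PING_BASED.test(c)); // prints "ping-based valid: false"
    }
}
```

The flag only changes when the client observes a lifecycle event, so a peer that silently stops answering still passes the flag-based check.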

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
        private final RedisClient client;
        private int dbIndex;

        public LettuceFactory(RedisClient client, int dbIndex) {
            this.client = client;
            this.dbIndex = dbIndex;
        }

        public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
            if (pooledObject.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
            }

        }

        public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
            try {
                ((StatefulConnection)obj.getObject()).close();
            } catch (Exception var3) {
                ;
            }

        }

        public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
            try {
                if (obj.getObject() instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection)obj.getObject()).sync().ping();
                }

                return true;
            } catch (Exception var3) {
                return false;
            }
        }

        public StatefulConnection<byte[], byte[]> create() throws Exception {
            return this.client.connect(LettuceConnection.CODEC);
        }

        public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
            return new DefaultPooledObject(obj);
        }
    }
  • The deprecated DefaultLettucePool contains a LettuceFactory whose validateObject issues a ping, which makes it more accurate

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    //......
    private Pool<Jedis> createPool() {

        if (isRedisSentinelAware()) {
            return createRedisSentinelPool(this.sentinelConfig);
        }
        return createRedisPool();
    }

    /**
     * Creates {@link JedisSentinelPool}.
     *
     * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    /**
     * Creates {@link JedisPool}.
     *
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisPool() {

        return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
                getPassword(), getDatabase(), getClientName(), isUseSsl(),
                clientConfiguration.getSslSocketFactory().orElse(null), //
                clientConfiguration.getSslParameters().orElse(null), //
                clientConfiguration.getHostnameVerifier().orElse(null));
    }
    //......
}
  • Both JedisPool and JedisSentinelPool use JedisFactory internally

JedisFactory.validateObject

jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java

class JedisFactory implements PooledObjectFactory<Jedis> {
  private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
  private final int connectionTimeout;
  private final int soTimeout;
  private final String password;
  private final int database;
  private final String clientName;
  private final boolean ssl;
  private final SSLSocketFactory sslSocketFactory;
  private SSLParameters sslParameters;
  private HostnameVerifier hostnameVerifier;

  //......

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}
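  • JedisFactory implements the PooledObjectFactory interface; its validateObject checks not only isConnected but also the result of ping
  • ping throws an exception as soon as it times out, so validation fails; this lets the pool notice the timeout caused by docker pause and evict the connection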
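
Why a timed-out ping catches what a connected-state check misses can be reproduced with plain sockets. This is a sketch under stated assumptions, not Jedis code: SocketPingSketch, pingWithinTimeout and probeSilentServer are made-up names, the "server" is a local listener that never replies (a rough stand-in for a paused container), and the reply parsing is deliberately simplified.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class SocketPingSketch {

    /** Sends PING and waits for "+PONG"; any I/O failure or read timeout counts as invalid. */
    static boolean pingWithinTimeout(Socket socket, int soTimeoutMillis) {
        try {
            socket.setSoTimeout(soTimeoutMillis); // a blocked read now throws SocketTimeoutException
            socket.getOutputStream().write("PING\r\n".getBytes(StandardCharsets.US_ASCII));
            socket.getOutputStream().flush();
            byte[] buf = new byte[7]; // length of "+PONG\r\n"
            InputStream in = socket.getInputStream();
            int read = 0;
            while (read < buf.length) {
                int n = in.read(buf, read, buf.length - read);
                if (n < 0) {
                    return false; // peer closed the stream
                }
                read += n;
            }
            return new String(buf, StandardCharsets.US_ASCII).startsWith("+PONG");
        } catch (IOException e) { // SocketTimeoutException is an IOException
            return false;
        }
    }

    /** Connects to a listener that never accepts or replies; returns {isConnected, pingValid}. */
    static boolean[] probeSilentServer(int soTimeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket silent = new ServerSocket(0, 50, loopback);
             Socket client = new Socket(loopback, silent.getLocalPort())) {
            boolean connected = client.isConnected();
            boolean pingValid = pingWithinTimeout(client, soTimeoutMillis);
            return new boolean[] {connected, pingValid};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = probeSilentServer(200);
        System.out.println("isConnected=" + result[0] + ", pingValid=" + result[1]);
    }
}
```

On a typical OS the listener's backlog completes the TCP handshake even though nothing ever answers, so the socket looks connected while the ping check fails once the read timeout expires. That is the same shape as the isConnected plus ping combination in validateObject above.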

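Summary

  • spring-data-redis 2.0 and later deprecate the old LettucePool in favor of LettucePoolingClientConfiguration
  • One problem: the old version validates with ping, while the new one relies on the active field, which cannot detect docker pause
  • By default lettuce's async does not go through the pool: after the first connection is borrowed, the underlying connection is reused and never returned; to actually use the pool, set shareNativeConnection to false
  • The jedis pool's validateObject checks not only isConnected but also ping, so it can notice the timeout caused by docker pause and evict the connection from the pool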
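
How a failed validation turns into eviction can be sketched with a toy pool. TinyPool is hypothetical and single-threaded; it is not the commons-pool2 implementation, it only imitates validate-on-borrow. As far as I know, commons-pool2 calls validateObject only when flags such as testOnBorrow or testWhileIdle are enabled in the pool config, so the checks discussed above take effect only with such a flag set.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical toy pool that validates idle objects on borrow; not commons-pool2. */
final class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;     // plays the role of create()
    private final Predicate<T> validator;  // plays the role of validateObject()
    private int destroyed;

    TinyPool(Supplier<T> factory, Predicate<T> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    /** Idle objects that fail validation are dropped and the next candidate is tried. */
    T borrow() {
        T candidate;
        while ((candidate = idle.pollFirst()) != null) {
            if (validator.test(candidate)) {
                return candidate;
            }
            destroyed++; // a real pool would call destroyObject here
        }
        return factory.get(); // nothing usable left: create a new object (not validated in this sketch)
    }

    void giveBack(T obj) {
        idle.addFirst(obj);
    }

    int destroyedCount() {
        return destroyed;
    }
}

final class TinyPoolDemo {
    public static void main(String[] args) {
        // A StringBuilder stands in for a connection; the content "dead" marks a broken one.
        TinyPool<StringBuilder> pool = new TinyPool<>(
                () -> new StringBuilder("fresh"),
                sb -> !"dead".contentEquals(sb));
        pool.giveBack(new StringBuilder("dead"));
        StringBuilder borrowed = pool.borrow();
        System.out.println(borrowed + ", destroyed=" + pool.destroyedCount()); // prints "fresh, destroyed=1"
    }
}
```

With a validator that always returns true for a frozen connection, as a flag-based check would, the broken object would be handed straight back to the caller instead of being replaced.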

doc

  • Connection-Pooling
  • redis:connectors:lettuce

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-09-14
