本文主要研究一下 spring-data-redis 中连接池(Lettuce 与 Jedis)对连接的校验(validateObject)逻辑。
spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
private final LettuceConnectionProvider connectionProvider;
private final GenericObjectPoolConfig poolConfig;
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);
LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
this.connectionProvider = connectionProvider;
this.poolConfig = clientConfiguration.getPoolConfig();
}
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
return ConnectionPoolSupport.createGenericObjectPool(() -> {
return this.connectionProvider.getConnection(connectionType);
}, this.poolConfig, false);
});
try {
StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
this.poolRef.put(connection, pool);
return (StatefulConnection)connectionType.cast(connection);
} catch (Exception var4) {
throw new PoolException("Could not get a resource from the pool", var4);
}
}
public AbstractRedisClient getRedisClient() {
if (this.connectionProvider instanceof RedisClientProvider) {
return ((RedisClientProvider)this.connectionProvider).getRedisClient();
} else {
throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
}
}
public void release(StatefulConnection<?, ?> connection) {
GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
if (pool == null) {
throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
} else {
pool.returnObject(connection);
}
}
public void destroy() throws Exception {
if (!this.poolRef.isEmpty()) {
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
this.poolRef.forEach((connection, pool) -> {
pool.returnObject(connection);
});
this.poolRef.clear();
}
this.pools.forEach((type, pool) -> {
pool.close();
});
this.pools.clear();
}
}
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java
/**
 * Builds a {@link GenericObjectPool} whose objects are produced by {@code connectionSupplier}.
 *
 * @param connectionSupplier source of new connections, must not be {@code null}
 * @param config pool configuration, must not be {@code null}
 * @param wrapConnections when {@code true}, borrowed connections are passed through
 *        {@code wrapConnection(...)} and wrapped connections are unwrapped again on return;
 *        when {@code false}, the pool hands out and takes back the connections unchanged
 * @return the newly created pool
 */
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
        Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

    LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
    LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

    // Holder for the created pool; it is only written, never read, inside this method.
    AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

    GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

        @Override
        public T borrowObject() throws Exception {
            T borrowed = super.borrowObject();
            if (wrapConnections) {
                return wrapConnection(borrowed, this);
            }
            return borrowed;
        }

        @Override
        public void returnObject(T obj) {
            // Hand the underlying target connection back to the pool when a wrapper was given out.
            boolean unwrap = wrapConnections && obj instanceof HasTargetConnection;
            T toReturn = unwrap ? (T) ((HasTargetConnection) obj).getTargetConnection() : obj;
            super.returnObject(toReturn);
        }
    };

    poolRef.set(pool);
    return pool;
}
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java
/**
 * Pooled-object factory backed by a connection {@link Supplier}.
 * <p>
 * Creation delegates to the supplier, destruction closes the connection, and validation is solely the
 * connection's {@code isOpen()} result.
 */
private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

    private final Supplier<T> supplier;

    RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
        supplier = connectionSupplier;
    }

    /** Obtains a new connection from the supplier. */
    @Override
    public T create() throws Exception {
        T connection = supplier.get();
        return connection;
    }

    /** Closes the pooled connection. */
    @Override
    public void destroyObject(PooledObject<T> p) throws Exception {
        T connection = p.getObject();
        connection.close();
    }

    /** Wraps the connection in a {@link DefaultPooledObject}. */
    @Override
    public PooledObject<T> wrap(T obj) {
        PooledObject<T> pooled = new DefaultPooledObject<>(obj);
        return pooled;
    }

    /** A connection is considered valid exactly when it reports {@code isOpen()}. */
    @Override
    public boolean validateObject(PooledObject<T> p) {
        T connection = p.getObject();
        return connection.isOpen();
    }
}
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java
/**
 * Excerpt of the connection base class (members elided at the {@code //......} marker).
 * <p>
 * The visible part tracks connection state with two volatile flags: {@code active} (initially
 * {@code true}) and {@code closed} (initially {@code false}).
 */
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);
private Duration timeout;
private CloseEvents closeEvents = new CloseEvents();
private final RedisChannelWriter channelWriter;
// Debug level is sampled once at construction time.
private final boolean debugEnabled = logger.isDebugEnabled();
private volatile boolean closed;
private volatile boolean active = true;
private volatile ClientOptions clientOptions;
//......
/**
 * Notification when the connection becomes active (connected).
 * Sets {@code active} to {@code true} and resets {@code closed} to {@code false}.
 */
public void activated() {
active = true;
closed = false;
}
/**
 * Notification when the connection becomes inactive (disconnected).
 * Only clears {@code active}; {@code closed} is left untouched.
 */
public void deactivated() {
active = false;
}
/**
 * Reports the {@code active} flag. The {@code closed} flag is not consulted here; {@link #close()}
 * clears {@code active}, so a connection closed through {@code close()} reports {@code false} until
 * {@link #activated()} is called again.
 *
 * @return the current value of {@code active}.
 */
public boolean isOpen() {
return active;
}
/**
 * Closes the connection once. A repeated call logs a warning and returns without doing anything else.
 * On the first call the flags are updated, the channel writer is closed, the close event is fired and
 * {@code closeEvents} is replaced with a fresh instance.
 */
@Override
public synchronized void close() {
if (debugEnabled) {
logger.debug("close()");
}
if (closed) {
logger.warn("Connection is already closed");
return;
}
// NOTE(review): repeats the guard above; after the early return this re-check looks redundant.
if (!closed) {
active = false;
closed = true;
channelWriter.close();
closeEvents.fireEventClosed(this);
closeEvents = new CloseEvents();
}
}
}
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java
private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
private final RedisClient client;
private int dbIndex;
public LettuceFactory(RedisClient client, int dbIndex) {
this.client = client;
this.dbIndex = dbIndex;
}
public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
if (pooledObject.getObject() instanceof StatefulRedisConnection) {
((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
}
}
public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
try {
((StatefulConnection)obj.getObject()).close();
} catch (Exception var3) {
;
}
}
public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
try {
if (obj.getObject() instanceof StatefulRedisConnection) {
((StatefulRedisConnection)obj.getObject()).sync().ping();
}
return true;
} catch (Exception var3) {
return false;
}
}
public StatefulConnection<byte[], byte[]> create() throws Exception {
return this.client.connect(LettuceConnection.CODEC);
}
public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
return new DefaultPooledObject(obj);
}
}
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    //......

    /**
     * Picks the pool flavour: a sentinel pool when the factory is sentinel aware, a plain pool otherwise.
     */
    private Pool<Jedis> createPool() {
        return isRedisSentinelAware() ? createRedisSentinelPool(this.sentinelConfig) : createRedisPool();
    }

    /**
     * Creates {@link JedisSentinelPool}. Falls back to a new {@link JedisPoolConfig} when no pool
     * configuration is set.
     *
     * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {
        GenericObjectPoolConfig poolConfig;
        if (getPoolConfig() == null) {
            poolConfig = new JedisPoolConfig();
        } else {
            poolConfig = getPoolConfig();
        }
        return new JedisSentinelPool(
                config.getMaster().getName(),
                convertToJedisSentinelSet(config.getSentinels()),
                poolConfig,
                getConnectTimeout(),
                getReadTimeout(),
                getPassword(),
                getDatabase(),
                getClientName());
    }

    /**
     * Creates {@link JedisPool} from this factory's pool configuration, host/port, timeouts,
     * credentials, database, client name and SSL settings; absent SSL settings are passed as
     * {@literal null}.
     *
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisPool() {
        return new JedisPool(
                getPoolConfig(),
                getHostName(),
                getPort(),
                getConnectTimeout(),
                getReadTimeout(),
                getPassword(),
                getDatabase(),
                getClientName(),
                isUseSsl(),
                clientConfiguration.getSslSocketFactory().orElse(null),
                clientConfiguration.getSslParameters().orElse(null),
                clientConfiguration.getHostnameVerifier().orElse(null));
    }
    //......
}
jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java
class JedisFactory implements PooledObjectFactory<Jedis> {
    private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
    private final int connectionTimeout;
    private final int soTimeout;
    private final String password;
    private final int database;
    private final String clientName;
    private final boolean ssl;
    private final SSLSocketFactory sslSocketFactory;
    private SSLParameters sslParameters;
    private HostnameVerifier hostnameVerifier;
    //......

    /**
     * Checks a pooled Jedis instance. It is valid only if it still points at the host and port currently
     * held by this factory, reports itself connected, and answers {@code PING} with {@code "PONG"}.
     * Any exception during the check counts as invalid.
     *
     * @param pooledJedis pooled instance to check
     * @return {@code true} if all checks pass, {@code false} otherwise
     */
    @Override
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        final BinaryJedis jedis = pooledJedis.getObject();
        try {
            HostAndPort expected = this.hostAndPort.get();
            String connectionHost = jedis.getClient().getHost();
            int connectionPort = jedis.getClient().getPort();

            if (!expected.getHost().equals(connectionHost)) {
                return false;
            }
            if (expected.getPort() != connectionPort) {
                return false;
            }
            if (!jedis.isConnected()) {
                return false;
            }
            // Cheapest checks first; the network round trip happens only when everything else matched.
            return jedis.ping().equals("PONG");
        } catch (final Exception e) {
            return false;
        }
    }
}