前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >lettuce连接池很香,撸撸它的源代码

lettuce连接池很香,撸撸它的源代码

作者头像
jinjunzhu
发布2020-08-20 16:10:01
10.7K1
发布2020-08-20 16:10:01
举报
文章被收录于专栏:个人开发

springboot中lettuce配置

lettuce初始化

使用netty创建连接

管理连接

actuator健康检查获取连接

释放不掉的连接

共享连接

总结


Lettuce是一个高性能的redis客户端,底层基于netty框架来管理连接,天然是非阻塞和线程安全的。比起jedis需要为每个实例创建物理连接来保证线程安全,lettuce确实很优秀。本文主要介绍springboot使用lettuce整合redis客户端。说明一下,本文的源代码是使用springboot2.1.6,对应lettuce版本是5.1.7.RELEASE。

springboot中lettuce配置

springboot中配置lettuce是非常容易的,代码如下:

pom.xml文件

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>

application.properties配置

代码语言:javascript
复制
spring.redis.database=0
spring.redis.host=192.168.59.138
spring.redis.password=
spring.redis.port=6379
spring.redis.timeout=5000
#最大连接数
spring.redis.lettuce.pool.max-active=50
#最大阻塞等待时间
spring.redis.lettuce.pool.max-wait=5000
#连接池中最大空闲连接
spring.redis.lettuce.pool.max-idle=50
#连接池中最小空闲连接
spring.redis.lettuce.pool.min-idle=5
#eviction线程调度时间间隔
spring.redis.lettuce.pool.time-between-eviction-runs=1

redis配置类RedisConfig.java

代码语言:javascript
复制
@Configuration
public class RedisConfig {
    @Bean
    RedisTemplate redisTemplate(LettuceConnectionFactory factory){
        factory.setShareNativeConnection(false);
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }
}

上面3步就能完成springboot使用lettuce连接池整合redis的配置,之后我们就可以在业务类中注入RedisTemplate来使用了。

lettuce初始化

我们看一下整个初始化流程相关类的UML类图

LettuceConnectionConfiguration类是lettuce初始化的起始类,这个类是spring的管理的配置类,它初始化了lettuce连接工厂类,见如下代码

代码语言:javascript
复制
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)
    throws UnknownHostException {
  LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,
      this.properties.getLettuce().getPool());
  return createLettuceConnectionFactory(clientConfig);
}

初始化的过程会判断是单点模式/集群模式/哨兵模式,来初始化连接工厂,本文以单点模式为例来讲解

代码语言:javascript
复制
private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
  if (getSentinelConfig() != null) {
    return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
  }
  if (getClusterConfiguration() != null) {
    return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
  }
  return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
}

获取到工厂类以后,lettuce会用如下2个Provider来获取和释放连接,分别管理普通模式和交互模式的连接。本示例采用单机的redis模式,所以初始化后的Provider是StandaloneConnectionProvider。

代码语言:javascript
复制
private @Nullable LettuceConnectionProvider connectionProvider;
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
public void afterPropertiesSet() {
    this.client = createClient();
    this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
    this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);
        //省略部分代码
  }

注意:上面创建的provider类型是LettucePoolingConnectionProvider,它是StandaloneConnectionProvider的装饰器类,每次获取和释放连接,工厂类都会通过LettucePoolingConnectionProvider类调用LettucePoolingConnectionProvider的获取和释放操作

使用netty创建连接

lettuce的连接是靠netty来管理的,这或许是它性能优秀的重要原因。我们看一下通过netty来创建连接的代码,看一下StandaloneConnectionProvider的下面方法:

代码语言:javascript
复制
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    //省略部分代码
    if (StatefulConnection.class.isAssignableFrom(connectionType)) {
      return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
          .orElseGet(() -> client.connect(codec)));
    }
    throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
  }

上面的client.connect(codec)是创建连接的代码,一直跟踪这个方法,

代码语言:javascript
复制
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
            SocketAddress redisAddress) {

        logger.debug("Connecting to Redis at {}", redisAddress);

        Bootstrap redisBootstrap = connectionBuilder.bootstrap();

        RedisChannelInitializer initializer = connectionBuilder.build();
        redisBootstrap.handler(initializer);

        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
    //省略部分代码    
    }

管理连接

执行请求命令的时候首先要获取连接,流程图如下

关键代码

LettucePoolingConnectionProvider中getConnection

代码语言:javascript
复制
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
  GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
    return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
        poolConfig, false);
  });
  try {
    StatefulConnection<?, ?> connection = pool.borrowObject();
    poolRef.put(connection, pool);
    return connectionType.cast(connection);
  } catch (Exception e) {
    throw new PoolException("Could not get a resource from the pool", e);
  }
}

GenericObjectPool中borrowObject

代码语言:javascript
复制
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        //省略部分代码
        PooledObject<T> p = null;
        // Get local copy of current config so it is consistent for entire
        // method execution
        final boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        final long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            p = idleObjects.pollFirst();
            if (p == null) {
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            //省略部分代码
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
        return p.getObject();
    }

释放连接的流程图如下:

看下关键代码

GenericObjectPool中释放连接代码

代码语言:javascript
复制
public void returnObject(final T obj) {
        //省略部分代码
        final int maxIdleSave = getMaxIdle();
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }

RedisChannalHandler中的close方法

代码语言:javascript
复制
public void close() {
  //省略部分代码
    closeAsync().join();
}
public CompletableFuture<Void> closeAsync() {
    //省略部分代码
        if (CLOSED.compareAndSet(this, ST_OPEN, ST_CLOSED)) {
            active = false;
            CompletableFuture<Void> future = channelWriter.closeAsync();
      //省略部分代码
        }
        return closeFuture;
    }

DefaultEndpoint类的closeAsync

代码语言:javascript
复制
public CompletableFuture<Void> closeAsync() {
    //省略部分代码
        if (STATUS.compareAndSet(this, ST_OPEN, ST_CLOSED)) {
            Channel channel = getOpenChannel();
            if (channel != null) {
                Futures.adapt(channel.close(), closeFuture);
            } else {
                closeFuture.complete(null);
            }
        }
        return closeFuture;

    }

actuator健康检查获取连接

我们知道,springboot的actuator健康检查是实现了ReactiveHealthIndicator接口,如果springboot工程启用了actuator,在lettuce初始化时,会创建一个reactive的连接,UML类图如下:

RedisReactiveHealthIndicator类会调用RedisConnectionFactory来创建一个reactive连接,代码如下:

代码语言:javascript
复制
protected Mono<Health> doHealthCheck(Health.Builder builder) {
  //getConnection()创建一个连接
  return getConnection().flatMap((connection) -> doHealthCheck(builder, connection));
}

public LettuceReactiveRedisConnection getReactiveConnection() {
  //下面的构造函数会创建交互式连接
  return getShareNativeConnection()
      ? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider)
      : new LettuceReactiveRedisConnection(reactiveConnectionProvider);
}
LettuceReactiveRedisConnection(StatefulConnection<ByteBuffer, ByteBuffer> sharedConnection,
      LettuceConnectionProvider connectionProvider) {

    Assert.notNull(sharedConnection, "Shared StatefulConnection must not be null!");
    Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null!");
    //调用AsyncConnect构造函数创建连接方法
    this.dedicatedConnection = new AsyncConnect(connectionProvider, StatefulConnection.class);
    this.pubSubConnection = new AsyncConnect(connectionProvider, StatefulRedisPubSubConnection.class);
    this.sharedConnection = Mono.just(sharedConnection);
  }
AsyncConnect(LettuceConnectionProvider connectionProvider, Class<? extends T> connectionType) {

      Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null!");

      this.connectionProvider = connectionProvider;
      //回到了之前讲的使用connectionProvider创建连接
      Mono<T> defer = Mono.defer(() -> Mono.<T> just(connectionProvider.getConnection(connectionType)));

      this.connectionPublisher = defer.subscribeOn(Schedulers.elastic());
    }

释放不掉的连接

有时候我们为了节省创建连接花费的时间,会设置min-idle,但其实lettuce初始化时并不会创建这个数量的连接,除非我们设置一个参数spring.redis.lettuce.pool.time-between-eviction-runs=1,

而这样lettuce在初始化的时候因为使用了actuator做健康检查而创建{min-idle} + 1个reactive连接,并不会创建普通连接,只有在第一次请求的时候才会创建{min-idle} + 1个普通连接。

如果没有交互式场景,这些交互式连接不会被释放,造成资源浪费。所以如果使用了actuator监控检查,而又想初始化时创建一定数量的连接,只能造成连接资源浪费了。

为什么要这么设计,有点不明白,可能是bug?没顾上看后面的版本有没有处理这个问题。看下UML类图,从这个流程图看到,time-between-eviction-runs这个参数决定了是否初始化的时候创建${min-idle} + 1个连接池

上面关键代码就是GenericObjectPool类中的ensureMinIdle方法,在释放连接的时候也会调用这个方法,代码如下:

代码语言:javascript
复制
private void ensureIdle(final int idleCount, final boolean always) throws Exception {
       //省略部分代码
        while (idleObjects.size() < idleCount) {
            final PooledObject<T> p = create();
            if (p == null) {
                // Can't create objects, no reason to think another call to
                // create will work. Give up.
                break;
            }
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            clear();
        }
    }

那为什么会比min-idle多创建一个连接呢?问题还在于上面的一个方法。初始化的流程如下:

1.健康检查需要创建一个reactive连接

代码语言:javascript
复制
protected Mono<Health> doHealthCheck(Health.Builder builder) {
  return getConnection().flatMap((connection) -> doHealthCheck(builder, connection));
}

2.之前介绍过,创建连接实际是用LettucePoolConnectionProvider的getConnection方法

代码语言:javascript
复制
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
  GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
    return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
        poolConfig, false);
  });
  //省略部分代码
}

3.调用了ConnectionPoolSupport.createGenericObjectPool

代码语言:javascript
复制
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
        Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {
  //省略部分代码
    GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {
  //省略部分代码
    };
    poolRef.set(new ObjectPoolWrapper<>(pool));
    return pool;
}

4.ConnectionPoolSupport.createGenericObjectPool方法创建GenericObjectPool对象,构造函数里面用到了前面讲的setConfig

代码语言:javascript
复制
public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig<T> config) {
  //省略部分代码
        setConfig(config);
    }

5.setConfig最终调用了上面讲的ensureIdle,而健康检查的那个连接还没有返还给线程池,线程池的数量已经是min-idle了,最终多了一个

同理,普通连接也是一样,首次创建的时候会比min-idle多一个

共享连接

第一部分介绍springboot整合lettuce时讲到RedisConfig的配置,如下方法里面第一行代码就是设置时是否共享Native连接。

代码语言:javascript
复制
@Bean
RedisTemplate redisTemplate(LettuceConnectionFactory factory){
    factory.setShareNativeConnection(false);
    RedisTemplate redisTemplate = new RedisTemplate();
    redisTemplate.setConnectionFactory(factory);
    return redisTemplate;
}

这个主要用于获取集群中的连接或者是获取Reactive连接时,可以用LettuceConnectionFactory中直接获取。我对这个地方的设计并不是特别理解,只是为了省去了从连接池获取和释放的的时间?

总结

lettuce的确很香,不过从设计中也可以看出一些瑕疵

如果应用使用了springboot的actuator,建议min-idle设置为0

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 jinjunzhu 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档