前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring-data-redis中lettuce pipeline的坑之解决篇

spring-data-redis中lettuce pipeline的坑之解决篇

作者头像
山行AI
发布2020-07-14 16:59:15
5.8K3
发布2020-07-14 16:59:15
举报
文章被收录于专栏:山行AI山行AI

在上一篇中我们知道了几种常用的redis client,分别分析了lettuce原生的pipeline处理方式和在使用spring data redis包装后的lettuce处理pipeline时源码细节,并知道了后者直接使用时并不是真正的pipeline操作。那么如果我既想要使用spring-data-redis来操作lettuce的pipeline,又想要真正做到pipeline该怎么处理呢?本节我们就来聊一聊这个问题。与此同时,我们会来进一步地分析下redis connection和pool的内容。

前言

我们先来了解下在spring-data-redis中是如何包装lettuce的连接的,然后会根据这些信息得到上一篇文章中留下的那个问题的解。

连接处理

会先后对r连接池、redisTemplate模式下的连接和shareNativeConnection模式下的连接处理方式进行分析。

连接池

如果想了解连接池的内容,就需要了解下LettuceConnectionFactory。我们来看一下它的属性:

代码语言:javascript
复制
    private final LettuceClientConfiguration clientConfiguration;

    private @Nullable AbstractRedisClient client;
    private @Nullable LettuceConnectionProvider connectionProvider;
    private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
    private boolean validateConnection = false;
    private boolean shareNativeConnection = true;
    private @Nullable SharedConnection<byte[]> connection;
    private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
    private @Nullable LettucePool pool;
    /** Synchronization monitor for the shared Connection */
    private final Object connectionMonitor = new Object();
    private boolean convertPipelineAndTxResults = true;

    private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);

    private @Nullable RedisConfiguration configuration;

    private @Nullable ClusterCommandExecutor clusterCommandExecutor;
主要属性:

•LettuceClientConfiguration:client的配置,基于commons pool的连接池目前也是基于它;•AbstractRedisClient client:内部维持的redis client对象;•LettuceConnectionProvider connectionProvider: 连接提供者,连接池就是由它来提供•LettuceConnectionProvider reactiveConnectionProvider: reactive模式下的连接提供者•validateConnection:是否校验连接•shareNativeConnection:是否共享本地连接•SharedConnection<byte[]> connection:用于共享的连接,如果shareNativeConnection为false则此处为null•SharedConnection reactiveConnection:用于reactive模式下的共享连接•LettucePool pool:旧版本的lettuce连接池,目前由commons pool替代了•Object connectionMonitor: 共享的连接之间用于同步的monitor

主要方法:

1. 连接池

代码语言:javascript
复制
/**
     * @param pool
     * @deprecated since 2.0, use pooling via {@link LettucePoolingClientConfiguration}.
     */
    @Deprecated
    public LettuceConnectionFactory(LettucePool pool) {
        this(new MutableLettuceClientConfiguration());
        this.pool = pool;
    }

旧的连接工厂,目前不再使用了,目前使用的是根据LettucePoolingClientConfiguration的配置初始化的连接池。

LettuceConnectionFactory的创建部分见org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#redisConnectionFactory方法:

代码语言:javascript
复制
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)
    throws UnknownHostException {
    LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,
                                                                            this.properties.getLettuce().getPool());
    return createLettuceConnectionFactory(clientConfig);
}

通过clientResources和配置信息来初始化LettuceClientConfiguration。

2. LettuceConnectionConfiguration#getLettuceClientConfiguration方法:

代码语言:javascript
复制
private LettuceClientConfiguration getLettuceClientConfiguration(ClientResources clientResources, Pool pool) {
        LettuceClientConfigurationBuilder builder = createBuilder(pool);
        applyProperties(builder);
        if (StringUtils.hasText(this.properties.getUrl())) {
            customizeConfigurationFromUrl(builder);
        }
        builder.clientResources(clientResources);
        customize(builder);
        return builder.build();
    }

这里通过LettuceClientConfigurationBuilder来构建LettuceClientConfiguration对象的,来看下它的实现:

那么这里会创建LettuceClientConfigurationBuilder对象还是LettucePoolingClientConfigurationBuilder对象呢?需要来看下createBuilder方法:

代码语言:javascript
复制
private LettuceClientConfigurationBuilder createBuilder(Pool pool) {
        if (pool == null) {
            return LettuceClientConfiguration.builder();
        }
        return new PoolBuilderFactory().createBuilder(pool);
    }

可以看出如果配置了spring.redis.lettuce.pool的相关信息,这里就会生成LettucePoolingClientConfigurationBuilder对象。我们来看下LettucePoolingClientConfigurationBuilder的build方法:

代码语言:javascript
复制
@Override
public LettucePoolingClientConfiguration build() {
    return new DefaultLettucePoolingClientConfiguration(super.build(), poolConfig);
}

GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig()

这里最后生成的是DefaultLettucePoolingClientConfiguration对象,内部使用的连接池配置为commons-pool提供的GenericObjectPoolConfig。

3. 我们来看下LettuceConnectionFactory的初始化部分,.LettuceConnectionFactory#afterPropertiesSet:

代码语言:javascript
复制
public void afterPropertiesSet() {
        this.client = createClient();
        this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
        --------省略集群模式下的---------
        }
    }

这里省略掉集群模式下的初始化方式的分析,因为单机和集群版的流程大致相同,只是一些配置不太一样。

4. createConnectionProvider方法:

代码语言:javascript
复制
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
        LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);
        if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
            return new LettucePoolingConnectionProvider(connectionProvider,
                    (LettucePoolingClientConfiguration) this.clientConfiguration);
        }
        return connectionProvider;
    }

由于上面生成的是DefaultLettucePoolingClientConfiguration,这里最后生成的就是LettucePoolingConnectionProvider对象。也就是一个池化的对象。

接下来我们来看一看从它里面获取连接的方法org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection:

代码语言:javascript
复制
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);

    @Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                    poolConfig, false);
        });
        try {
            StatefulConnection<?, ?> connection = pool.borrowObject();
            poolRef.put(connection, pool);
            return connectionType.cast(connection);
        } catch (Exception e) {
            throw new PoolException("Could not get a resource from the pool", e);
        }
    }

pools是用来维护Connection类型与GenericObjectPool连接池之间关系的一个map,每次获取连接时会根据连接类型获取到对应的连接池,然后从连接池中获取连接。在LettucePoolingConnectionProvider内部包装着一个StandaloneConnectionProvider类型的provider,它才是最终提供connection对象的provider,代码部分为:

代码语言:javascript
复制
ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                    poolConfig, false)

org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getConnection

该方法的代码如下:

代码语言:javascript
复制
private @Nullable LettucePool pool;

public RedisConnection getConnection() {
        if (isClusterAware()) {
            return getClusterConnection();
        }
        LettuceConnection connection;
        if (pool != null) {
            connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
        } else {
            connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
        }
        connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
        return connection;
    }

这里需要注意一点,这个pool是LettucePool对象,在上文中已经分析过,它是比较老的api中的使用的pool,在新的中使用commons-pool代替了。所以这里在新的配置环境中会走pool==null这个分支。LettuceConnection的第一个入参是StatefulConnection<byte[], byte[]> asyncSharedConn对象,也就是说getSharedConnection方法返回的结果是asyncSharedConn。这个会对LettuceConnection的getAsyncConnection方法产生影响,该方法代码如下:

代码语言:javascript
复制
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    if (isQueueing()) {
        return getAsyncDedicatedConnection();
    }
    if (asyncSharedConn != null) {
        if (asyncSharedConn instanceof StatefulRedisConnection) {
            // 如果asyncSharedConn不为空,则会通过它的async方法来创建RedisClusterAsyncCommands对象
            return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
        }
    }
    // 如果asyncSharedConn为空,则走getAsyncDedicatedConnection方法来创建RedisClusterAsyncCommands
    return getAsyncDedicatedConnection();
}

如果asyncSharedConn不为空,这里会通过asyncSharedConn的async()方法来生成RedisClusterAsyncCommands对象。

如果asyncSharedConn为空,则会调用getAsyncDedicatedConnection方法来生成RedisClusterAsyncCommands对象:

代码语言:javascript
复制
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
        if (asyncDedicatedConn == null) {
            asyncDedicatedConn = doGetAsyncDedicatedConnection();
            if (asyncDedicatedConn instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
            }
        }
        if (asyncDedicatedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).async();
        }
        if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).async();
        }
        ------------省略部分代码----------------
    }

org.springframework.data.redis.connection.lettuce.LettuceConnection#doGetAsyncDedicatedConnection:
protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
    return connectionProvider.getConnection(StatefulConnection.class);
}

可以看出最终doGetAsyncDedicatedConnection方法也是通过connectionProvider对象来获取StatefulConnection类型的连接对象的。

那么,调用哪个方法会返回RedisClusterAsyncCommands对象呢?

在LettuceConnection中只有一个public的方法返回RedisClusterAsyncCommands对象的:

代码语言:javascript
复制
    @Override
    public RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {
        LettuceSubscription subscription = this.subscription;
        return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection());
    }

可以看出,它调用的实际上也是getAsyncConnection()方法。

shareNativeConnection 参数

来看一下org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getSharedConnection方法:

代码语言:javascript
复制
@Nullable
protected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {
    return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;
}

这里有一个很重要的参数——shareNativeConnection,如果shareNativeConnection为true,会使用getOrCreateSharedConnection().getConnection()来操作,它的第一步返回的是SharedConnection对象,然后通过getConnection()来获取native连接。我们来看下它们的方法:

代码语言:javascript
复制
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getOrCreateSharedConnection:
private SharedConnection<byte[]> getOrCreateSharedConnection() {
    synchronized (this.connectionMonitor) {
        if (this.connection == null) {
            // 如果为空,则创建 SharedConnection
            this.connection = new SharedConnection<>(connectionProvider);
        }
        // 如果不为空,则使用相同连接
        return this.connection;
    }
}

org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getConnection:
@Nullable
StatefulConnection<E, E> getConnection() {
    synchronized (this.connectionMonitor) {
        if (this.connection == null) {
             // 如果connectin为空则调用getNativeConnection方法获取连接
            this.connection = getNativeConnection();
        }
        if (getValidateConnection()) {
            // 校验连接
            validateConnection();
        }
        // 如果内部连接已经存在,则返回相同的连接
        return this.connection;
    }
}

org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getNativeConnection:
/**
  * Obtain a connection from the associated {@link LettuceConnectionProvider}.
 *
* @return the connection.
*/
private StatefulConnection<E, E> getNativeConnection() {

    try {
        // 从provider中获取连接,这里也是从连接池中去获取连接的,返回的也是StatefulConnection类型的连接对象
        return connectionProvider.getConnection(StatefulConnection.class);
    } catch (RedisException e) {
        throw new RedisConnectionFailureException("Unable to connect to Redis", e);
    }
}

这里主要是获取SharedConnection的步骤,最终会从connectionProvider中获取shared连接,连接为StatefulConnection类型。可以看出通过LettuceConnectionFactory#getSharedConnection方法最终获取到的连接为StatefulRedisConnection对象。

shareNativeConnection为true和false的区别

上面我们知道,当shareNativeConnection为true时会通过getOrCreateSharedConnection().getConnection()来初始化LettuceConnection的asyncSharedConn属性。它生成的是SharedConnection对象,然后通过它的getConnection方法获取具体连接的。在它们里面都有一个共同点,会先判断当连接对象为空时会创建新的连接,如果已经初始化过了,则使用已有的连接,即共享连接。

redisTemplate模式下的连接

咱们以this.redisTemplate.opsForValue().get(cacheKey)方法为切入点往下来看。

首先是org.springframework.data.redis.core.DefaultValueOperations#get(java.lang.Object)方法:

代码语言:javascript
复制
@Override
    public V get(Object key) {
        return execute(new ValueDeserializingRedisCallback(key) {
            @Override
            protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
                return connection.get(rawKey);
            }
        }, true);
    }

接着往下走,继续往下看org.springframework.data.redis.core.AbstractOperations#execute方法:

代码语言:javascript
复制
    @Nullable
    <T> T execute(RedisCallback<T> callback, boolean b) {
        return template.execute(callback, b);
    }

org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean):
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
    return execute(action, exposeConnection, false);
}

org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean, boolean):
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = null;
    try {
        if (enableTransactionSupport) {
            // 开启事务时获取连接的方法
            // only bind resources in case of potential transaction synchronization
            conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
        } else {
            // 获取连接的方法
            conn = RedisConnectionUtils.getConnection(factory);
        }
        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);
        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }
        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
        // 执行的部分
        T result = action.doInRedis(connToExpose);
        // close pipeline
        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }
        // TODO: any other connection processing?
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        // 释放连接
        RedisConnectionUtils.releaseConnection(conn, factory);
    }
}

主要有三步操作:

1.通过RedisConnectionUtils.getConnection(factory)来获取连接,底层是通过connectionFactory.getConnection()来获取连接的;2.action.doInRedis:执行操作;3.使用RedisConnectionUtils.releaseConnection方法释放连接

spring-data-redis使用lettuce中假的pipeline的方法

看完了上面的内容,就能知道其实解决办法很简单:获取原生的lettuce连接、获取RedisClusterAsyncCommands对象,然后用原生的操作pipeline的方法来处理,继而释放连接即可。

代码语言:javascript
复制
    RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
        LettuceConnection connection = null;
        try {
            connection = (LettuceConnection) RedisConnectionUtils.getConnection(connectionFactory);
            //LettuceConnection connection = (LettuceConnection)redisTemplate.getConnectionFactory().getConnection();
            RedisClusterAsyncCommands<byte[], byte[]> commands = connection.getNativeConnection();
            commands.setAutoFlushCommands(false);
            List<RedisFuture<?>> futures = Lists.newArrayList();
            for (int i = 0; i < 50; i++) {
                futures.add(commands.set(("aaa-" + i).getBytes(), ("value-" + i).getBytes()));
                futures.add(commands.expire(("key-" + i).getBytes(), 3600));
            }

            // write all commands to the transport layer
            commands.flushCommands();

            // synchronization example: Wait until all futures complete
            boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
                    futures.toArray(new RedisFuture[futures.size()]));
        }finally {
            if (connection != null){
                RedisConnectionUtils.releaseConnection(connection,connectionFactory);
            }
        }

上面的代码中主要包括四步:

•获取LettuceConnection对象,不管是否共享连接,底层实际上也都是从连接池中获取连接的,只是连接之间是否可以共享而已•获取RedisClusterAsyncCommands,通过connection.getNativeConnection()方法获取•操作部分:先关掉autoflush,然后将所有的操作加到commands列表中,最后直接flush出去•释放连接

后记

这里只是略显仓促地对上一篇的文章进行一个补充,如果能给你带来一些帮助,不甚荣幸!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 连接处理
    • 连接池
      • redisTemplate模式下的连接
      • spring-data-redis使用lettuce中假的pipeline的方法
      • 后记
      相关产品与服务
      云数据库 Redis
      腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档