首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊lettuce的sentinel连接

聊聊lettuce的sentinel连接

作者头像
code4it
发布2018-09-17 17:26:57
2.2K0
发布2018-09-17 17:26:57
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下lettuce的sentinel连接

RedisClient.connectSentinel

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/RedisClient.java

    private <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI,
            Duration timeout) {
        assertNotNull(codec);
        checkValidRedisURI(redisURI);

        ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
        connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
        connectionBuilder.clientResources(clientResources);

        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);

        StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(endpoint, codec, timeout);

        logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());

        connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint))
                .connection(connection);
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            connectionBuilder.enablePingBeforeConnect();
        }

        if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {
            channelType(connectionBuilder, redisURI);
            try {
                getConnection(initializeChannelAsync(connectionBuilder));
            } catch (RuntimeException e) {
                connection.close();
                throw e;
            }
        } else {

            boolean connected = false;
            boolean first = true;
            Exception causingException = null;
            validateUrisAreOfSameConnectionType(redisURI.getSentinels());

            for (RedisURI uri : redisURI.getSentinels()) {
                if (first) {
                    channelType(connectionBuilder, uri);
                    first = false;
                }
                connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));

                if (logger.isDebugEnabled()) {
                    SocketAddress socketAddress = SocketAddressResolver.resolve(uri, clientResources.dnsResolver());
                    logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
                }
                try {
                    getConnection(initializeChannelAsync(connectionBuilder));
                    connected = true;
                    break;
                } catch (Exception e) {
                    logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + e.toString());
                    causingException = e;
                }
            }

            if (!connected) {
                connection.close();
                throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(),
                        causingException);
            }
        }

        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            connection.setClientName(redisURI.getClientName());
        }

        return connection;
    }
  • connectSentinel方法,会遍历sentinel,挨个取master获取连接,如果连接不上或抛异常则继续用下一个sentinel获取
  • 如果遍历完sentinel都抛异常,则最后抛出RedisConnectionException(“Cannot connect to a Redis Sentinel: “ + redisURI.getSentinels(),causingException)
  • 这里会调用AbstractRedisClient的initializeChannelAsync方法

AbstractRedisClient.initializeChannelAsync

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/AbstractRedisClient.java

    /**
     * Connect and initialize a channel from {@link ConnectionBuilder}.
     *
     * @param connectionBuilder must not be {@literal null}.
     * @return the {@link ConnectionFuture} to synchronize the connection process.
     * @since 4.4
     */
    @SuppressWarnings("unchecked")
    protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
            ConnectionBuilder connectionBuilder) {

        SocketAddress redisAddress = connectionBuilder.socketAddress();

        if (clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }

        logger.debug("Connecting to Redis at {}", redisAddress);

        CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();
        Bootstrap redisBootstrap = connectionBuilder.bootstrap();

        RedisChannelInitializer initializer = connectionBuilder.build();
        redisBootstrap.handler(initializer);

        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

        connectFuture.addListener(future -> {

            if (!future.isSuccess()) {

                logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
                connectionBuilder.endpoint().initialState();
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }

            initFuture.whenComplete((success, throwable) -> {

                if (throwable == null) {
                    logger.debug("Connecting to Redis at {}: Success", redisAddress);
                    RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                    connection.registerCloseables(closeableResources, connection);
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }

                logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                Throwable failure;

                if (throwable instanceof RedisConnectionException) {
                    failure = throwable;
                } else if (throwable instanceof TimeoutException) {
                    failure = new RedisConnectionException("Could not initialize channel within "
                            + connectionBuilder.getTimeout(), throwable);
                } else {
                    failure = throwable;
                }
                channelReadyFuture.completeExceptionally(failure);

                CompletableFuture<Boolean> response = new CompletableFuture<>();
                response.completeExceptionally(failure);

            });
        });

        return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
                .connection()));
    }
  • 这里initializeChannelAsync的时候,会调用connectionBuilder.socketAddress()方法,进而调用RedisClient的getSocketAddress方法

RedisClient.getSocketAddress

lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/RedisClient.java

    protected SocketAddress getSocketAddress(RedisURI redisURI) throws InterruptedException, TimeoutException,
            ExecutionException {
        SocketAddress redisAddress;

        if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
            logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", redisURI.getSentinels(),
                    redisURI.getSentinelMasterId());
            redisAddress = lookupRedis(redisURI);

            if (redisAddress == null) {
                throw new RedisConnectionException("Cannot provide redisAddress using sentinel for masterId "
                        + redisURI.getSentinelMasterId());
            }

        } else {
            redisAddress = SocketAddressResolver.resolve(redisURI, clientResources.dnsResolver());
        }
        return redisAddress;
    }

    private SocketAddress lookupRedis(RedisURI sentinelUri) throws InterruptedException, TimeoutException, ExecutionException {
        try (StatefulRedisSentinelConnection<String, String> connection = connectSentinel(sentinelUri)) {
            return connection.async().getMasterAddrByName(sentinelUri.getSentinelMasterId())
                    .get(timeout.toNanos(), TimeUnit.NANOSECONDS);
        }
    }
  • getSocketAddress方法会调用lookupRedis方法,而lookupRedis方法则调用getMasterAddrByName方法,通过sentinel来获取master的ip地址

小结

  • redis的sentinel类似于一个master的服务发现中心,假设master有故障,则通过sentinel获取新的master实现failover。
  • 而sentinel部署多个来实现高可用,假设一个sentinel挂了,则client端使用下一个sentinel来获取master地址

doc

  • lettuce Redis-Sentinel
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-09-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RedisClient.connectSentinel
  • AbstractRedisClient.initializeChannelAsync
  • RedisClient.getSocketAddress
  • 小结
  • doc
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档