前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty在Redis客户端中的应用

Netty在Redis客户端中的应用

作者头像
书唐瑞
发布2022-06-02 13:54:06
1.7K0
发布2022-06-02 13:54:06
举报
文章被收录于专栏:Netty历险记

在我们日常使用Redis实现分布式锁中,依赖如下

代码语言:javascript
复制
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.10.1</version>
</dependency>

在使用Redisson作为客户端,它需要与服务端进行通信,那么它的底层通信使用的是Netty.

在启动Redisson客户端时,底层Netty就已经与服务端建立好了通信(通道Channel).

简单写了一个示例代码

代码语言:javascript
复制
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedisClient {

   public static void main(String[] args) {

       Config config = new Config();
       config.useSingleServer().setAddress("redis://127.0.0.1:6379");

       // 单机模式
       RedissonClient redissonClient = Redisson.create(config);
       RLock redLock = redissonClient.getLock("computerLock");// 获取锁实例

       try {
           boolean isLock = redLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);

           if (isLock) {
               System.out.println("获取到锁,执行业务逻辑");
          }
      } catch (Exception x) {
      } finally {
           redLock.unlock();
           System.out.println("释放锁");
      }
  }
}

以下代码摘录在源码,部分无关紧要的代码做了删减

以上代码中,一开始在执行

代码语言:javascript
复制
RedissonClient redissonClient = Redisson.create(config);

时候,就会创建Netty客户端,并与服务端建立好通信.建立好通信通道之后,我们的业务代码向服务端发送的命令就是通过建立好的通信通道发送给服务端的.

代码语言:javascript
复制
public static RedissonClient create(Config config) {
   // #2 创建Redisson
   Redisson redisson = new Redisson(config);
   return redisson;
}

Redisson类是最重要的类之一

代码语言:javascript
复制
// 实例化Redisson
protected Redisson(Config config) {
   this.config = config;
   Config configCopy = new Config(config);
   // #3
   connectionManager = ConfigSupport.createConnectionManager(configCopy);
   evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
}
代码语言:javascript
复制
public static ConnectionManager createConnectionManager(Config configCopy) {
   UUID id = UUID.randomUUID();

   if (configCopy.getMasterSlaveServersConfig() != null) {
       return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
  } else if (configCopy.getSingleServerConfig() != null) {
       // #4 单机模式
       return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
  } else {
       throw new IllegalArgumentException("server(s) address(es) not defined!");
  }
}

此篇文章我们以单机模式的部署进行分析的代码

代码语言:javascript
复制
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
   // #5
   super(create(cfg), config, id);
}

super调用父类MasterSlaveConnectionManager构造器

代码语言:javascript
复制
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
   this(config, id);
   this.config = cfg;

   initTimer(cfg);
   // #6 内部创建Netty客户端
   initSingleEntry();
}
代码语言:javascript
复制
protected void initSingleEntry() {
   try {
       MasterSlaveEntry entry;
       if (config.checkSkipSlavesInit()) {
           entry = new SingleEntry(this, config);
      } else {
           entry = createMasterSlaveEntry(config);
      }
       // #7
       RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
       f.syncUninterruptibly();

  } catch (RuntimeException e) {
       stopThreads();
       throw e;
  }
}
代码语言:javascript
复制
public RFuture<RedisClient> setupMasterEntry(URI address) {
   // #8 创建RedisClient
   RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
   // #9
   return setupMasterEntry(client);
}

在#8处会创建RedisClient,通过名字可以猜到,它是一个客户端对象,在它的内部有一个用于连接服务端的Netty的Bootstrap对象

代码语言:javascript
复制
private RedisClient(RedisClientConfig config) {
   RedisClientConfig copy = new RedisClientConfig(config);
   channels = new DefaultChannelGroup(copy.getGroup().next());
   // 创建Bootstrap
   bootstrap = createBootstrap(copy, Type.PLAIN);
   pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);

   this.commandTimeout = copy.getCommandTimeout();
}

在#9处便会通过Bootstrap对象连接服务端了

代码语言:javascript
复制
private RFuture<RedisClient> setupMasterEntry(final RedisClient client) {
   final RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
   RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
   addrFuture.addListener(new FutureListener<InetSocketAddress>() {

       @Override
       public void operationComplete(Future<InetSocketAddress> future) throws Exception {
           masterEntry = new ClientConnectionsEntry(
               client,
               config.getMasterConnectionMinimumIdleSize(),
               config.getMasterConnectionPoolSize(),
               config.getSubscriptionConnectionMinimumIdleSize(),
               config.getSubscriptionConnectionPoolSize(),
               connectionManager,
               NodeType.MASTER);

           // #10 内部会连接服务端
           RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);

           if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
               RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
          }
      }
  });

   return result;
}
代码语言:javascript
复制
public RFuture<Void> add(final ClientConnectionsEntry entry) {
   final RPromise<Void> promise = new RedissonPromise<Void>();
   promise.addListener(new FutureListener<Void>() {
       @Override
       public void operationComplete(Future<Void> future) throws Exception {
           if (future.isSuccess()) {
               entries.add(entry);
          }
      }
  });
   // #11 内部会连接服务端
   initConnections(entry, promise, true);
   return promise;
}
代码语言:javascript
复制
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {
   final int minimumIdleSize = getMinimumIdleSize(entry);
   final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
   int startAmount = Math.min(50, minimumIdleSize);
   final AtomicInteger requests = new AtomicInteger(startAmount);
   
   for (int i = 0; i < startAmount; i++) {
       // #12 创建连接
       createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
  }
}
代码语言:javascript
复制
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,
           final int minimumIdleSize, final AtomicInteger initializedConnections) {

   // #13 获取连接
   acquireConnection(entry, new Runnable() {
       @Override
       public void run() {
           RPromise<T> promise = new RedissonPromise<T>();
           // #14 创建连接
           createConnection(entry, promise);
           promise.addListener(new FutureListener<T>() {
               @Override
               public void operationComplete(Future<T> future) throws Exception {

              }
          });
      }
  });

}
代码语言:javascript
复制
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
   // #15 连接
   RFuture<T> connFuture = connect(entry);
}
代码语言:javascript
复制
protected RFuture<T> connect(ClientConnectionsEntry entry) {
   return (RFuture<T>) entry.connect();
}
代码语言:javascript
复制
public RFuture<RedisConnection> connect() {
   // #16 这里的client就是之前创建的RedisClient
   RFuture<RedisConnection> future = client.connectAsync();
   
   return future;
}
代码语言:javascript
复制
public RFuture<RedisConnection> connectAsync() {
   final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();

   RFuture<InetSocketAddress> addrFuture = resolveAddr();
   addrFuture.addListener(new FutureListener<InetSocketAddress>() {
       @Override
       public void operationComplete(Future<InetSocketAddress> future) throws Exception {
           // #17 连接服务端
           ChannelFuture channelFuture = bootstrap.connect(future.getNow());
      }
  });

   return f;
}

在#17处通过之前创建的Bootstrap对象,调用connect方法连接服务端,底层就是通过Netty连接服务端的.

至此客户端就与服务端建立了连接,之后需要发送给服务端的命令,都通过这个建立好的连接发送出去.

最后系统中会多出许多'redisson-netty-1-x'命名的线程.它们都是已经和服务端建立好了连接,随时都可以进行通信.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netty历险记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档