前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Jedis cluster命令执行流程剖析

Jedis cluster命令执行流程剖析

作者头像
九州暮云
发布2019-08-21 11:36:25
1.2K0
发布2019-08-21 11:36:25
举报
文章被收录于专栏:九州牧云九州牧云

在Redis Cluster集群模式下,由于key分布在各个节点上,会造成无法直接实现mget、sInter等功能。因此,无论我们使用什么客户端来操作Redis,都要考虑单一key命令操作、批量key命令操作和多节点命令操作的情况,以及效率问题。

在之前的文章中剖析了Jedis cluster集群初始化源码,分析了源码之后可以得知,在Jedis中,使用的是JedisClusterConnection集群连接类来与Redis集群节点进行命令交互,它使用装饰模式对JedisCluster命令执行类进行了一层包装,同时对这三种不同类型的命令操作做了分类处理。

下面就看下JedisClusterConnection类中,如何实现这三种类型的key命令操作。在这里只列举一些典型的命令进行说明。本文基于spring-data-redis-1.8.4-RELEASE.jar和jedis-2.9.0.jar进行源码剖析,Redis版本为Redis 3.2.8。

单一key命令操作

对于单一命令操作,常用的就是get、set了。在JedisClusterConnection类中,get方法的实现如下:

代码语言:javascript
复制
public byte[] get(byte[] key) {
	try {
		return cluster.get(key);
	} catch (Exception ex) {
		throw convertJedisAccessException(ex);
	}
}

在上面代码中,执行cluster.get()方法时,实际上调用的是BinaryJedisCluster类的get()方法:

代码语言:javascript
复制
  public byte[] get(final byte[] key) {
    return new JedisClusterCommand<byte[]>(connectionHandler, maxAttempts) {
      @Override
      public byte[] execute(Jedis connection) {
        return connection.get(key);
      }
    }.runBinary(key);
  }

BinaryJedisCluster类的get()方法的核心操作是由JedisClusterCommand类runBinary()方法完成的,下面剖析一下该类的核心代码:

代码语言:javascript
复制
public abstract class JedisClusterCommand<T> {

	// 集群节点连接器
	private JedisClusterConnectionHandler connectionHandler;
	// 重试次数,默认5次
	private int maxAttempts;

	// 模板回调方法,执行相关的redis命令
	public abstract T execute(Jedis connection);

   public T runBinary(byte[] key) {
	    if (key == null) {
	      throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
	    }
		
	    return runWithRetries(key, this.maxAttempts, false, false);
  	}

	/**
	 * 利用重试机制运行键命令
	 * 
	 * @param key
	 *            要操作的键
	 * @param attempts
	 *            重试次数,每重试一次减1
	 * @param tryRandomNode
	 *            标识是否随机获取活跃节点连接,true为是,false为否
	 * @param asking
	 * @return
	 */
	private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
		if (attempts <= 0) {
			throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
		}

		Jedis connection = null;
		try {

			if (asking) {
				// TODO: Pipeline asking with the original command to make it
				// faster....
				connection = askConnection.get();
				connection.asking();

				// if asking success, reset asking flag
				asking = false;
			} else {
				if (tryRandomNode) {
					// 随机获取活跃节点连接
					connection = connectionHandler.getConnection();
				} else {
					// 计算key的slot值,然后根据slot缓存获取节点连接
					connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
				}
			}

			// 调用具体的模板方法实现执行命令
			return execute(connection);

			// 集群节点不可达,直接抛出异常
		} catch (JedisNoReachableClusterNodeException jnrcne) {
			throw jnrcne;
		} catch (JedisConnectionException jce) {
			// 在递归执行runWithRetries方法之前释放连接
			releaseConnection(connection);
			connection = null;

			// 如果节点不能连接,重新初始化slot缓存
			if (attempts <= 1) {
				this.connectionHandler.renewSlotCache();

				throw jce;
			}

			// 出现连接错误重试执行命令
			return runWithRetries(key, attempts - 1, tryRandomNode, asking);
		} catch (JedisRedirectionException jre) {
			// 如果出现MOVE重定向错误,在连接上执行cluster slots命令重新初始化slot缓存
			if (jre instanceof JedisMovedDataException) {
				this.connectionHandler.renewSlotCache(connection);
			}

			// 在递归执行runWithRetries方法或者重建slot缓存之前释放连接,从而避免在错误的连接上执行命令,也为了避免连接泄露问题
			releaseConnection(connection);
			connection = null;

			if (jre instanceof JedisAskDataException) {
				asking = true;
				askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
			} else if (jre instanceof JedisMovedDataException) {
			} else {
				throw new JedisClusterException(jre);
			}
			// slot初始化后重试执行命令
			return runWithRetries(key, attempts - 1, false, asking);
		} finally {
			//释放连接
			releaseConnection(connection);
		}
	}
}

单一key命令执行流程:

  1. 计算slot并根据slots缓存获取目标节点连接,发送命令
  2. 如果出现连接错误,使用重试机制执行键命令,每次命令重试对 attempts参数减1
  3. 捕获到MOVED重定向错误,使用cluster slots命令更新slots 缓存(renewSlotCache方法)
  4. 重复执行1 ~ 3步,直到命令执行成功,或者当attempts <= 0时抛出JedisClusterMaxRedirectionsException异常
  5. 在递归执行runWithRetries方法或者重建slot缓存之前释放连接,从而避免在错误的连接上执行命令,也为了避免连接泄露问题

多节点命令操作

在Redis Cluster中,有些命令如keys、flushall和删除指定模式的键这些操作,需要遍历所有的节点才可以完成。下面就以keys命令来说明这种情况下JedisClusterConnection类是如何完成该操作的,该类中keys()方法代码如下:

代码语言:javascript
复制
public Set<byte[]> keys(final byte[] pattern) {

		Assert.notNull(pattern, "Pattern must not be null!");

		//在所有主节点上执行keys命令,然后返回一个Collection集合
		Collection<Set<byte[]>> keysPerNode = clusterCommandExecutor
				.executeCommandOnAllNodes(new JedisClusterCommandCallback<Set<byte[]>>() {

					@Override
					public Set<byte[]> doInCluster(Jedis client) {
						return client.keys(pattern);
					}
				}).resultsAsList();

		//遍历执行keys命令获得的结果,然后添加进Set集合返回
		Set<byte[]> keys = new HashSet<byte[]>();
		for (Set<byte[]> keySet : keysPerNode) {
			keys.addAll(keySet);
		}
		return keys;
	}

在上面代码中我们看到了keys()方法内部调用了ClusterCommandExecutor类的executeCommandOnAllNodes()方法,该类是一个集群命令执行类,它提供了在多个集群节点上批量执行命令的特性,由于考虑到在多个节点上执行命令的效率问题,它使用Spring的org.springframework.core.task包里面的AsyncTaskExecutor接口来为命令执行操作提供异步支持,然后返回异步执行结果。ClusterCommandExecutor类的executeCommandOnAllNodes()方法及关联方法实现剖析如下:

代码语言:javascript
复制
      /**
	 * 使用ClusterCommandCallback接口实现类的doInCluster()方法在所有可达的主节点上执行命令
	 *
	 * @param cmd
	 * @return
	 * @throws ClusterCommandExecutionFailureException
	 */
	public <S, T> MulitNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCallback<S, T> cmd) {
		// getClusterTopology().getActiveMasterNodes()获取的是所有的主节点
		return executeCommandAsyncOnNodes(cmd, getClusterTopology().getActiveMasterNodes());
	}
	
	/**
	 * @param callback
	 * @param nodes
	 * @return
	 * @throws ClusterCommandExecutionFailureException
	 * @throws IllegalArgumentException
	 *             in case the node could not be resolved to a topology-known node
	 */
	public <S, T> MulitNodeResult<T> executeCommandAsyncOnNodes(final ClusterCommandCallback<S, T> callback, Iterable<RedisClusterNode> nodes) {

		Assert.notNull(callback, "Callback must not be null!");
		Assert.notNull(nodes, "Nodes must not be null!");

		List<RedisClusterNode> resolvedRedisClusterNodes = new ArrayList<RedisClusterNode>();
		ClusterTopology topology = topologyProvider.getTopology();

		// 遍历Redis集群节点集合nodes,获取节点信息
		for (final RedisClusterNode node : nodes) {
			try {
				resolvedRedisClusterNodes.add(topology.lookup(node));
			} catch (ClusterStateFailureException e) {
				throw new IllegalArgumentException(String.format("Node %s is unknown to cluster", node), e);
			}
		}

		// 遍历节点信息,在相应Redis集群节点上执行相关命令
		Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<NodeExecution, Future<NodeResult<T>>>();
		for (final RedisClusterNode node : resolvedRedisClusterNodes) {

			futures.put(new NodeExecution(node), executor.submit(new Callable<NodeResult<T>>() {

				@Override
				public NodeResult<T> call() throws Exception {
					return executeCommandOnSingleNode(callback, node);
				}
			}));
		}

		// 解析执行结果并返回
		return collectResults(futures);
	}
	
	public <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node) {
		return executeCommandOnSingleNode(cmd, node, 0);
	}

	private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node, int redirectCount) {

		Assert.notNull(cmd, "ClusterCommandCallback must not be null!");
		Assert.notNull(node, "RedisClusterNode must not be null!");

		if (redirectCount > maxRedirects) {
			throw new TooManyClusterRedirectionsException(String.format(
					"Cannot follow Cluster Redirects over more than %s legs. Please consider increasing the number of redirects to follow. Current value is: %s.",
					redirectCount, maxRedirects));
		}

		RedisClusterNode nodeToUse = lookupNode(node);

		S client = this.resourceProvider.getResourceForSpecificNode(nodeToUse);
		Assert.notNull(client, "Could not acquire resource for node. Is your cluster info up to date?");

		try {
			// 在相应Redis节点上执行命令,具体执行命令的函数是实现ClusterCommandCallback接口的类的doInCluster方法
			return new NodeResult<T>(node, cmd.doInCluster(client));
		} catch (RuntimeException ex) {

			RuntimeException translatedException = convertToDataAccessExeption(ex);
			// 如果请求不被目标服务器接受,则进行重试,重新执行命令:redirectCount + 1
			if (translatedException instanceof ClusterRedirectException) {
				ClusterRedirectException cre = (ClusterRedirectException) translatedException;
				return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(cre.getTargetHost(), cre.getTargetPort()),
						redirectCount + 1);
			} else {
				throw translatedException != null ? translatedException : ex;
			}
		} finally {
			this.resourceProvider.returnResourceForSpecificNode(nodeToUse, client);
		}
	}

多节点命令执行流程:

  1. 使用 getClusterTopology().getActiveMasterNodes()方法获取所有的可达的主节点。这里的可达表示主节点可以被连接上且状态不为fail状态
  2. 遍历所有主节点,然后在这些节点上异步执行相应的命令,最后将结果作为一个集合返回
  3. 如果请求不被目标服务器接受,则进行重试,重新执行命令,每次对redirectCount + 1,但redirectCount的次数不能大于maxRedirects最大重试次数,大于后会抛出TooManyClusterRedirectionsException异常

批量key命令操作

与keys、flushall等多节点命令相似,mget等批量key操作命令也要遍历多个节点执行相关命令。下面就以mget命令来说明这种情况下JedisClusterConnection类是如何完成该操作的,该类中mGet()方法代码如下:

代码语言:javascript
复制
public List<byte[]> mGet(byte[]... keys) {

		Assert.noNullElements(keys, "Keys must not contain null elements!");

		// 如果进行批量操作的key的slot值相同,表示key都在同一节点上,则直接在key所在的节点执行命令
		if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
			return cluster.mget(keys);
		}

		// 如果进行批量操作的key的slot值不同,表示key不在同一节点上,则需要计算key的slot值,根据slot确定key所在的节点,然后执行命令
		return this.clusterCommandExecutor.executeMuliKeyCommand(new JedisMultiKeyClusterCommandCallback<byte[]>() {

			@Override
			public byte[] doInCluster(Jedis client, byte[] key) {
				return client.get(key);
			}
		}, Arrays.asList(keys)).resultsAsListSortBy(keys);
	}

类似地,在上面代码中我们看到了mGet()方法内部调用了ClusterCommandExecutor类的executeMuliKeyCommand()方法。该方法实现剖析如下:

代码语言:javascript
复制
      /**
	 * 在一组Redis集群节点上进行一个或多个key操作
	 *
	 * @param cmd
	 * @return
	 * @throws ClusterCommandExecutionFailureException
	 */
	public <S, T> MulitNodeResult<T> executeMuliKeyCommand(final MultiKeyClusterCommandCallback<S, T> cmd, Iterable<byte[]> keys) {

		// 节点和key映射Map,一个节点上有多个key
		Map<RedisClusterNode, Set<byte[]>> nodeKeyMap = new HashMap<RedisClusterNode, Set<byte[]>>();

		// 遍历key集合,将key添加到相应的Redis集群节点集合中
		for (byte[] key : keys) {
			// 通过getClusterTopology().getKeyServingNodes(key)方法计算key的slot值,然后获取key所在的Redis集群节点信息
			for (RedisClusterNode node : getClusterTopology().getKeyServingNodes(key)) {

				if (nodeKeyMap.containsKey(node)) {
					nodeKeyMap.get(node).add(key);
				} else {
					Set<byte[]> keySet = new LinkedHashSet<byte[]>();
					keySet.add(key);
					nodeKeyMap.put(node, keySet);
				}
			}
		}

		Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<NodeExecution, Future<NodeResult<T>>>();

		// 遍历nodeKeyMap,如果是节点是主节点,则执行相关key的命令操作
		for (final Entry<RedisClusterNode, Set<byte[]>> entry : nodeKeyMap.entrySet()) {

			if (entry.getKey().isMaster()) {
				for (final byte[] key : entry.getValue()) {
					futures.put(new NodeExecution(entry.getKey(), key), executor.submit(new Callable<NodeResult<T>>() {

						@Override
						public NodeResult<T> call() throws Exception {
							return executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key);
						}
					}));
				}
			}
		}

		return collectResults(futures);
	}

批量key命令执行流程:

  1. 先使用ClusterSlotHashUtil.isSameSlotForAllKeys()方法计算出这些key的slot值,接下来判断如果进行批量操作的key的slot值相同,表示key都在同一节点上,则直接在key所在的节点执行命令。否则,执行第2步
  2. 如果进行批量操作的key的slot值不同,表示key不在同一节点上,则需要计算key的slot值,根据slot确定key所在的节点,然后在该节点上执行命令,最后封装结果到集合里面返回

总结

  1. 无论是哪种类型的key操作,都是在Redis集群的主节点上执行命令的。这跟Redis Cluster集群的特性有关,Redis一般不允许在从节点上进行读写操作,在JedisClusterInfoCache类中,slots这个Map本地缓存保存的也是slot槽和主节点的连接池信息
  2. 对于keys等多节点命令来说,不需要计算key的slot值,只需要遍历全部主节点然后执行命令即可
  3. 对于单一key和批量key命令操作来说,需要计算key的slot值,根据slot确定key所在的节点,然后在该节点上执行命令
  4. Redis使用单线程模式执行命令,每一次命令执行需要经过发送命令、执行命令、返回结果三个阶段。在集群条件下,不同节点执行命令的效率是不同的,对于多节点命令和批量key命令操作,考虑到命令执行时间过长,可能导致其它命令阻塞的情况,客户端需要在命令执行时提供异步支持
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 单一key命令操作
  • 多节点命令操作
  • 批量key命令操作
  • 总结
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档