前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何让JedisCluster支持Pipeline

如何让JedisCluster支持Pipeline

作者头像
Bug开发工程师
发布2019-12-09 16:39:14
2.9K0
发布2019-12-09 16:39:14
举报
文章被收录于专栏:码农沉思录

hmset等批量操作命令与pipeline最大的区别是,前者是原子性命令,比如hmset,如果一次插入的field过多,会导致命令耗时增加;后者非原子性,只是批量的传输要执行命令,减少网络耗时。pipeline提升性能的关键,一是RTT,节省往返时间,二是I/O系统调用,read系统调用,需要从用户态,切换到内核态。

mset这类批量操作命令只能指定一个key,在Cluster集群下,不存在跨节点问题。而Pipeline由于支持所有命令的操作,支持多key,在Cluster集群模式下,会出现key映射到不同solt槽,可能会落到不同的节点上。这也是JedisCluster不提供Pipeline支持的原因。

HashTag

HashTag机制可以影响key被分配到的slot,从而可以使用那些被限制在slot中操作,比如rename。

我在项目重构阶段就遇到这个问题,代码中为了保证数据更新的原子性,使用了一个临时key写入数据,当所有数据更新完成后,再用rename将临时key替换回原来的key,因redis集群由原来的主从集群改为cluster集群后,rename导致代码抛出异常,原因是rename前的key与rename后的key映射到的槽位不在同一个节点上。

HashTag即是用{}包裹key的一个子串,如{user}1, {user}2。在设置了HashTag的情况下,集群会根据HashTag决定key分配到的slot, 两个key拥有相同的HashTag=>{user}, 它们会被分配到同一个slot,允许我们使用mget、rename命令。

代码语言:javascript
复制
#           rename
#offers-tmp ---->offers-active
{offers}-tmp ----> {offers}-active

通常情况下,HashTag不支持嵌套,即将第一个'{'和第一个'}'中间的内容作为HashTag。使用HashTag可能会导致过多的key分配到同一个slot中,造成数据倾斜影响系统的吞吐量,务必谨慎使用。

让JedisCluster支持Pipeline

Pipeline需要客户端和服务端的支持。这是官网对Pipeline的介绍:https://redis.io/topics/pipelining。

对于服务端来说,所需要的是能够处理一个客户端通过同一个 TCP 连接发来的多个命令,但并不是所有命令都接收完才执行,和处理单个命令一样,每读到一条完整的命令就放入命令等待队列等待执行,每处理完一条命令就响应给客户端,直到客户端调用socket.getInputStream()的输入流的read方法读取响应。(Jedis具体实现看RedisInputStream与Connection。)

对于客户端,则是要将多个命令写入缓冲区,缓冲区满了就发送,然后再写入缓冲区buf,最后一次需要调用flush将未满的缓冲区的命令都发送出去,最后才处理 Redis 的应答(即read),缓冲区大小为8192字节。

使用Cluster集群模式,需要客户端缓存每个节点的槽位信息。JedisCluster在发送命令前会根据CRC16(key) %16384计算出key所在的槽位,根据槽位获取对应的节点连接池,再从连接池中获取一个Jedis连接。

JedisClustet是通过JedisSlotBasedConnectionHandler获取连接的,在JedisCluster的<init>方法中,会创建一个JedisSlotBasedConnectionHandler,它有一个字段cache,类型为JedisClusterInfoCache。JedisClusterInfoCache缓存了每个主节点对应的连接池nodes,以及每个槽位对应的连接池。

代码语言:javascript
复制
private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

在JedisClusterInfoCache<init>方法中调用discoverClusterNodesAndSlots方法,获取所有节点和槽位信息。即遍历配置的所有节点,只要有一个节点能连接上就可以获取到集群的槽位信息,获取到槽位配置信息后终止遍历。在连接上一个节点后,发送一条cluster slots命令获取槽位分配信息。

代码语言:javascript
复制
172.24.1.1:6379> cluster slots 
1) 1) (integer) 0 // 槽位开始
   2) (integer) 5460 // 槽位结束
   3) 1) "172.24.1.1"  // 节点1(主节点)的ip
      2) (integer) 6380 // 节点1的端口
      3) "c70e6b2122bd336790d7f8c7bbbc88b59ea95ac1" 
   4) 1) "172.24.1.2" // 节点2的ip
      2) (integer) 6379 // 节点2的端口
      3) "c3d94b9de931247446dea98e8afd2ce5059fa377"
.........

cluster slots返回是一个数组,即Cluster中所有小主从集群的信息,数组每个元素又是一个数组,通过遍历数组拿到每段槽位的主节点信息,并创建一个连接池,在源码中有一句注释。

代码语言:javascript
复制
// at this time, we just use master, discard slave information

“此时,我们只使用master,放弃slave信息”。所以并不会为从节点创建连接池。nodes字段缓存的是所有小集群的主节点的连接池。完全就是弃用从节点了,只有当主节点挂掉,连接池中的连接不可用时,才会刷新nodes,配置的从节点才会用到。

正如例子中的,JedisCluster为槽位0~5460的主节点创建一个连接池JedisPool,而slots则会缓存0到5460slot持有这个JedsiPool的引用。

代码语言:javascript
复制
// 0到5460,可看getAssignedSlotArray方法
for (Integer slot : targetSlots) {
     slots.put(slot, targetPool);
}
代码语言:javascript
复制
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
         .intValue(); slot++) {
       slotNums.add(slot);
    } 
    return slotNums;
}

所以,只要能够获取JedisCluster对象的JedisSlotBasedConnectionHandler字段,再拿到JedisSlotBasedConnectionHandler对象的JedisClusterInfoCache字段,我们就能自己实现Pipeline。

代码语言:javascript
复制
 public class JedisClusterPipeline{
     // 采用反射获取部分字段
     private static final Field FIELD_CONNECTION_HANDLER;
     private static final Field FIELD_CACHE;
     static {
         FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
         FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
     }

     private JedisSlotBasedConnectionHandler connectionHandler;
     private JedisClusterInfoCache clusterInfoCache;

     public void setJedisCluster(JedisCluster jedis) {
         connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
         clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
     }

     private JedisClusterPipeline() {
     }
     /**
      * 根据jedisCluster实例生成对应的JedisClusterPipeline
      *
      * @param
      * @return
      */
     public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
         JedisClusterPipeline pipeline = new JedisClusterPipeline();
         pipeline.setJedisCluster(jedisCluster);
         return pipeline;
     }

}

还需继承PipelineBase以获得Pipeline API的支持,实现Closeable接口close方法做连接释放操作。

代码语言:javascript
复制
public class JedisClusterPipeline extends PipelineBase 
              implements Closeable {
    ......
    // 根据顺序存储每个命令对应的Client
    private Queue<Client> clients = new LinkedList<>();
    // 缓存Pipline持有的连接
    private Map<JedisPool, Jedis> jedisMap = new HashMap<>();
}

继承PipelineBase需要实现getClient方法。能够通过key获取一个连接Jedis(Client)。首先通过CRC16计算出key所在的槽位,再根据槽位获取到一个连接。

代码语言:javascript
复制
  @Override
  protected Client getClient(String key) {
         byte[] bKey = SafeEncoder.encode(key);
         return getClient(bKey);
  }
  
  @Override
  protected Client getClient(byte[] key) {
      Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
      Client client = jedis.getClient();
      clients.add(client);
      return client;
  }

根据槽位获取连接就是从JedisClusterInfoCache的slots字段获取槽位对应的连接池,拿到连接池就可以从连接池中获取连接了。

由于使用Pipeline时可能存在多个key落到同一个节点上,所以只需要确保一个节点只从连接池中获取一个连接就可以了,所以使用一个Map(jedisMap)来缓存当前Pipeline持有的Jedis。

代码语言:javascript
复制
   private Jedis getJedis(int slot) {
        // 根据pool从缓存中获取Jedis
        JedisPool pool = clusterInfoCache.getSlotPool(slot);
        Jedis jedis = jedisMap.get(pool);
        if (null == jedis) {
            jedis = pool.getResource();
            jedisMap.put(pool, jedis);
        }
        return jedis;
    }

往Pipeline每写入一条命令,都是往Jedis(Client(Socket))的输出流写入,命令会缓存在输出流缓冲区中,缓冲区满则发送,最后需要调用flush命令将缓冲区剩余数据都传输到远端redis服务器。以输出流为例,看下RedisOutputStream输出流是怎么实现的。

代码语言:javascript
复制
public final class RedisOutputStream extends FilterOutputStream {
  protected final byte[] buf;
  protected int count;
  // out = socket.getOutputStream() 
  // @see: redis.clients.jedis.Connection
  public RedisOutputStream(final OutputStream out) {
    this(out, 8192);
  }
  public RedisOutputStream(final OutputStream out, final int size) {
    super(out);
    if (size <= 0) {
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    buf = new byte[size];
  }
 }

从RedisOutputStream源码可以看出,输出流的缓冲区大小默认为8192字节,接着看下往输出流里面写入命令都做了写什么。

代码语言:javascript
复制
// 将命令字节数组写入到RedisOutputStream
public void write(final byte[] b) throws IOException {
    write(b, 0, b.length);
}  

public void write(final byte[] b, final int off, final int len) throws IOException {
    // 如果字节数组大于缓冲区大小,则先将缓冲区数据写入到输出流,再直接将要写入的数据写入到输出流
    if (len >= buf.length) {
       flushBuffer();
       out.write(b, off, len);
     } else {
       // 如果字节数据大于可写入的缓冲区大小,则将缓冲区数据写入到输出流
       if (len >= buf.length - count) {
         flushBuffer();
       }
       // 再将字节数组写入到缓冲区
       System.arraycopy(b, off, buf, count, len);
       count += len;
    }
}
 // 将缓冲区内容真正写入到输出流
 private void flushBuffer() throws IOException {
    if (count > 0) {
      out.write(buf, 0, count);
      count = 0;
    }
}

所以写入完要执行的全部命令后,需要调用当前Pipeline所持有的所有Clinet的getAll()方法,将Client的输出流缓冲区命令都传输到远端redis执行,并开始从响应的输入流中读取返回结果。

代码语言:javascript
复制
public List<Object> sync() {
        List<Object> responseList = new ArrayList<>();
        try {
            // 遍历获取所有客户端结果
            for (Client client : clients) {
                // 获取所有服务端响应readProtocolWithCheckingBroken【Protocol.read(inputStream);】
                List<Object> unformatted = client.getAll();
                for (Object o : unformatted) {
                    // 从pipelinedResponses队列中弹出一个Response写入数据
                    Response<?> data = generateResponse(o);
                    if (null != responseList) {
                        responseList.add(data.get());
                    }
                }
            }
            return responseList;
        } catch (JedisRedirectionException jre) {
            if (jre instanceof JedisMovedDataException) {
                // 如果发生重定向(槽位重定向),则重建群集的插槽缓存
                refreshCluster();
            }
            throw jre;
        } finally {
            this.close();
       }
}

generateResponse是父类PipelineBase的方法,PipelineBase继承Queable。

使用Pipeline连续的往RedisOutputStream写入命令,每写入一条命令就会返回一个Response对象,同时这个Response被放入一个Queue<Response>队列中,这步一会分析。而这个Response对象就跟Futute功能一样,你可以调用Response的get方法获取返回结果,只是此时调用get会直接抛出JedisDataException异常。

代码语言:javascript
复制
public class Response<T> {
  protected T response = null;
  .....
  private boolean set = false;
  .......
  public void set(Object data) {
    this.data = data;
    set = true;
  }

  public T get() {
    .......
    if (!set) {
      throw new JedisDataException(
          "Please close pipeline or multi block before calling this method.");
    }
   .....
    return response;
  }
}

PipelineBase继承Queable,Queable有一个Queue类型字段pipelinedResponses,所以我们在自己实现的JedisClusterPipeline的sync方法中调用generateResponse方法,就会从队列中弹出一个Response对象并写入结果,此时调用get方法才能获取到返回结果。

代码语言:javascript
复制
public class Queable {
  private Queue<Response<?>> pipelinedResponses = new LinkedList<Response<?>>();
  .......
  // 从Response队列中头部弹出一个Response,并给Response写入结果
  protected Response<?> generateResponse(Object data) {
    Response<?> response = pipelinedResponses.poll();
    if (response != null) {
      response.set(data);
    }
    return response;
  }
  // 根据build new一个Response,并放入队列中
  // builder是用于解析结果的,比如T为Long,则build会将结果字符串解析为Long
  // public static final Builder<Long> LONG = new Builder<Long>() {
  //      public Long build(Object data) {
  //         return (Long)data;
  //     }
  // };
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<T>(builder);
    pipelinedResponses.add(lr);
    return lr;
  }
  .........
}

Jedis的Pipeline实现利用了Queue的先入先出特性,按命令的执行顺序响应结果。但这种先写入的命令先响应结果,在Cluster下,就会导致结果与命令不对应。因为多个Client,每个Client都执行一些命令,你无法保证获取结果顺序,在JedisCluster下实现Pipeline最好放弃响应结果。或者忽略响应结果的顺序问题。如果强需求获取命令的对应返回结果,那么此Pipeline并不能满足你。JedisCluster为什么不支持Pipeline是有道理的。

最后是close方法的实现,就是将当前pipeline持有的所有Jedis连接释放回连接池,遍历所有Jedis调用其close方法即可。同时flushCachedData方法是调用jedis.getClient().getAll()获取所有返回结果,其实就是清理Client的响应输入流,避免Jedis被复用时读取到错误的结果。

代码语言:javascript
复制
@Override
public void close() {
     super.clean();
     clients.clear();
     for (Jedis jedis : jedisMap.values()) {
           flushCachedData(jedis);
           jedis.close();
      }
      jedisMap.clear();
}

clean方法是父类Queable的方法,调用清空Response队列。

代码语言:javascript
复制
protected void clean() {
    pipelinedResponses.clear();
}

至此,一个简单的JedisClusterPipeline就完成了。

需要注意的地方

由于Cluster集群模式存在节点的动态添加或删除,且client不能实时感知,所以,建议在批量操作之前调用重新获取一遍集群信息。或是发生失败时再重新获取集群信息,毕竟会改变的概率很小,完全不用为这种小概率买单,前提是能容忍偶然的失败。

应用需要保证不论成功还是失败都会调用所有Jedis的close方法释放连接,且释放连接回连接池之前要清理Client。

在使用hmset这类批量命令时,如果field较多可以分批次写入,避免因命令执行耗时导致的阻塞。这点尤其要重视,我们项目中目前也存在这个问题。Pipeline建议命令总和不超过8192字节的缓冲区大小。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-12-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档