前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JedisCluster 通过 Pipeline 实现两套数据轮换更新

JedisCluster 通过 Pipeline 实现两套数据轮换更新

作者头像
绿毛龟
发布2024-01-19 11:22:21
960
发布2024-01-19 11:22:21
举报
文章被收录于专栏:学习道路指南学习道路指南

前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

  1. 从数据库里查数据。
  2. 更新当前前缀。
  3. 往redis集群更新数据。

1.2 流程代码解释

代码语言:javascript
复制
    @Override
    public R<String> updateCampToJedis() {
        R<String> r = new R<>();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
        String currentMonth = dateFormat.format(new Date());
        //1. 从数据库里查数据
        List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
        if (UserWideInfoList.size() == 0) {
            r.setCode(R.ERROR_CODE);
            r.setMsg("没有数据");
            return r;
        }
        //2. 更新当前前缀
        updateCurrentPrefixIndex();
        r.setCode(R.SUCCESS_CODE);
        //3. 往redis集群存入数据
        insertToJedis(ZhmsUserWideInfoList);
        return r;
    }

二、从数据库里查数据

2.1 SQL语句

这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

代码语言:javascript
复制
    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
        SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
    </select>

三、更新当前前缀

要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

3.1 设置前缀常量

用 A 和 B 来区分两组 key 。

代码如下:

代码语言:javascript
复制
    private static final String PREFIX_A = "A";
    private static final String PREFIX_B = "B";

3.2 初始化 currentPrefixIndex

向 redis集群中存入初始的 currentPrefixIndex 。

代码如下:

代码语言:javascript
复制
    @GetMapping("/init")
    public String init() {
        return jedisCluster.set("currentPrefixIndex", "0");
    }

3.3 获取当日前缀

先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

代码如下:

代码语言:javascript
复制
     //获取当日前缀
    private String getKeyPrefix() {
        int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
        if (currentPrefixIndex % 2 == 0) {
            return PREFIX_A;
        } else {
            return PREFIX_B;
        }
    }

3.4 更新 currentPrefixIndex

每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。

代码如下:

代码语言:javascript
复制
    // 重新设置currentPrefixIndex
    private void updateCurrentPrefixIndex() {
        String currentValue = jedisCluster.get("currentPrefixIndex");
        int newValue = Integer.parseInt(currentValue) + 1;
        jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
    }

四、往redis集群更新数据

这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

4.1 大致流程

  1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
  2. 把新数据解析后更新到 redis 集群。

注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

代码如下:

代码语言:javascript
复制
    private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
        String keyPrefix = getKeyPrefix();
        List<String> keys = new ArrayList<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (JedisPool node : clusterNodes.values()) {
            try (Jedis jedis = node.getResource()) {
                Set<String> nodeKeys = jedis.keys(keyPrefix + "*");
                keys.addAll(nodeKeys);
            }
        }
        Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
        //先删旧的
        for (JedisPool jedisPool : delKey.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                List<String> keysList = delKey.get(jedisPool);
                for (String key : keysList) {
                    pipelined.del(key);
                }
                pipelined.sync();
            }
        }
        List<String> keyList =new ArrayList<>();
        HashMap<String, String> map = new HashMap<>();
        //填充keyList和value
        for (UserWideInfo UserWideInfo : UserWideInfoList) {
            String key = keyPrefix + "_" + UserWideInfo.getBillNo();
            keyList.add(key);
            //构建value
            ...
            ...
            map.put(key, value);
        }
        Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
        for (JedisPool jedisPool : result.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                // 获取当前JedisPool对应的键列表
                List<String> keysList = result.get(jedisPool); 
                // 将命令添加到Pipeline中
                for (String key : keysList) {
                    String value = map.get(key);
                    pipelined.set(key, value);
                }
                // 执行Pipeline中的所有命令
                pipelined.sync();
            }
        }
    }

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程

因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

代码如下:

代码语言:javascript
复制
@Slf4j
public class JedisPipelineUtil {

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
        Map<String, List<String>> hostPhoneMap = new HashMap<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
        JedisPool jedisPool = next.getValue();
        Jedis jedis = jedisPool.getResource();
        Map<Integer, String> slots = discoverClusterSlots(jedis);
        for (String s : list) {
            String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
            if (hostPhoneMap.containsKey(hostAndPort)) {
                hostPhoneMap.get(hostAndPort).add(s);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(s);
                hostPhoneMap.put(hostAndPort, newList);
            }
        }
        jedis.close();
        return hostPhoneMap;
    }

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
        Map<JedisPool, List<String>> map = new HashMap<>();
        Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
        Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<String>> next = iterator.next();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
            map.put(jedisPool, next.getValue());
        }
        return map;

    }

    private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
        Map<Integer, String> slotsMap = new HashMap<>();
        List<Object> slots = jedis.clusterSlots();
        Iterator var3 = slots.iterator();
        while (var3.hasNext()) {
            Object slotInfoObj = var3.next();
            List<Object> slotInfo = (List) slotInfoObj;
            if (slotInfo.size() > 2) {
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                List<Object> hostInfos = (List) slotInfo.get(2);
                if (!hostInfos.isEmpty()) {
                    String targetNode = generateHostAndPort(hostInfos);
                    Iterator<Integer> var4 = slotNums.iterator();
                    while (var4.hasNext()) {
                        Integer slot = var4.next();
                        slotsMap.put(slot, targetNode);
                    }
                }
            }
        }
        return slotsMap;
    }

    private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
        List<Integer> slotNums = new ArrayList<>();

        for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {
            slotNums.add(slot);
        }

        return slotNums;
    }

    private static String generateHostAndPort(List<Object> hostInfos) {
        String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
        int port = ((Long) hostInfos.get(1)).intValue();
        return host + ":" + port;
    }
}

使用 assignKey 方法就可以分配管道。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、整体流程
    • 1.1 大致流程
      • 1.2 流程代码解释
      • 二、从数据库里查数据
        • 2.1 SQL语句
        • 三、更新当前前缀
          • 3.1 设置前缀常量
            • 3.2 初始化 currentPrefixIndex
              • 3.3 获取当日前缀
                • 3.4 更新 currentPrefixIndex
                • 四、往redis集群更新数据
                  • 4.1 大致流程
                  • 五、JedisCluster 实现 Pipeline 操作
                    • 5.1 实现过程
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档