前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis stream中pending数据的处理

redis stream中pending数据的处理

作者头像
跑马溜溜的球
发布2020-12-07 14:36:35
2.4K0
发布2020-12-07 14:36:35
举报
文章被收录于专栏:日积月累1024

1. pending数据的产生

在消费者组模式下,当一个消息被消费者取出,为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读(XREADGROUP)取但并未处理完毕(未ACK)的消息。

2. 对pending数据的几种处理方式

下面的讨论基于几点:

  1. 面向的场景为多个无差别消费者(每个消费者名子相同,功能相同)在同一group下消费任务。
  2. 我们要保证的是,每个任务至多只做一次。
  3. 代码实现是在使用redis stream实现队列服务一文的封装基础上实现的。

2.1 无需处理

如果你的处理逻辑是:

代码语言:javascript
复制
getTask()
delTask()
yourProcessFuc();

即不太关注任务的丢失,此时无需做什么特别处理。但一定记得delTask(),不然pending队列会越积越多,占用大量存储空间。

2.2 从pending中按条件读取,放回原队列

代码语言:javascript
复制
    /*
     * 将pending队列中超时的数据重新放回队列
     * 
     * $idleTime: 超时时间, 毫秒
     * $perPage:每次从pending队列中取的任务数, 之所以分页是为防止队列太长,一下取出内存不够
     *
     * 注意:只能有一个进程执行pendingRestore
     *
     * 优点: consumer不需要做任何改动
     * 缺点: 
     * 先del再add, 成本上不划算,
     * 如果del和add中间断掉任务就丢了
     * 无法保留任务被重复投递的次数,不过如果你的任务只想重做一次,或者不关注此数据那就无所谓了。
     * 
     * return: restore的数量
     * */
    public function pendingRestore($idleTime = 5000, $perPage = 20){
        /**
         * 比较简单粗暴的取pending数据方式
         * 依赖
         * 1.每次从pending取走/删除超时数据
         * 2.id是按时间排序,小id未超时,大id一定未超时
         *
         */
        $restoreNum = 0;
        while(1){
            $thisNum = 0;
            $data = $this->getPending($perPage);

            foreach($data as $one){
                $id = $one[0];
                $duration = $one[2];
                if ($duration > $idleTime){
                    $data = $this->getRange($id, $id);
                    $task = $data[$id];

                    $this->delTask($id);
                    $this->addTask($task);

                    $thisNum++;
                }
            }
            
            $restoreNum += $thisNum;
            
            if ($thisNum < $perPage){
                break;
            }
        }

        return $restoreNum;
    }
    
    /* 从pending队列中取任务
     */
    protected function getPending($count = 1, $start='-', $end='+', $consumer = null){
        if (!$consumer){
            return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count);
        }

        return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count, $consumer);
    }

    /*
     * 取[$start, $end]范围内的数据, 注意是闭区间
     *
     * $count:条数,null时表示取全部
     * */
    protected function getRange($start = '-', $end = '+', $count = null){
        if(is_null($count)){
            return $this->_mRedis->xRange($this->_mStream, $start, $end);
        }else{
            return $this->_mRedis->xRange($this->_mStream, $start, $end, $count);
        }
    }

2.3 使用claim

将超时任务放入另一个名子的消费者pending队列中,然后从新的消费者历史数据中取出数据并处理。

代码语言:javascript
复制
    /*
     * 另一种恢复超时任务的方法
     * 思路:将超时任务放入newConsumer的pending中,后续可以从newConsume的历史中取出数据并处理
     *
     * 优点:
     * 恢复数据没有重复读,删,插,效率高
     * 任务投递次数会保留在新的pending中 
     *
     * 缺点:
     * consumer需要做改动,至少要改变consumer的名子
     * 只能用单进程从历史数据中读数据,然后处理。
     *
     *
     * $idleTime: 超时时间, 毫秒
     * $newConsumer: 之后处理pending任务的消费者名称
     * $perPage: 每次取pending任务的条数
     *
     * return: 满足条件且成功claim的条数
     * */
    public function pendingClaim($idleTime = 5000, $newConsumer=null, $perPage = 20){
        if (!$newConsumer){
            return false;
        }
    
        $info = $this->getPendingInfo();
        $startID = $info[1];
        $endID = $info[2];
    
        $claimNum = 0;
        /*
         * 使用startid, endid遍历pending列表
         * 因为getpending取的是[startid, endid]
         * 所以边界处的id可能被重复取出,但不影响结果的正确性
         * perPage越大/符合xclaim条件的id越多,重复的可能性越小
         * */
        while($startID != $endID){
            //var_dump([$startID, $endID]);
            $data = $this->getPending($perPage, $startID, $endID, $this->_mConsumer);
        
            foreach($data as $one){
                $ids[] = $one[0];
                $startID = $one[0];
            }
            
            //xClaim会根据条件自动过滤任务
            $res = $this->_mRedis->xClaim($this->_mStream, $this->_mGroup, $newConsumer, $idleTime, $ids, ['JUSTID']);
            
            $thisNum = count($res);   
            $claimNum += $thisNum;
            
            //id是按时间排列,小id未超时,则后面不会超时
            //在所有id都有相同的投递次数的基础上
            if ($thisNum < $perPage){
                break;
            }
        }

        return $claimNum;
    }

使用pendingClaim后,可以使用一个单独进程,通过下面方式获取超时任务并处理。

代码语言:javascript
复制
$config = [
            'server' => '10.10.10.1:6379',
            'stream' => 'balltube',    
            'consumer' => 'pendingProcessor',//pendingClaim中的newConsumer
];

$q = new RedisQueue($this->_config);
$block = 1000;
$num = 1;

while(1){
    $d = $q->getTask($block, $num, 0);
    if (empty($d)){
        break;
    }

    $id = key($d);
    $data = $d[$id];
    $q->delTask($id);
    //处理任务逻辑
    yourTaskProcessFunc($data);
}

3. git代码库

https://github.com/qmhball/redisQueue

  • RedisQueue.php 队列实现
  • RedisQueueTest.php 对应测试

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=26tnhtub18qsc

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. pending数据的产生
  • 2. 对pending数据的几种处理方式
    • 2.1 无需处理
      • 2.2 从pending中按条件读取,放回原队列
        • 2.3 使用claim
        • 3. git代码库
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档