前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用redis stream实现队列服务

使用redis stream实现队列服务

作者头像
跑马溜溜的球
发布2020-12-07 14:36:48
6520
发布2020-12-07 14:36:48
举报
文章被收录于专栏:日积月累1024

1. stream类型

Redis5.0引入了Stream类型。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理

关于stream的具体介绍可以参见:

2. 队列接口简介

我们基于redis stream实现了一个基础的,类似beanstalk的队列服务。用于多个无差别的消费者从一个队列消费任务的情况。如果您对stream有所了解,那么我们其实是使用了stream+group当作了beanstalk的tube。

提供最基础的功能:

  • addTask:添加任务。
代码语言:javascript
复制
/*                                                         
 * 向流中添加任务                                          
 * $data: 数组形式的任务数据                                            
 * return: 任务id                                          
 */    
addTask(array $data){}
  • getTask:获取任务。
代码语言:javascript
复制
 /*                                                         
  * 获取任务                                       
  * $block:阻塞时间,毫秒. null不阻塞                       
  * $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block。
  * $start:'>'表示接受最新数据. 若设置id,则读取大于该id,且未被确认(ack)的历史任务
  * 普通使用时,只要设置$block即可。
  * 
  * return [
  *      'id1' => taskdata1,                                
  *      'id2' => taskdata2,                                
  *      ... ...                                            
  * ]
  *
  * 无数据返回[]                                            
  */    
getTask($block=null, $count = 1, $start = '>'){}
  • delTask:删除任务
代码语言:javascript
复制
/*
 * 根据id确认任务完成并从stream中删除该任务
 * $ids: 可以是单条taskid,也可以是数组形式的多条id
 *
 * 
 * 该方法其实完成了两个动作
 * ack:确认任务完成
 * del:stream中删除任务
 * 所以返回值中包括两个值,第一个为ack是否成功,第二个为del是否成功
 */
delTask($ids){}

3. 代码实现

代码语言:javascript
复制
<?php
/*
 * 需要redis-server5.0以上 
 * php-redis扩展版本要适配redis-5.0
 * 
 * 使用redis stream仿照beanstalk封装的队列服务
 */
class RedisQueue{
    protected $_mRedis = null;
    protected $_mStream = '';
    protected $_mGroup = '';
    protected $_mConsumer = '';

    //默认0 不限制长度
    protected $_mMaxLength = 0;

    /* 
     * 创建队列, stream+group确认唯一队列
     * $config必须包括:
     * stream: stream名
     * server: 格式ip:port[:auth]
     * 
     * 可选参数:
     * maxLength:队列最大长度
     * group:分组名, 默认与stream相同. stream+group相当于beanstalk的tube
     * consumer:消费者名, 默认与stream相同. 
     * */
    public function __construct(array $config){
        if(!isset($config['stream'])){
            throw new Exception("you must config the stream");
        }

        $this->_mStream = $config['stream'];

        if(!isset($config['server'])){
            throw new Exception("you must config the server");
        }

        $tmp = explode(':', $config['server']);
        $host = $tmp[0];
        $port = $tmp[1];
        $auth = $tmp[2] ?? null;

        if ($host && $port){
            $this->_mRedis = new Redis();
            $this->_mRedis->connect($host,$port,1);
            if($auth){
                $this->_mRedis->auth($auth);
            }
        }
        else{
            throw new Exception("can not get redis server conf");
        }

        if(isset($config['maxLength'])){
            $this->_mMaxLength = $config['maxLength'];
        }

        $this->_mGroup = $config['group'] ?? $config['stream'];       
        $this->_mConsumer = $config['consumer'] ?? $config['stream'];

        $this->creatGroup();
    }

    /*
     * 删除当前流(队列)
     * */
    public function destoryStream(){
        $this->_mRedis->del($this->_mStream);
    }

    /*
     * 向流中添加任务
     * $data: array
     * return: taskid
     * */
    public function addTask(array $data){
        return $this->_mRedis->xAdd($this->_mStream, "*", $data , $this->_mMaxLength);
    }

    /*
     * 从group中获取任务
     * $block:阻塞时间,毫秒. null不阻塞
     * $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block
     * $start:'>'接受最新数据. 若设置id,则读取大于该id,且未被ack的历史任务
     *
     * return [
     *      'id1' => taskdata1,
     *      'id2' => taskdata2,
     *      ... ...
     * ]
     *
     * 无数据返回[]
     * */
    public function getTask($block=null, $count = 1, $start = '>'){
        $d = $this->_mRedis->xReadGroup($this->_mGroup, $this->_mConsumer, [$this->_mStream => $start], $count, $block);

        if (is_array($d) && count($d) > 0){
            return $d[$this->_mStream];
        }

        return $d;
    }
    
    /*
     * ack任务--从pending中删除
     * 同时从stream中删除
     */
    public function delTask($ids){
        if(!is_array($ids)){
            $ids = array($ids);
        }
        $multi = $this->_mRedis->multi(Redis::PIPELINE);

        $multi->xAck($this->_mStream, $this->_mGroup, $ids);
        $multi->xDel($this->_mStream, $ids);        
        $res = $this->_mRedis->exec();        
        return $res;
    }

    protected function creatGroup($startID = 0){
        return $this->_mRedis->xGroup('CREATE', $this->_mStream, $this->_mGroup, $startID, true);
    }
}

4. 使用示例

代码语言:javascript
复制
$config = [
        'server' => '10.10.10.1:6379:auth',
        'stream' => 'balltube', 
        'consumer' => 'normalprocessor'//可以不设置
    ];

//创建队列
$q = new RedisQueue($config);

//添加任务
$task = ['task'=>1];
$q->addTask($task);

//获取
$timeout = 1000;
$task = $q->getTask($timeout);

//确认并删除
$taskid = key($task);
$q->delTask($taskid);

5. 对于pending任务的处理

当任务被取出且未被确认时,该任务处理pending状态。beanstalk中,对于这种任务可以设置一个超时时间timeout,当任务超过timeout未被确认,该任务会被还回队列中。对于stream,应该如何处理这种任务呢?请参见:

《redis stream中pending数据的处理》

6. beanstalk与redis的stream队列性能对比

6.1 测试环境

  • 队列所在机器配置:4CPU, 6G内存。redis开启aof,每一秒钟持久化一次。
  • 压测机:8CPU,24G。

6.2 测试结果

在任务大小为1k和10k的时候,开启不同个数的进程进行10000次读/写操作,测试结果如下:

任务大小为1k

进程数

10

20

50

redis万次读

1.64928s

0.864051s

0.542352s

beanstalk万次读

1.702436s

0.915132s

0.503198s

redis万次写

3.328083s

1.714555s

0.837429s

beanstalk次写

3.402431s

1.702654s

0.9317s

任务大小为10k

进程数

10

20

50

redis万次读

1.962591s

1.569581s

1.001159s

beanstalk万次读

3.30333s

1.72248s

0.940097s

redis万次写

3.360724s

1.77125s

0.921126s

beanstalk次写

3.418932s

1.766198s

0.823796s

7. redis stream队列与beanstalk队列整体比较

stream

beanstalk

主从

支持

不支持

性能

相当

相当

任务持久化

支持

支持

任务优先级

不支持

支持

任务延迟

不支持

支持

超时任务

额外处理

自动

批量任务读写

支持

不支持

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. stream类型
  • 2. 队列接口简介
  • 3. 代码实现
  • 4. 使用示例
  • 5. 对于pending任务的处理
  • 6. beanstalk与redis的stream队列性能对比
    • 6.1 测试环境
      • 6.2 测试结果
        • 任务大小为1k
        • 任务大小为10k
    • 7. redis stream队列与beanstalk队列整体比较
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档