前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Redis Stream来做消息队列和在Asp.Net Core中的实现

使用Redis Stream来做消息队列和在Asp.Net Core中的实现

作者头像
乔达摩@嘿
发布2021-09-23 11:34:13
1.8K0
发布2021-09-23 11:34:13
举报
文章被收录于专栏:嘿dotNet嘿dotNet
Redis - Wikipedia
Redis - Wikipedia

写在前面

我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的服务,kafka和RabbitMQ等;

奈何这兄弟一直不给力;

虽然 Redis 的Pub/Sub 是实现了发布/订阅的,但这家伙最坑的是:丢数据

由于Pub/Sub 只是简单的实现了发布订阅模式,简单的沟通起生产者和消费者,当接收生产者的数据后并立即推送或者说转发给订阅消费者,并不会做任何的持久化、存储操作。由此:

  1. ​ 消费者(客户端)掉线;
  2. ​ 消费者未订阅(所以使用的时候一定记得先订阅再生产);
  3. ​ 服务端宕机;
  4. ​ 消费者消费不过来,消息堆积(生产数据受数据缓冲区限制);

以上情况都会导致生产数据的丢失,基于上坑,据我所知大家很少使用Pub/Sub ;

不过官方的哨兵集群通信的时候就是用的Pub/Sub;

然后,各路大佬结合队列、阻塞等等实现了各种各样的方案,主要是使用:BLPOP+LPUSH 的实现

这里就不一一展开了,有兴趣请看叶老板文章

可能是各种实现都会带来各种的问题,redis的官方也看到了社区的挣扎。终于,到了Redis5.0,官方带来了消息队列的实现:Stream

Redis Stream介绍

简单来说Redis Stream 就是想用Redis 做消息队列的最佳推荐;

XADD--发布消息

代码语言:javascript
复制
XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再发一条
代码语言:javascript
复制
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631628890025-0"

其中的'*'表示让 Redis 自动生成唯一的消息 ID,格式是 「时间戳-自增序号」

XREAD--订阅消息

订阅消息

代码语言:javascript
复制
XREAD COUNT 5 STREAMS stream1 0-0 
代码语言:javascript
复制
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0 
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

'0-0' 表示从开头读取 如果需继续拉取下一条,需传入上一条消息的id

阻塞等待消息

代码语言:javascript
复制
XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id ‘1631628890025-0’ 后的消息 50000 阻塞时间(毫秒) ‘0’ 表示无限期阻塞

从到这里就可以看出 Pub/Sub多端订阅的最大优点,Stream也是支持的。有的同学很快就发现问题了,这里多端订阅后,没有消息确认ACK机制。

没错,因为现在所有的消费者都是订阅共同的消息,多端订阅,如果某个客户端ACK某条消息后,其他端消费不了,就实现不了多端消费了。

由此,引出 分组:GROUP

GROUP--订阅分组消息(多端订阅)

同样先发布消息

代码语言:javascript
复制
XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 
代码语言:javascript
复制
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631629084083-0"

XGROUP CREATE 创建分组

创建分组1

代码语言:javascript
复制
XGROUP CREATE stream1 group1 0-0  
代码语言:javascript
复制
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0  
OK

‘0-0’ 表示从开头读取 '>' 表示读取最新,未被消费过的消息

XREADGROUP--分组读取

分组 group1

代码语言:javascript
复制
XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  

consumer1 消费者名称, redis服务器会记住第一次使用的消费者名称;

代码语言:javascript
复制
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
(nil)

同样 ‘0-0’ 表示从开头读取 '>' 表示读取最新,未被消费过的消息 (可以看到命令执行第二遍已经读不到新消息了)

分组 group2

代码语言:javascript
复制
127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0  
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19

可以看到可以读到同样的消息,多端订阅没有问题;

当然分组也支持阻塞读取:

代码语言:javascript
复制
#和XREAD一样
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 

#分组阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 > 

‘0’ 表示无限期阻塞,单位(毫秒)

XPENDING--待处理消息

消息使用XREADGROUP 读取后会进入待处理条目列表(PEL);

我们看看:

代码语言:javascript
复制
 XPENDING stream1 group2
代码语言:javascript
复制
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
      2) "4"

表示:

  1. (integer) 4 //表示当前消费者组的待处理消息的数量
  2. "1631628884174-0" //消息最大id
  3. "1631629084083-0" //最小id
      1. "consumer1" // 消费者名称
      2. "4" //消费者待处理消息数量

XACK--删除已处理消息(消息确认机制)

我们已经知道group2待处理消息有4条,我们从头读取看看:

代码语言:javascript
复制
XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
代码语言:javascript
复制
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

假设最后一条消息 ‘1631629084083-0’ 我已处理完成

代码语言:javascript
复制
127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

代码语言:javascript
复制
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
代码语言:javascript
复制
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
      2) "3"

可以清楚看到goroup2 待处理消息剩下3条;

这时 Redis 已经把这条消息标记为「处理完成」不再追踪;

Stream在Asp.net Core中的使用

代码语言:javascript
复制
private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

发布:

代码语言:javascript
复制
csRedis.XAdd(_keyStream, "*", ("name", "message1"));

订阅:

代码语言:javascript
复制
static async Task CsRedisStreamConsumer()
{
    Console.WriteLine("CsRedis StreamConsumer start!");

    var csRedis = new CSRedis.CSRedisClient(_connstr);
    csRedis.XAdd(_keyStream, "*", ("name", "message1"));

    try
    {
        csRedis.XGroupCreate(_keyStream, _nameGrourp);
    }
    catch { }

    (string key, (string id, string[] items)[] data)[] product;
    (string Pid, string Platform, string Time) data = (null, null, null);

    while (true)
    {
        try
        {
            product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
            if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
            {
                Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");

                product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
                {
                    Console.WriteLine($"    {value}");
                });

                //csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
            }
        }
        catch (Exception)
        {
            //throw;
        }
    }
}

CSRedisCore

动画2
动画2

这里的超时报错可通过修改连接参数:syncTimeout 解决 CSRedisCore支持阻塞读取;

StackExchange.Redis

发布:

代码语言:javascript
复制
db.StreamAdd(_keyStream, "name", "message1", "*");

订阅:

代码语言:javascript
复制
static async Task StackExchangeRedisStreamConsumer()
{
    Console.WriteLine("StackExchangeRedis StreamConsumer start!");

    var redis = ConnectionMultiplexer.Connect(_connstr);
    var db = redis.GetDatabase();

    try
    {
        ///初始化方式1
        //db.StreamAdd(_keyStream, "name", "message1", "*");
        //db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);

        //方式2
        db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
    }
    catch { }

    StreamEntry[] data = null;

    while (true)
    {
        data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);

        if (data?.Length > 0 == true)
        {
            Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");

            data.FirstOrDefault().Values.ToList().ForEach(c =>
            {
                Console.WriteLine($"    {c.Name}:{c.Value}");
            });

            db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
        }
    }
}
动画
动画

StackExchange.Redis 有点比较坑的是不存在阻塞读取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支持AOF、RDB持久化?

A:支持,其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

Q:Stream是否还是会丢数据?若是,何种情况下?;

A:会;1、AOF是定时写盘的,如果数据还在内存中时redis服务宕机就会;2、主从切换时(从库还未同步完成主库发来的数据,就被提成主库)

总结

技术中有的时候没有“银弹”,只有更适合的技术,汝之蜜糖彼之砒霜;

很多时候的技术选型都是个比较麻烦的东西,对选型人的要求很高;你可能不是只需要熟悉其中的一种路线,而是要踩过各种各样的坑,再根据当前受限的环境,选择比较适合目前需求/团队的;

回到Stream上,我认为目前Stream能满足挺大部分队列需求;

特别是“在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的更专业的mq,比如kafka和RabbitMQ的时候”

当然,最终决定需要用更专业的mq与否的,还是需求;

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-09-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • Redis Stream介绍
    • XADD--发布消息
      • XREAD--订阅消息
        • GROUP--订阅分组消息(多端订阅)
          • 同样先发布消息
            • XGROUP CREATE 创建分组
              • XREADGROUP--分组读取
                • XPENDING--待处理消息
                  • XACK--删除已处理消息(消息确认机制)
                  • Stream在Asp.net Core中的使用
                    • CSRedisCore
                      • StackExchange.Redis
                      • QA
                      • 总结
                      相关产品与服务
                      消息队列 CMQ 版
                      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档