学习
实践
活动
专区
工具
TVP
写文章
专栏首页JAVA开发专栏基于Redis实现特殊的消息队列
原创

基于Redis实现特殊的消息队列

特殊场景的消息队列

消息队列使用比较多的产品kafka,在各个领域都发挥了很大的作用,但是在以下的几种场景是无法满足需求。

场景

  1. 消息重复概率比较高时,需要对重复消息进行合并处理避免浪费有限的资源,减少延迟
  2. 需要根据业务自定义的优先级进行消息处理,高优先级的消息比低优先级的消息先处理
  3. 消息需要定时消费场景,消息只有在设定的消费时间到了之后立马被消费

RMQ(Redis message queue,RMQ)

功能:

RMQ设计为一个第三方库,可以帮助用户基于Redis快速实现消息队列的功能,RMQ消息队列具有消息合并、区分优先级、支持定时消息等特性。RMQ消息队列可以用于异步解耦、削峰填谷,支持千万级别的数据堆积。

RMQ消息队列目前支持三种类型的消息,分别是:RangeMergeMessage(区间重复合并消息)、PriorityMessage(优先级消息)、FixedTimeMessage(任意定时消息)

区间重复合并消息

RangeMergeMessage支持区间重复消息合并,发送消息时需要设置时间区间,消息延迟该时间长度后被消费,在该时间区间内如果发送重复的消息,重复消息将会被合并。如果消息在Redis服务端发生堆积,重复到来的消息依然会被合并处理。该类型消息适用于消息重复率较高并且希望消息合并并处理的场景,对重复消息进行合并可以减少下游消费系统的压力,减少不必要的资源消耗,将有限的资源最大化的利用,提升消费效率。

优先级消息

PriorityMessage支持给消息设置任意等级的优先级,优先级高的消息会被优先消费,相同的优先级的消息被随机消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后的消息优先级等于最后存储的消息的优先级。该类型消息适用于希望重复消息合并处理并且需要设置优先级的场景,下游消费者资源有限时候,合并重复消息并且优先级高的消息将可以合理利用有限的资源。

任意定时消息

FixedTimeMessage支持给消息设置任意消费时间,只有消费时间到了之后消息才被消费,消费时间可以精确到秒。消息到期后没有及时被消费时,消费者将按照时间由远到近进行消费。如果消息在Redis服务端发送堆积,重复的消息将被合并处理,合并后消息的消费时间等于最后存储的消息的消费时间,该类型消息适用于希望重复消息合并处理且需要定时消费的场景,定时消息应用场景非常丰富,比如定时打标去标、活动结束后清理动作、定时超时关闭等。

并发消息控制

使用传统消息中间件进行集群消费的时候,为了避免并发处理同一元数据导致不一致的问题,通常需要对元数据加分布式锁,频繁的锁冲突会导致消费效率低下,加分布式锁的最终目的其实就是保障属于同一元数据的消息被串行消费。加分布式锁并不是最好的方案,最好的方案是从根本上解决并发问题,让属于同一元数据的消息串行消费。RMQ 消息队列具有并发消费控制能力,属于同一元数据的消息只会被分配给全局唯 一一个线程进行消费,因此属于同一元数据的消息将被串行消费。使用方如果需要 该能力,除了需要提供 Redis,还需要提供 ZooKeeper。

重试次数控制

RMQ 消息队列支持失败重试消费 16 次,业务返回消费失败后,消息会被回滚并等待重试消费,重试 16 次后消息进入死信队列,消息不再被消费,除非人工干预。

RMQ实现的原理

RMQ消息队列由三部分组成,分别是ZooKeeper、RMQ二方库、Redis

Zookeeper

Zookeeper负责维护集群worker的信息,将topic的所有slot分配给全局的woker;

Redis

Redis负责存储消息,采集Sorted Set结构存储,Store Queue是消息队列,Prepare Queue是采用二阶段消费的方式正在消费消息队列中的信息,Dead Queue是死的队列信息

RMQ

  1. RmqClient负责RMQ的启动工作,包括上报TopicDef、Worker给Zookeeper,分配给Slot的Worker扫描业务定义的MessageListerBean
  2. Producer负责根据不用消息类型将消息按照指定的方式存储到Redis中
  3. Consumer负责根据不用消息类型按照指定方式从Redis弹出的消息并调用业务的MessageListener。

消息的存储

Topic的设计原理

Topic的定义有三个部分组成,topic表示主题名称、slotAmount表示消息存储划分的槽的数量,topicType表示消息的类型

主题名称时一个Topic的唯一标示,相同主题名称的Topic的SlotAmount和topicType一定是一样的消息存储采用Redis的Sorted Set结构表示,为了支持大量的消息堆积的情况,需要把消息分散存储到很多个槽中,SlotAmount表示该Topic消息存储共同使用的槽数量,槽数量一定需要是2的n次方的幂。在消息存储的时候,采用制定数据或者消息体的哈希求余数得到槽的位置

SoreQueue的设计原理

上图中的topic划分了8个槽为,编号是0--7,如果发送指定了消息的slotBasis,则计算slotBasis的CRC32的值,CRC32值对槽数量进行取模得到对应的槽序号,SlotKey设计为#{topic}_#{index}也就是Redis的键,其中#{}表示占位符的处理。

发送方需要保证相同内容的消息的SlotBasis相同,如果没有制定SlotBasis则采用内容计算SlotKey,这样内容相同的消息就会落在同一个Sorted Set里面,所以内容相同的消息会进行合并的处理。

Redis的Sorted Set中的数据按照分数排序,实现不同类型的消息的关键就在于如何利用分数,如何增加消息到Sorted Set、如何从Sorted Set中获取数据消息。

  1. 优先级消息将优先级作为分数,消费时每次弹出分数最大的消息;
  2. 任意定时消息将时间戳作为分数,消费时每次弹出分数大于当前时间戳的一个 消息;
  3. 区间重复合并消息将时间戳作为分数,添加消息时将(当前时间戳+时间区间)作为分数,消费时每次弹出分数大于当前时间戳的一个消息

原创声明,本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

关注作者,阅读全部精彩内容
登录 后参与评论
0 条评论

相关文章

  • 【BCVP】实现基于 Redis 的消息队列

    如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯。

    老张的哲学
  • [视频教程] 基于redis的消息队列实现与思考

    使用redis的list列表来实现消息队列功能,相信大家都听过消息队列,但是在业务中可能并没有真正去使用它。在公司项目中正好有个场景使用到了消息队列,因此就来说...

    唯一Chat
  • php基于Redis消息队列实现的消息推送的办法

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持网站事(zalou.cn)。

    砸漏
  • php基于Redis消息队列实现的消息推送的方法

    砸漏
  • redis实现消息队列

    消息队列一般都会想到kafka,rabbitmq,Rockermq, 其实,给你印像做缓存的Redis也是能做消息队列.

    星痕
  • redis实现消息队列

    Redis 怎么做消息队列? - Kaito的回答 - 知乎 https://www.zhihu.com/question/20795043/answer/19...

    opencode
  • 基于 Redis 消息队列实现文件上传的异步存储

    本来准备给 Redis 实战入门篇做个收尾了,不过想起来 Laravel 进阶组件部分还剩下文件存储、邮件和通知这几个功能没有介绍,不如索性一并介绍下,因为它们...

    学院君
  • 基于 Redis 消息队列实现邮件通知的异步发送

    由于发送邮件、短信之类的操作通常涉及到第三方服务的调用,所以也是个响应时间不确定的耗时操作,如果放到处理用户请求进程中同步处理,需要等待很长时间才能获取响应结果...

    学院君
  • 基于Redis实现分布式消息队列(一)

    1、为什么需要消息队列? 当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。

    后端技术探索
  • 基于Redis实现分布式消息队列(二)

    1、访问Redis的工具类 public class RedisManager {

    后端技术探索
  • Redis实现消息队列 转

    打开浏览器,输入地址,按下回车,打开了页面。于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容。

    wuweixiang
  • 利用Redis实现消息队列

    消息队列(message queue) 可以分为两部分,即消息(message)与队列(queue),它是分布式系统中重要的组件,其通用的使用场景可以简单地描述...

    xcsoft
  • Redis实现简单消息队列

    打开浏览器,输入地址,按下回车,打开了页面。于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容。

    周小董
  • [记录点滴]Redis实现简单消息队列

    本文提出了一种用Redis实现简单消息队列的方案,适合在资源不足的条件下临时使用。

    罗西的思考
  • SpringBoot 整合 Redis 实现消息队列

    异步处理,应用解耦(拆分多系统),流量削峰(秒杀活动、请求量过大)和消息通讯(发布公告、日志)四个场景。

    宁在春
  • Redis 使用 List 实现消息队列的利与弊

    分布式系统中必备的一个中间件就是消息队列,通过消息队列我们能对服务间进行异步解耦、流量消峰、实现最终一致性。

    码哥字节
  • Redis实现消息队列的4种方案

    Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数据结构,如 字符串,散列,列表,集合,带有范围查询的排序集(sorted sets),...

    allsmallpig
  • 基于Redis实现消息队列的6种方案之方案简述(中)

    在《基于Redis实现消息队列的6种方案之方案简述(上)》中我们讲到了基于List类型实现的消息队列,今天我们来讲下优先队列的实现。

    不太灵光的程序员

扫码关注腾讯云开发者

领取腾讯云代金券