首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

PubSub:有没有可能为迟到的订阅者获取最后一条消息?

PubSub(Publish-Subscribe)是一种消息传递模式,用于在分布式系统中实现异步通信。在PubSub模式中,消息发布者(Publisher)将消息发送到一个或多个主题(Topic),而订阅者(Subscriber)则通过订阅这些主题来接收消息。

对于迟到的订阅者,有一种可能的解决方案是使用持久化订阅(Durable Subscription)。持久化订阅可以确保即使订阅者在消息发布之后才加入,也能接收到最后一条消息。

持久化订阅的工作原理是,当订阅者订阅一个主题时,系统会为其创建一个订阅标识,并将该标识与订阅者关联。当有新的消息发布到该主题时,系统会将消息发送给所有订阅者,包括持久化订阅者和非持久化订阅者。对于持久化订阅者,系统会将消息存储在持久化存储中,直到订阅者接收到消息为止。

通过持久化订阅,迟到的订阅者可以获取到最后一条消息。这对于一些实时性要求不高,但需要完整消息历史的场景非常有用,例如日志记录、事件溯源等。

腾讯云提供了一款适用于PubSub模式的产品,即消息队列 CMQ(Cloud Message Queue)。CMQ支持持久化订阅,可以满足迟到订阅者获取最后一条消息的需求。您可以通过以下链接了解更多关于腾讯云消息队列 CMQ的信息:消息队列 CMQ产品介绍

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Redis(8)——发布订阅与Stream

订阅模式原理 正如我们上面说到了,当发送一条消息到 wmyskxz.chat 这个频道时,Redis 不仅仅会发送到当前的频道,还会发送到匹配于当前模式的所有频道,实际上,pubsub_patterns...: PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。...同一个消费者组内的消费者共享所有的 Stream 信息,同一条消息只会有一个消费者消费到,这样就可以应用在分布式的应用场景中来保证消息的唯一性。...pending_ids:每个消费者内部都有的一个状态变量,用来表示 已经 被客户端 获取,但是 还没有 ack 的消息。...下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。

1.4K30

Redis 中使用 list,streams,pubsub 几种方式实现消息队列

使用 Redis 实现消息队列 普通的订阅 基于模式(pattern)的发布/订阅 看下源码实现 分析下源码实现 stream 的结构 streamCG 消费者组 streamConsumer 消费者结构...1、消息如何防止丢失; 2、消息的重复发送如何处理; 3、消息的顺序性问题; 关于 mq 中如何处理这几个问题,可参看RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略...如果客户端从队列中拿到一条消息时,但是还没消费,客户端宕机了,这条消息就对应丢失了, Redis 中为了避免这种情况的出现,提供了 BRPOPLPUSH 命令,BRPOPLPUSH 会在消费一条消息的时候...◆基于 Streams 的消息队列 Streams 是 Redis 专门为消息队列设计的数据类型。 是可持久化的,可以保证数据不丢失。 支持消息的多播、分组消费。 支持消息的有序性。...◆发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

1.2K40
  • Dapr 入门教程之发布订阅

    前面我们了解了如果在 Dapr 下面进行服务调用,以及最简单的状态管理,本节我们来了解如何启用 Dapr 的发布/订阅模式,发布者将生成特定主题的消息,而订阅者将监听特定主题的信息。...使用发布服务,开发人员可以重复发布消息到一个主题上。 Pub/sub 组件对这些消息进行排队处理。 该主题订阅者将从队列中获取到消息并处理他们。...接下来我们使用的这个示例包含一个发布者: React 前端消息生成器 包含另外 3 个消息订阅者: Node.js 订阅者 Python 订阅者 C# 订阅者 Dapr 使用可插拔的消息总线来支持发布-...Dapr 消息订阅发布服务 注意,Node 订阅者接收类型为 A 和 B 的消息,而 Python 订阅者接收类型为 A和 C 的消息,所以注意每个控制台窗口的日志显示。...) 同样的方式,这是告诉 Dapr 要订阅 pubsub 组件的哪些主题,这里我们订阅的组件名为 pubsub 的,主题为 A 和 C,这些主题的消息通过其他两个路由进行处理: @app.route(

    1.6K40

    Go 每日一库之 watermill

    例如,message-bus将消息发送到订阅者管道之后就不管了,这样如果订阅者处理压力较大,会在管道中堆积太多消息,一旦订阅者异常退出,这些消息将会全部丢失!...另外,message-bus不负责保存消息,如果订阅者后启动,之前发布的消息,这个订阅者是无法收到的。这些问题,我们将要介绍的watermill都能解决!...watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。...而且上面的例子中,每个消息处理结束需要手动调用Ack()方法,消息管理器才会下发后面一条信息,很容易遗忘。还有些时候,我们有这样的需求,处理完某个消息后,重新发布另外一些消息。...这些功能都是比较通用的,为此watermill提供了路由(Router)功能。直接拿来官网的图: ? 路由其实管理多个订阅者,每个订阅者在一个独立的goroutine中运行,彼此互不干扰。

    1.1K20

    硬核 | Redis PubSub 发布订阅与宅男有什么关系?

    主要包含三个部分组成:「发布者」、「订阅者」、「Channel」。 发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。...通过频道(Channel)实现 三步走: 订阅者订阅频道; 发布者向「频道」发布消息; 所有订阅「频道」的订阅者收到消息。...比如,65 哥 订阅了「smile.girls.Tina」频道和「smile.girls.*」模式,那么当 Tina 发布动态到频道的时候,65 哥会收到两条票消息,一条消息类型是message,一条类型是...当消息发布到频道的时候,遍历字典获取所有客户端并把消息发送到频道的客户端。...也不支持 ACK 机制,所以当前业务不能容忍这些缺点,那就使用专业的消息队列,如果能容忍那就能享受 Redis 唯快不破的优势。 最后,可以在评论区叫我一声「靓仔」么?

    87510

    Redis还可以做哪些事?

    beijing tianjin km 最后面的km表示距离单位是公里,支持的单位有以下几个: m,米 km,千米 mi,英里 ft,尺 获取附近的位置有两个命令,georadius根据经纬度获取,georadiusbymember...在上一篇文章中讲到了可以使用list和zset来实现消息队列,但是上面实现的消息队列是点对点模式,也就是一条消息只能由一个消费者来消费。...除此之外,redis还支持发布订阅模式,即一个消息由所有订阅者消费,比如广播、公告等等,发布一条公告后,所有关注了我的用户都可以收到这条公告。...发布消息 发布到信道channel:message一条消息,消息内容为hi pulish channel:message hi 订阅信道 订阅者可以订阅一个或多个信道,比如订阅channel:message...:message订阅个数 pubsub numsub channel:message redis的发布订阅模式和专业的消息中间件相比,略显粗糙,但是实现起来非常简单,学习成本较低。

    49610

    聊一聊观察者模式

    在查完资料之后,得出如下结论(三本书中都有提到),观察者模式的另外一种名称叫做发布订阅者模式(本文中的观察者模式和订阅者模式指的是一个东西)。...那有没有另外一种代码的书写方式,增加dog,但是不修改thief的代码,同样达到上面的效果呢?...观察者模式也可以叫做订阅发布模式,本质是一种消息机制,用这种机制我们可以解耦代码中对象互相调用。 第三版代码,我们可以用如下图示来理解: ?...我们在前端应用中使用的redux和vuex都运用了观察者模式,或者叫做订阅者模式,其运行原理也如上图。...5、根据promise的特性我们用promise改写了pubsub的代码,用promise的then来存储观察者的行为,用这个promsie的resolve来实现public,这里面我们演示了如何获取promise.then

    50730

    Redis:20---常用功能之(发布与订阅)

    一、发布与订阅概述 Redis提供了基于“发布/订阅”模式的消息机制,此种模式下,消息发布者和订阅者不进行直接通信,发布者客户端向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以收到该消息...,会将消息message发送给频道channel 例如:下面操作会向channel:sports频道发布一条消息“Tim won the championship”,返回结果为订阅者个数,因为此时没有订阅...右侧客户端向itformation频道发送一条消息,左侧客户端可以收到这条消息 ?...五、查询订阅 ①查询活跃的频道 pubsub channels [pattern] 所谓活跃的频道是指当前频道至少有一个订阅者,其中[pattern]是可以指定具体的模式 例如,下面左侧客户端订阅一个名为...“it_redis”的频道,右侧使用pubsub查询,结果会显示出来 ?

    58630

    Redis 客户端服务端交互1 客户端服务端协议

    发布/订阅模式下,往 channel 订阅者推送的消息,采用array 类型数据。 请求/响应模式 对于之前提到的数据结构,其基本操作都是通过请求/响应模式完成的。...(2)交互方向 发布者和Redis 服务端的交互模式仍为 请求/响应模式; 服务器向订阅者推送数据; 时序:推送发生在服务器接收到发布消息之后。...当producer-1 向channel abc 发送消息时,除了abc 之外,pattern channel *bc 也会收到消息,然后再推送给分别的订阅者。...image.gif pubsub_channels map 维护普通channel和订阅者的关系:key 是channel的名字,value是所有订阅者 client 的链表; pubsub_patterns...每当发布者向某个channel publish 一条消息时,redis 首先会从pubsub_channels 中找到对应的value,向它的所有Client发送消息;同时遍历pubsub_patterns

    1.9K20

    Redis发布订阅:我想着应该是全网讲解最简单最通俗的文章了吧!

    ;redis订阅发布:生产者生产完消息通过频道分发消息给订阅该频道的消费者,这样就可以较少队列数据的积攒,导致内存暴增。...注意点:结果集返回是接收到message的订阅者数量,没有订阅者返回0。 pubsub指令:pubsub channels [argument [argument ...]]...Snipaste_2021-05-04_13-36-32.png 订阅频道发消息截图 //获取指定频道的订阅的客户端数量 127.0.0.1:6379> PUBSUB numsub mumu_1 mumu...Publish.php 0000000000 注意事项 1、订阅的消费者需要一直执行,阻塞获取消息,如果断开则表示退订了。...redis发布订阅的优缺点 小伙伴们从上面的实践操作来看,PubSub生产的消息,如果没有对应的频道或者消费者,消息会被丢弃,直接投递失败返回0状态。

    1.5K00

    Redis:发布订阅(pubsub)的实现原理及避坑场景

    ---- 简介 ---- Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息到频道(channel),订阅者 (sub) 从频道(channel)接收消息。...首先将键值对:频道名字 -> null 保存到client的哈希字典pubsub_channels中,以支持方便获取此client所订阅的所有频道信息的命令(对应代码行234)。...如果找到以此键值为当前频道名字对应的client链表(对应代码行244)则获取当前键值对的value值。最后将当前订阅此频道的client添加到链表尾部(对应代码行246)。...发布消息的流程 以频道名 renzhikeji为例: 发布消息命令的处理函数为:publishCommand(pubsub.c文件) (来源:Redis-7.0.5: pubsub.c -->...(订阅关系)中,寻找此频道的所有订阅者,将此频道发布的消息写入所有对应订阅者client的对应的响应缓存中。

    7.9K30

    JavaScript设计模式与开发实践 - 观察者模式

    概述 观察者模式又叫发布 - 订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个目标对象(为了方便理解,以下将观察者对象叫做订阅者,将目标对象叫做发布者...发布者的状态发生变化时就会通知所有的订阅者,使得它们能够自动更新自己。...用观察者模式重写之后,对用户信息感兴趣的业务模块将自行订阅登录成功的消息事件。...,自动通知所有已经订阅过的对象; 页面载入后发布者很容易与订阅者存在一种动态关联,增加了灵活性; 发布者与订阅者之间的抽象耦合关系能够单独扩展以及重用。...缺点 创建订阅者本身要消耗一定的时间和内存,而且当你订阅一个消息后,也许此消息最后都未发生,但这个订阅者会始终存在于内存中; 虽然可以弱化对象之间的联系,但如果过度使用的话,对象和对象之间的必要联系也将被深埋在背后

    78070

    Redis系列(十七)独立功能之pubsub

    当 Redis 接受到发布消息的请求之后,需要将消息发给所有的可能匹配的客户端,也就是渠道订阅者和模式订阅者都需要发送。...渠道订阅: 根据发送消息的渠道,从渠道订阅者的字典中取到对应的值,然后遍历链表,当消息发送给所有订阅的客户端。...pubsub 模块最大的缺点就是它不支持消息的持久化,也就是说,必须双方同时在线,这在业务系统中是很难绝对保证的。 PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。...它的分布式锁在竞争锁失败时,会自动订阅一个渠道,而在锁释放的时候,也会发布解锁信息,通知所有的竞争方来重新获取锁。...之后简单介绍了 Redis 内部实现此模块的一些原理,最后向大家安利了一下Stream这个轻量级的消息队列。一定要去用一下试试看噢~.

    1.6K20

    把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统

    (listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。    ...频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:     根据发布者订阅者逻辑,改写main.py: import tornado.httpserver...由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis...结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网...,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址

    1.9K10

    最通俗易懂的Redis发布订阅及代码实战

    在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图: 在这里插入图片描述 Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令...频道名称 消息 比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”...订阅消息 订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是: subscribe 频道名称 [频道名称 ...]...同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。 新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。...查询订阅信息 查看活跃频道 活跃频道指的是至少有一个订阅者的频道,语法是: pubsub channels [模式] 比如: > pubsub channels 1) "channel:one-more-study

    53030

    Redis实现消息队列和实时通信

    消息队列消息队列是一种常用的通信模式,用于解耦消息的发送者和接收者,并实现异步处理。Redis提供了一个名为"List"的数据结构,可以用于实现简单的消息队列。...通过调用send_message函数,我们向名为my_queue的队列发送了一条消息。然后,我们调用receive_message函数来接收队列中的消息。...如果有消息存在,我们打印出消息内容,否则打印出提示信息。使用Redis的List数据结构实现消息队列的优势在于其高效的插入和读取操作,以及支持多个消费者并发消费的能力。...在join方法中,我们使用r.pubsub().subscribe命令订阅了聊天室的频道。在leave方法中,我们使用r.pubsub().unsubscribe命令取消了订阅。...最后,我们使用threading.Thread创建了一个新线程,并在其中调用receive_messages方法来接收聊天室的消息。

    90640

    Redis发布订阅

    它不仅可以用作数据库,还可以用作缓存和消息代理。今天,我们要探讨的是 Redis 中一个强大的功能——发布订阅模式。 发布订阅模式是一种消息通信模式,发送者(发布者)发送消息,订阅者接收消息。...1、Redis发布订阅介绍 1.1、Redis发布订阅概述 Redis 的发布订阅(Pub/Sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。...在消息队列中,消息是持久化的,消息被发送到队列后,会一直在队列中等待被消费,即使没有在线的消费者,消息也不会丢失,消费者下次上线后可以继续从队列中获取到消息。...客户端结构:每个 Redis 客户端都有一个 pubsub_channels 和 pubsub_patterns 两个属性,分别用于存储该客户端订阅的频道和模式。...例如,客户端可以发送如下命令向名为 mychannel 的频道发布一条消息: PUBLISH mychannel "hello" 服务器会返回一个整数,表示消息成功发送到的客户端数量。

    1.6K30

    React 入门学习(九)-- 消息订阅发布

    换到代码层面上,我们订阅了一个消息假设为 A,当另一个人发布了 A 消息时,因为我们订阅了消息 A ,那么我们就可以拿到 A 消息,并获取数据 那我们要怎么实现呢?...首先引入 pubsub-js 我们需要先安装这个库 yarn add pubsub-js 引入这个库 import PubSub from 'pubsub-js' 订阅消息 我们通过 subscribe...来订阅消息,它接收两个参数,第一个参数是消息的名称,第二个是消息成功的回调,回调中也接受两个参数,一个是消息名称,一个是返回的数据 PubSub.subscribe('search',(msg,data...: true }) 这样我们就能成功的在请求之前发送消息,我们只需要在 List 组件中订阅一下这个消息即可,并将返回的数据用于更新状态即可 PubSub.subscribe('search',(msg...return await reasponse.json(); } catch (error) { console.log('Request Failed', error); } } 最后关于错误对象的获取可以采用

    41910

    React 入门学习(九)-- 消息订阅发布

    换到代码层面上,我们订阅了一个消息假设为 A,当另一个人发布了 A 消息时,因为我们订阅了消息 A ,那么我们就可以拿到 A 消息,并获取数据 那我们要怎么实现呢?...首先引入 pubsub-js 我们需要先安装这个库 yarn add pubsub-js 引入这个库 import PubSub from 'pubsub-js' 订阅消息 我们通过 subscribe...来订阅消息,它接收两个参数,第一个参数是消息的名称,第二个是消息成功的回调,回调中也接受两个参数,一个是消息名称,一个是返回的数据 PubSub.subscribe('search',(msg,data...: true }) 这样我们就能成功的在请求之前发送消息,我们只需要在 List 组件中订阅一下这个消息即可,并将返回的数据用于更新状态即可 PubSub.subscribe('search',(msg...return await reasponse.json(); } catch (error) { console.log('Request Failed', error); } } 最后关于错误对象的获取可以采用

    51820

    Redis进阶-Redis 4种MQ 方案对比

    的消费者都能从Channel中获取到同等的信息。...当消费者的数量达到一定规模时,服务器的性能将线性下降,因此每个消费者获取到消息的延迟也线性增长 当生产者产生消息的速度远大于消费者的消费能力的时候,消费者会被强制断开连接,因此会造成消息的丢失...client-output-buffer-limit pubsub 32mb 8mb 60 当消费者的buffer积压超过32MB,或者在60s内消费者的buffer一直保持在8MB以上,那么该消费者就会被...当consumer断开连接或者crash的时候,再次去消费,历史消息会得以保留,可以从最后一次消费的位置进行消费 消息可以积压。...一条消息被一个消费者消费之后,这条消息就被删除了,其他的消费者再无可能重复消费掉这条消息。也就是说List方案的消息不是发散的,同一条消息只能被一个消费者消费。

    1.4K10
    领券