我正在构建一个简单的解决方案,其中生产者服务将事件推送到消息队列,然后让流服务通过gRPC流API使这些事件可用。
云Pub/Sub似乎很适合这项工作,但是,扩展流媒体服务意味着,该服务的每个副本都需要在缩小之前创建自己的订阅并删除它,这似乎是不必要的复杂,而不是平台的初衷。
另一方面,Kafka似乎可以很好地处理类似的事情,但我想避免不得不管理底层平台本身,而是利用云基础设施。
我还应该提到,创建流API的原因是允许流到前端(可能无法访问底层基础设施)。
是否有更好的方法在GCP平台上进行这样的操作,而不选择部署和管理自己的基础设施的路线呢?
发布于 2019-03-19 13:34:08
如果您本质上想要短期订阅,那么在创建订阅时可以在订阅对象上设置一些内容:
expiration_policy
设置为较小的持续时间。当订阅服务器在此时间段内没有接收消息时,订阅将被删除。折中之处是,如果您的订阅者由于一个持续时间超过这段时间的暂时问题而停机,则订阅将被删除。默认情况下,有效期为31天。你可以把这个低到一天。对于拉动式订阅者,订阅者只需停止向Cloud /Sub发出请求,以便计时器在到期时启动。对于推送订阅,计时器是根据没有消息成功传递到端点的时间启动的。因此,如果没有发布消息,或者端点返回所有被推送消息的错误,则计时器生效。message_retention_duration
值。这是在订阅服务器没有接收和添加消息时保留消息的时间段。默认情况下,这是7天。你可以把它调低到10分钟。权衡的是,如果您的订阅服务器断开连接或在处理消息时超过此持续时间,则会删除超过此期限的旧消息,而订阅服务器将不会看到它们。完全关闭的订阅服务器可能只需调用DeleteSubscription本身,以便订阅立即消失,但对于意外关闭的订阅者,设置这两个属性将减少订阅继续存在的时间和将保留的消息数量(这些消息永远不会被传递)。
请记住,云Pub/Sub配额将每个主题和每个项目的订阅限制为1到10 000个。因此,如果创建了大量订阅,并且活动或未清除(手动操作,或在expiration_policy
的ttl
通过后自动清除),则可能无法创建新的订阅。
发布于 2021-06-21 14:48:18
我认为你最初的想法比短暂的订阅要好。我的意思是它有效,但感觉完全不自然。取决于您的需求是什么。例如,客户端是只需要在连接时接收消息,还是都需要获取所有消息?
只有在连接时
你最初的想法更好海事组织。我可能会做的是创建一个客户端可以连接到的gRPC流服务。实现本质上是一种观察者模式。使用者将收到一条消息,然后遍历订阅者,对所有订阅者进行“发送”。从那里开始,当客户端连接到服务时,它只会向该观察者集合注册自己,并在断开连接时取消注册。水平缩放是被动的,因为客户端对它们所连接的任何实例都很粘。
每个人都会得到信息,如果最终
这个概念类似于上面的内容,但是客户端不会在断开连接时隐式地从观察者那里取消注册。相反,它将显式地注册和取消注册(通过设计这样做的方法/命令)。修改“断开连接”逻辑,告诉观察者列表客户端已脱机。然后消费者的广播逻辑略有不同。现在,它遍历列表,并说“如果联机,那么发送,否则队列”,并将消息发送到一个临时队列(属于客户端)。然后,您的“on connect”逻辑将向客户端发送队列中的所有消息,然后通知客户它已恢复联机。基本上是个收件箱。在像RabbitMQ这样的大多数产品中,建立短暂的、自删除的队列真的很容易。不过,我认为您必须对删除队列是否可以做一些管理。例如,除非客户端显式取消订阅或已停用这么长时间,否则永远不要删除队列。如果不这样做,整个收件箱的想法就会分崩离析。
上面选择的答案与我在这里订阅的内容非常相似,因为订阅是队列。如果我这样做,那么我可能会将其实现为内部总线,而不是观察者(因为这是不必要的)--您可以根据需要创建一个客户端,该客户端实际上只是转发消息。消息使用者根据客户端是否连接而订阅和取消订阅。正如Kamal所指出的,如果您的规模超过了pubsub允许的最大订阅数量,您就会遇到问题。如果您发现自己处于这个位置,那么您可以通过实现上面的模式来解除该约束。这基本上是相同的模式,但您将责任转移到您的下位,在那里唯一的约束是您自己的资源。
gRPC使这一机制变得非常简单。或者,对于web来说,如果您是在微软堆栈上,那么SignalR也会让这件事变得非常容易。客户端连接到集线器,您可以发布到所有已连接的客户端。这里的使用者模式基本上保持不变,但是您不必手动实现观察者模式。
(注:图表中的箭头是依赖的方向,而不是数据流)
https://stackoverflow.com/questions/55206370
复制相似问题