我们已经定义了一个基本的订阅者,它通过抛出一个异常并依赖Akka Streams的流监督来恢复Flow
,从而跳过失败的消息(例如,由于某些业务逻辑原因,我们不打算处理)
someLagomService
.someTopic()
.subscribe
.withGroupId("lagom-service")
.atLeastOnce(
Flow[Int]
.mapAsync(1)(el => {
// Exception may occur here or can map to Done
})
.withAttributes(ActorAttributes.supervisionStrategy({
case t =>
Supervision.Resume
})
)
这似乎适用于负载很小的基本用例,但我们注意到对于大量消息(例如:非常频繁地重新处理消息等),会出现非常奇怪的情况。
深入研究代码,我们看到拉科姆的broker.Subscriber.atLeastOnce
文档声明:
flow
可以从上游提取更多的元素,但它必须为接收到的每条消息恰好发出一条Done
消息。它还必须以接收消息的相同顺序发出它们。这意味着flow
不能过滤或收集消息的子集,相反,它必须将消息拆分成单独的流,并将那些本应被丢弃的消息映射到Done
。
此外,在Lagom的KafkaSubscriberActor
的实施中,我们看到私有atLeastOnce
的实施本质上是解压缩消息有效负载和偏移量,然后在我们的用户流将消息映射到Done
之后重新压缩,然后再进行备份。
上面这两个花边新闻似乎暗示,通过使用流监控器和跳过元素,我们最终可能会遇到这样一种情况,即可提交的偏移量不再与每个Kafka消息生成的Done
一致。
例如:如果我们流式传输1、2、3、4并将1、2和4映射到Done
,但在3上抛出异常,那么我们有3个Done
和4个可提交的偏移量?
Done
是正确的做法
使用Lagom 1.4.10
发布于 2019-03-12 04:00:35
这是正确的/预期的吗?这是否意味着我们应该避免在这里使用流监控器?
官方API documentations表示,
如果正在使用Kafka Lagom message broker模块,则默认情况下,当发生故障时会自动重新启动流。
因此,不需要添加自己的supervisionStrategy
来管理错误处理。默认情况下,流将重新启动,您不应该考虑“跳过”完成消息。
拉链不均匀会导致什么样的行为?
正因为如此,文档中说:
这意味着流不能过滤或收集消息的子集
它可能会未提交错误的偏移量。在重新启动时,您可能会从提交的较低偏移量中获得已处理的消息。
当涉及到通过Lagom message broker API从Kafka消费消息时,推荐的错误处理方法是什么?映射/恢复要做的故障是正确的吗?
Lagom通过删除导致错误的消息并重新启动流来处理异常处理。而将失败映射/恢复到Done不会对此进行任何更改。
你可以考虑,如果你以后需要访问这些消息,也可以使用Try {}
,例如,不抛出异常,通过将错误消息发送到不同的主题来收集错误消息,这将使你有机会监控错误的数量,并在条件合适时重播导致错误的消息,即错误被修复。
https://stackoverflow.com/questions/55108351
复制相似问题