我们已经定义了一个基本的订阅者,它通过抛出一个异常并依赖Akka Streams的流监督来恢复Flow
,从而跳过失败的消息(例如,由于某些业务逻辑原因,我们不打算处理)
someLagomService
.someTopic()
.subscribe
.withGroupId("lagom-service")
.atLeastOnce(
Flow[Int]
.mapAsync(1)(el => {
// Exception may occur here or can map to Done
})
.withAttributes(ActorAttributes.supervisionStrategy({
case t =>
Supervision.Resume
})
)
这似乎适用于负载很小的基本用例,但我们注意到对于大量消息(例如:非常频繁地重新处理消息等),会出现非常奇怪的情况。
深入研究代码,我们看到拉科姆的broker.Subscriber.atLeastOnce
文档声明:
flow
可以从上游提取更多的元素,但它必须为接收到的每条消息恰好发出一条Done
消息。它还必须以接收消息的相同顺序发出它们。这意味着flow
不能过滤或收集消息的子集,相反,它必须将消息拆分成单独的流,并将那些本应被丢弃的消息映射到Done
。
此外,在Lagom的KafkaSubscriberActor
的实施中,我们看到私有atLeastOnce
的实施本质上是解压缩消息有效负载和偏移量,然后在我们的用户流将消息映射到Done
之后重新压缩,然后再进行备份。
上面这两个花边新闻似乎暗示,通过使用流监控器和跳过元素,我们最终可能会遇到这样一种情况,即可提交的偏移量不再与每个Kafka消息生成的Done
一致。
例如:如果我们流式传输1、2、3、4并将1、2和4映射到Done
,但在3上抛出异常,那么我们有3个Done
和4个可提交的偏移量?
Done
是正确的做法
使用Lagom 1.4.10
https://stackoverflow.com/questions/55108351
复制相似问题