我正在使用设置了maxMessagesPerPoll=5
的SQS FIFO队列中的消息。
目前我单独处理每条消息,这完全是对资源的浪费。在我的例子中,因为我们使用的是FIFO队列,并且这5个消息都与同一个对象相关,所以我可以将它们一起处理。
我认为这可以通过使用aggregate
模式来完成,但我无法获得任何结果。
我的消费路线看起来像这样:
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.process(exchange -> {
// process the message
})
我相信应该可以做这样的事情
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.aggregate(const(true), new GroupedExchangeAggregationStrategy())
.completionFromBatchConsumer()
.process(exchange -> {
// process ALL messages together as I now have a list of all exchanges
})
但是processor
永远不会被调用。
第二件事:如果我能做到这一点,什么时候将ACK发送到SQS?在处理每条单独的消息时,还是在聚合过程完成时?我希望是后者
发布于 2018-05-03 16:30:37
当处理器未被调用时,聚合器可能仍在等待新消息聚合。
您可以尝试使用completionSize(5)
而不是completionFromBatchConsumer()
进行测试。如果这是可行的,那么批处理完成定义就是问题所在。
对于针对代理的确认:不幸的是没有。我认为消息在到达聚合器时是提交的。
驼峰聚合器组件是一个“有状态”组件,因此它必须结束当前的事务。
因此,您可以为这些组件配备持久存储库,以避免在进程终止时丢失数据,。在这样的场景中,如果您没有附加持久存储库,那么已经聚合的消息显然会丢失。
发布于 2018-05-04 16:08:58
问题出在GroupedExchangeAggregationStrategy
上
当我使用这个策略时,输出是所有交换的“数组”。这意味着到达完成谓词的交换不再具有初始属性。相反,它有CamelGroupedExchange
和CamelAggregatedSize
,这对completionFromBatchConsumer()
没有任何用处
因为我实际上不需要聚合所有的交换,所以使用GroupedBodyAggregationStrategy
就足够了。则交换属性将保持与原始交换相同,只有正文将包含一个“数组”
另一种解决方案是使用completionSize(Predicate predicate)
,并使用从分组的交换中提取所需值的自定义谓词。
https://stackoverflow.com/questions/50136770
复制相似问题