我正在使用设置了maxMessagesPerPoll=5的SQS FIFO队列中的消息。
目前我单独处理每条消息,这完全是对资源的浪费。在我的例子中,因为我们使用的是FIFO队列,并且这5个消息都与同一个对象相关,所以我可以将它们一起处理。
我认为这可以通过使用aggregate模式来完成,但我无法获得任何结果。
我的消费路线看起来像这样:
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .process(exchange -> {
        // process the message
    })我相信应该可以做这样的事情
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .aggregate(const(true), new GroupedExchangeAggregationStrategy())
    .completionFromBatchConsumer()
    .process(exchange -> {
        // process ALL messages together as I now have a list of all exchanges
    })但是processor永远不会被调用。
第二件事:如果我能做到这一点,什么时候将ACK发送到SQS?在处理每条单独的消息时,还是在聚合过程完成时?我希望是后者
发布于 2018-05-03 16:30:37
当处理器未被调用时,聚合器可能仍在等待新消息聚合。
您可以尝试使用completionSize(5)而不是completionFromBatchConsumer()进行测试。如果这是可行的,那么批处理完成定义就是问题所在。
对于针对代理的确认:不幸的是没有。我认为消息在到达聚合器时是提交的。
驼峰聚合器组件是一个“有状态”组件,因此它必须结束当前的事务。
因此,您可以为这些组件配备持久存储库,以避免在进程终止时丢失数据,。在这样的场景中,如果您没有附加持久存储库,那么已经聚合的消息显然会丢失。
发布于 2018-05-04 16:08:58
问题出在GroupedExchangeAggregationStrategy上
当我使用这个策略时,输出是所有交换的“数组”。这意味着到达完成谓词的交换不再具有初始属性。相反,它有CamelGroupedExchange和CamelAggregatedSize,这对completionFromBatchConsumer()没有任何用处
因为我实际上不需要聚合所有的交换,所以使用GroupedBodyAggregationStrategy就足够了。则交换属性将保持与原始交换相同,只有正文将包含一个“数组”
另一种解决方案是使用completionSize(Predicate predicate),并使用从分组的交换中提取所需值的自定义谓词。
https://stackoverflow.com/questions/50136770
复制相似问题