首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Camel中批量消费者的聚合结果(例如,来自SQS)

Camel中批量消费者的聚合结果(例如,来自SQS)
EN

Stack Overflow用户
提问于 2018-05-02 22:11:19
回答 2查看 950关注 0票数 2

我正在使用设置了maxMessagesPerPoll=5的SQS FIFO队列中的消息。

目前我单独处理每条消息,这完全是对资源的浪费。在我的例子中,因为我们使用的是FIFO队列,并且这5个消息都与同一个对象相关,所以我可以将它们一起处理。

我认为这可以通过使用aggregate模式来完成,但我无法获得任何结果。

我的消费路线看起来像这样:

代码语言:javascript
运行
复制
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .process(exchange -> {
        // process the message
    })

我相信应该可以做这样的事情

代码语言:javascript
运行
复制
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .aggregate(const(true), new GroupedExchangeAggregationStrategy())
    .completionFromBatchConsumer()
    .process(exchange -> {
        // process ALL messages together as I now have a list of all exchanges
    })

但是processor永远不会被调用。

第二件事:如果我能做到这一点,什么时候将ACK发送到SQS?在处理每条单独的消息时,还是在聚合过程完成时?我希望是后者

EN

回答 2

Stack Overflow用户

发布于 2018-05-03 16:30:37

当处理器未被调用时,聚合器可能仍在等待新消息聚合。

您可以尝试使用completionSize(5)而不是completionFromBatchConsumer()进行测试。如果这是可行的,那么批处理完成定义就是问题所在。

对于针对代理的确认:不幸的是没有。我认为消息在到达聚合器时是提交的。

驼峰聚合器组件是一个“有状态”组件,因此它必须结束当前的事务。

因此,您可以为这些组件配备持久存储库,以避免在进程终止时丢失数据,。在这样的场景中,如果您没有附加持久存储库,那么已经聚合的消息显然会丢失。

票数 1
EN

Stack Overflow用户

发布于 2018-05-04 16:08:58

问题出在GroupedExchangeAggregationStrategy

当我使用这个策略时,输出是所有交换的“数组”。这意味着到达完成谓词的交换不再具有初始属性。相反,它有CamelGroupedExchangeCamelAggregatedSize,这对completionFromBatchConsumer()没有任何用处

因为我实际上不需要聚合所有的交换,所以使用GroupedBodyAggregationStrategy就足够了。则交换属性将保持与原始交换相同,只有正文将包含一个“数组”

另一种解决方案是使用completionSize(Predicate predicate),并使用从分组的交换中提取所需值的自定义谓词。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50136770

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档