我需要轮询一个hazelcast (客户端模式)队列,并在异常上使用retry (10次尝试)选项。我原以为骆驼轮询和处理会是多线程的。但事实并非如此。在重新尝试异常时,队列中的任何新消息都会堆积起来,只有在第一条消息完成后才会被收集起来进行处理。是否有任何并行处理选项(并发消费)。我添加了concurrentConsumer和poolSize作为查询参数。但打得不太好。
我尝试过的是:
fromF(hazelcast-queue://FOO?concurrentConsumers=5&hazelcastInstance=#hazelcastInstance&poolSize=10&queueConsumerMode=Poll).to("direct:testPoll");
from("direct:testPoll")
.log(LoggingLevel.DEBUG,":::>:Camel[${routeId}] consumes")
.onException(Exception.class)
.maximumRedeliveries(maxAttempt)
.delayPattern(delayPattern)
.maximumRedeliveryDelay(maxDelay)
.handled(true)
.logExhausted(false)
.end()
.bean("processTestPoll").log(INFO,"${body}").end();误差
有1个参数无法在端点上设置。如果参数拼写正确,并且它们是端点的属性,请检查uri。未知parameters={concurrentConsumers=10}
你的帮助会很感激的。提前谢谢。
发布于 2022-03-21 08:54:02
你想要达到的目标可以通过两种不同的方式通过SEDA来完成:
一般方式
您可以将消息发送到SEDA端点并并发地使用它们,如下所示:
fromF("hazelcast-%sFOO?hazelcastInstance=#hazelcastInstance&queueConsumerMode=Poll",
HazelcastConstants.QUEUE_PREFIX)
.to("seda:process");
from("seda:process?concurrentConsumers=5")
.log("Processing: ${threadName} ${body}");在前面的示例中,Hazelcast队列FOO由一个线程轮询,该线程将消息放入SEDA process,而SEDA process由5线程并发使用。
关于使用SEDA组件的并发使用者的更多详细信息
具体方式
正如您在已删除的答案中建议的那样,您还可以直接使用Hazelcast的特定SEDA端点实现它,如下所示:
fromF("hazelcast-%sFOO?hazelcastInstance=#hazelcastInstance&concurrentConsumers=5",
HazelcastConstants.SEDA_PREFIX)
.log("Processing: ${threadName} ${body}");在前面的示例中,哈泽尔广播队列FOO由5个线程并发使用。
关于Hazelcat SEDA端点的更多细节。
https://stackoverflow.com/questions/71552125
复制相似问题