我希望能够使用一次处理超过10个SQS消息。
从这个问题出发,建议使用ExecutorChannel.我更新了我的代码,但仍然有相同的症状。
如何在多线程中执行Spring集成流以并行地消耗更多Amazon队列消息?
在进行此更新之后,我的应用程序请求10条消息,并处理这些消息,只有在我在流结束时调用amazonSQSClient.deleteMessage之后,它才会接受来自SQS队列的另外10条消息。
应用程序使用SQS FiFo队列。
还有什么是我遗漏的,还是使用SqsMessageDeletionPolicy.NEVER然后在流结束时删除消息的不可避免的症状?由于其他限制,在流开始时接受消息实际上不是一个选项。
下面是相关的代码片段,并进行了一些简化,但我希望它能表达这个问题。
队列配置
@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(50);
return executor;
}
@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
return new ExecutorChannel(inputChannelTaskExecutor());
}
我也尝试了一个ThreadPoolTaskExecutor,而不是SimpleAsyncTaskExecutor,结果是一样的,但是我也会包括它,以防它提供其他的洞察力。
@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(50);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("spring-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.afterPropertiesSet();
executor.initialize();
return executor;
}
SQS信道适配器
@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
adapter.setOutputChannel(inputChannel);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
return adapter;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}
简化主流程
对于我们来说,一个常见的场景是在短时间内获得多个分支编辑。此流仅“关心”至少发生了一次编辑。messageTransformer从有效负载文档中提取一个id,并将其放在header dsp_docId中,然后我们使用它来进行聚合(我们在其他几个地方使用这个id,因此我们觉得头是有意义的,而不是在自定义聚合器中完成所有工作)。
provisioningServiceActivator检索分支的最新版本,然后路由器决定是否需要进一步转换(在这种情况下,它将其发送到transformBranchChannel),或者可以发送到我们的PI实例(通过sendToPiChannel) )。
转换流(没有显示,我不认为您需要它)最终导致发送到PI流,它只是做了更多的工作。
listingGroupProcessor捕获所有的aws_receiptHandle头,并将它们添加到一个新的标题中,作为一个分隔的列表。
sendToPi流(和errorFlow)以调用自定义处理程序结束,该处理程序负责删除aws_receiptHandle字符串列表中引用的所有aws_receiptHandle消息。
@Bean
IntegrationFlow sqsListener() {
return IntegrationFlows.from(inputChannel)
.transform(messageTransformer)
.aggregate(a -> a.correlationExpression("1")
.outputProcessor(listingGroupProcessor)
.autoStartup(true)
.correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
.groupTimeout(messageAggregateTimeout) // currently 25s
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.get())
.handle(provisioningServiceActivator, "handleStandard")
.route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
.resolutionRequired(false)
.defaultOutputToParentFlow())
.channel(sendtoPiChannel)
.get();
}
发布于 2018-09-17 15:11:18
我想我应该把这作为一个答案,因为这解决了我的问题,并可能帮助其他人。作为一个答案,它更有可能被发现,而不是一个编辑的原始问题,可能会被忽视。
首先,我应该注意到我们使用的是FiFo队列。
问题实际上是在链的上游,我们将MessageGroupId设置为一个简单的值,该值描述了数据的来源。这意味着我们有非常大的消息组。
从ReceiveMessage文档中可以看到,在此场景中,它非常明智地阻止您请求来自该组的更多消息,因为如果需要将消息重新放到队列中,就不可能保证订单。
更新发布消息的代码以设置适当的MessageGroupId,然后意味着ExecutorChannel按预期工作。
虽然具有特定MessageGroupId的消息是不可见的,但在可见超时过期之前,不返回属于同一MessageGroupId的更多消息。您仍然可以使用另一个MessageGroupId接收消息,只要它也是可见的。
https://stackoverflow.com/questions/52336006
复制相似问题