首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >春云流运动绑定-并发

春云流运动绑定-并发
EN

Stack Overflow用户
提问于 2019-06-13 02:37:48
回答 1查看 1.3K关注 0票数 1

我使用以下组件构建了一个spring引导运动消费者程序:

  • spring引导(版本-2.1.2 version)
  • 弹簧云(版本- Greenwich.RELEASE)
  • 弹簧云流运动粘结剂(1.1.0版)

我使用来自一个运动流的事件,其中包含一个碎片。此外,这个春季引导使用者应用程序正在关键云创建平台中运行。

在发布这个问题之前,我在本地(使用kinesalite)和PCF (带有动态流)尝试了这个场景。你能确认我的理解是否正确吗?我查看了spring云流文档(https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc)。虽然文档是详尽的,但是并发性和高可用性并没有得到详细的解释。

假设我有3个被部署到PCF的使用者实例(通过将manifest.yml文件中的实例属性设置为3,在cf中使用)。

所有三个实例都具有以下属性

spring.cloud.stream.bindings..consumer.concurrency=5

spring.cloud.stream.bindings..group=my-consumer-group

spring.cloud.stream.kinesis.binder.checkpoint.table=my-metadata-dynamodb-table

spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table

让我们假设这些事件是由制片人按这个顺序送去的。

event5 (流中的最新事件)- event4 - event3 - event2 - event1 (流中的第一个事件)

对于这样的配置,我在下面解释了我的理解。你能确认一下这是否正确吗?

  1. 在给定的时间点,使用者只有一个实例处于活动状态,并且它将处理发送到动态流的所有事件(因为流只有一个碎片)。其他两个实例中的一个只有在主实例关闭时才能控制。这种配置是为了确保消息的高可用性和保留顺序。
  2. 由于实例的数量是在PCF的spring.cloud.stream.bindings..consumer.instanceCount中设置的,所以我不需要担心设置spring.cloud.stream.instanceCount或manifest.yml属性。
  3. 当spring引导使用者启动/启动时,5个使用者线程是活动的(因为并发设置为5)。现在,这些事件按上面解释的顺序使用。Thread1接住了event1。当thread1仍在积极处理event1时,另一个线程只是从流中选择并开始处理下一个事件(thread2、process、event2等)。虽然在本例中保留了事件的顺序(事件1总是在event2等.之前得到),但不能保证thread1将在线程2之前完成event1的处理。
  4. 当所有5个线程都忙于处理流中的5个事件时,如果新事件(如event6和event7 )出现,使用者必须等待线程可用。比方说,thread3已完成处理event3和其他线程仍在忙着处理事件,thread3将获取event6并开始处理,但是event7仍然没有被捕获,因为没有可用的线程。
  5. 默认情况下,并发性设置为1。如果您的业务需求要求您在选择下一个事件之前完成第一个事件的处理,那么并发性应该是1。在这种情况下,您正在影响吞吐量。一次只能使用一个事件。但是,如果吞吐量很重要,并且希望在给定的时间点处理多个事件,则应该将并发性设置为所需的值。增加碎片数量也是一种选择,但作为消费者,如果不能要求增加,这是实现并行/吞吐量的最佳选择。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-13 14:51:53

请参阅concurrency选项JavaDocs中的KinesisMessageDrivenChannelAdapter

代码语言:javascript
运行
复制
/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

因此,由于在这一个流中只有一个碎片,所以将只有一个活动线程在单个碎片上迭代ShardIterators。

关键是,我们总是必须在单个线程中处理单个碎片中的记录。这样,我们保证了一个正确的顺序,加上检查点是对最高的序列号。

请多查查什么是AWS运动以及它是如何工作的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56572653

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档