我使用以下组件构建了一个spring引导运动消费者程序:
我使用来自一个运动流的事件,其中包含一个碎片。此外,这个春季引导使用者应用程序正在关键云创建平台中运行。
在发布这个问题之前,我在本地(使用kinesalite)和PCF (带有动态流)尝试了这个场景。你能确认我的理解是否正确吗?我查看了spring云流文档(https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/和https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc)。虽然文档是详尽的,但是并发性和高可用性并没有得到详细的解释。
假设我有3个被部署到PCF的使用者实例(通过将manifest.yml文件中的实例属性设置为3,在cf中使用)。
所有三个实例都具有以下属性
spring.cloud.stream.bindings..consumer.concurrency=5
spring.cloud.stream.bindings..group=my-consumer-group
spring.cloud.stream.kinesis.binder.checkpoint.table=my-metadata-dynamodb-table
spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table
让我们假设这些事件是由制片人按这个顺序送去的。
event5 (流中的最新事件)- event4 - event3 - event2 - event1 (流中的第一个事件)
对于这样的配置,我在下面解释了我的理解。你能确认一下这是否正确吗?
发布于 2019-06-13 14:51:53
请参阅concurrency选项JavaDocs中的KinesisMessageDrivenChannelAdapter
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {因此,由于在这一个流中只有一个碎片,所以将只有一个活动线程在单个碎片上迭代ShardIterators。
关键是,我们总是必须在单个线程中处理单个碎片中的记录。这样,我们保证了一个正确的顺序,加上检查点是对最高的序列号。
请多查查什么是AWS运动以及它是如何工作的。
https://stackoverflow.com/questions/56572653
复制相似问题