前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring-cloud-stream-binder-kafka属性配置

spring-cloud-stream-binder-kafka属性配置

作者头像
code4it
发布2018-09-17 15:04:13
3.9K0
发布2018-09-17 15:04:13
举报
文章被收录于专栏:码匠的流水账

本文简单介绍下spring-cloud-stream-binder-kafka的一些属性配置。

maven

代码语言:javascript
复制
       <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>1.0.3.RELEASE</version>
        </dependency>

stream属性

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.java

代码语言:javascript
复制
spring:
  cloud:
     stream:
       instanceIndex: 0 ##支持环境变量INSTANCE_INDEX
                        ## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka
       instanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka.
                        ## used to partition data across different consumers.

Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得 Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储 这个partition的所有消息和索引文件。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。 同一个partition内的消息只能被同一个组中的一个consumer消费。 当消费者数量多于partition的数量时,多余的消费者空闲。 消费者少于和等于partition的数量时,会出现多个partition对应一个消费者的情况,个别消费者消费量会比其他的多。

instanceCount主要是consumer用的,一般小于或等于topic的partition数量,主要用作消费者的消费分区用。

bingdings属性

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/BindingProperties.java

代码语言:javascript
复制
spring:
  cloud:
     stream:
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            #group: test  ##consumer属性
            #producer:
            #consumer:

producer

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ProducerProperties.java

代码语言:javascript
复制
spring:
  cloud:
     stream:
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            producer:
              partitionCount: 1
              headerMode
              partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass
              partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass
              headerMode: raw
  • kafka producer扩展属性 spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaProducerProperties.java spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain producer: bufferSize: 16384 maxRequestSize: 1048576 sync: true batchTimeout: 0

consumer

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ConsumerProperties.java

代码语言:javascript
复制
spring:
  cloud:
     stream:
        bindings:
          input:
            destination: event-demo
            content-type: text/plain
            consumer:
              concurrency: 1 ## The concurrency of the inbound consumer.
              partitioned: false ## Whether the consumer receives data from a partitioned producer.Default: false.
              headerMode: raw
  • kafka consumer扩展属性 spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaConsumerProperties.java spring: cloud: stream: bindings: input: destination: event-demo content-type: text/plain consumer: autoCommitOffset: false resetOffsets: true startOffset: earliest enableDlq: false recoveryInterval: 5000

原生api

代码语言:javascript
复制
ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, consumerCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);

这里头topicCountMap告诉Kafka我们在Consumer中将用多少个线程来消费该topic。topicCountMap的key是topic name,value针对该topic是线程的数量。

小结

整体的话,spring cloud stream自己抽象了一部分,但是有个硬伤就是spring.cloud.stream.instanceIndex这个不大友好,这样就造成服务的实例是有状态的了,在基于docker部署起来比较麻烦,还不如直接原生api。如果partition不多,或者每个consumer性能强悍的话,那么至少部署两个,配置起来也还可以接受。

doc

  • spring-cloud-stream-binder-kafka-docs
  • spring-cloud-stream-docs
  • SpringCloudStream 构建消息驱动的微服务框架
  • kafka中partition和消费者对应关系
  • kafka学习(四)-Topic & Partition
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • maven
  • stream属性
  • bingdings属性
    • producer
      • consumer
      • 原生api
      • 小结
      • doc
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档