Kafka单线程Consumer及参数详解

请使用0.9以后的版本:

示例代码

 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);

2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);

3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")

使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重写这个接口来实现 分区变更时的逻辑。如果设置了enable.auto.commit = true 就不用理会这个逻辑。

4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);

5、处理消息(打印了offset key value 这里写处理逻辑)。

6、关闭KafkaConsumer(可以传一个timeout值 等待秒数 默认是30)。

参数详解

bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非自己配置了ip)

deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。

默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定义:定义serializer格式 创建自定义deserializer类实现Deserializer 接口 重写逻辑

除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer

还有session.timeout.ms "coordinator检测失败的时间"

是检测consumer挂掉的时间 为了可以及时的rebalance 默认是10秒 可以设置更小的值避免消息延迟。

max.poll.interval.ms "consumer处理逻辑最大时间"

处理逻辑比较复杂的时候 可以设置这个值 避免造成不必要的 rebalance ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。

auto.offset.reset "无位移或者位移越界时kafka的应对策略"

所以如果启动了一个group从头消费 成功提交位移后 重启后还是接着消费 这个参数无效

所以3个值的解释是:

earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费

latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、

我们这是说的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))

enable.auto.commit 是否自动提交位移

true 自动提交 false需要用户手动提交 有只处理一次需要的 最近设置为false自己控制。

fetch.max.bytes consumer单次获取最大字节数

max.poll.records 单次poll返回的最大消息数

默认500条 如果消费很轻量 可以适当提高这个值 增加消费速度。

hearbeat.interval.ms consumer其他组员感知rabalance的时间

该值必须小于 session.timeout.ms 如果检测到 consumer挂掉 也就根本无法感知rabalance了

connections.max.idle.ms 定期关闭连接的时间

默认是9分钟 可以设置为-1 永不关闭

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏中科院渣渣博肆僧一枚

python的CSV模块

版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。

17240
来自专栏leon的专栏

lodash源码解读之模块化的基础——IIFE

IIFE包含两部分。 第一部分是一个匿名函数,它包裹在分组操作符()中,拥有独立的词法作用域。 第二部分是再一次使用分组操作符(),创建一个立即执行函数表达式。...

9730
来自专栏业余草

Redis 与序列化

序列化只是一种拆装组装对象的规则,那么这种规则肯定也可能有多种多样,比如现在常见的序列化方式有:JDK(不支持跨语言)、JSON、XML、Hessian、Kry...

22040
来自专栏SAMshare

推荐收藏 | Facets快速评估数据集质量

在机器学习任务中,数据集的质量优劣对数据分析的结果影响非常大,所谓Garbage in, garbage out,数据决定模型的上限,因此数据质量成为数据分析流...

12730
来自专栏小小挖掘机

Facets:快速评估数据集质量,把控数据分析核心环节

在机器学习任务中,数据集的质量优劣对数据分析的结果影响非常大,所谓Garbage in, garbage out,数据决定模型的上限,因此数据质量成为数据分析流...

14820
来自专栏机器之心

一行代码自动调参,支持模型压缩指定大小,Facebook升级FastText

FastText 是 Facebook 开源的一款自然语言处理机器学习框架。通过这套工具,用户可以快速完成诸如文本分类等的相关任务,而且不需要设计模型架构。近日...

30750
来自专栏数据处理与分析

GIS基础技能篇之一(文本数据矢量化)

包含xy信息的Excel都可以,xy可以在一个字段里,也可以在两个字段中。另外如果包含高程信息,还可以生成三维的矢量数据。

13120
来自专栏leon的专栏

优雅的类写法

虽然现在已经是ES6的时代,但是,还是有必要了解下ES5是怎么写一个类的。 本文详述JavaScript面向对象编程中的类写法,并分步骤讲述如何写出优雅的类。

7550
来自专栏毛利学Python

爬虫篇| 爬取豆瓣电影(二)

上次爬取了百度图片,是分析解决ajax的json的响应的,对于一些网站的常见的数据的爬取,是这次主要内容。

11660
来自专栏Java技术栈

从0 开始手写一个 RPC 框架,轻松搞定!

来源:juejin.im/post/5c4481a4f265da613438aec3

12520

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励