首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Kafka Consumer

Kafka Consumer

作者头像
shysh95
发布2020-03-31 17:29:46
发布2020-03-31 17:29:46
1.4K0
举报
文章被收录于专栏:shysh95shysh95

Kafka Consumer消费以组的方式划分,Topic中的每一个分区只会分给同一个组中的其中一个实例。这是基于队列模式,如果想基于发布订阅模式,那订阅同一个Topic的实例需要指定不同的组名。

必需参数
bootstrap.servers

Kafka服务器

group.id

Consumer Group的名字,唯一标识一个consumer group

key.deserializer

Key的反序列化,二进制的消息Key转换成具体的类型

value.desrializer

Value的反序列化,二进制的消息内容转换成具体的类型

主要参数
session.timeout.ms

coordinator检测失败的时间,通常需要设置一个较小的值,这样可以快速检测到consumer崩溃的情况,尽快开启rebalance。

max.poll.interval.ms

用于设置消息处理逻辑的最大时间

auto.offset.reset

consumer group无位移信息和位移越界时Kafka对应的策略。consumer group重启不会使用该策略,因为Kafka已经记录了group的唯一信息

  • earliest:从最早的位移开始消费,不一定就是0
  • latest:从最新位移处开始消费
  • none:如果无位移信息和位移越界,抛出异常。
enable.auto.commit

指定consumer是否自动提交位移,默认为true

fetch.max.bytes

指定consumer单次获取数据的最大字节数

max.poll.records

控制poll方法返回的最大消息数量

heartbeat.interval.ms

控制consumer group中成员感知rebalance的时间。

connections.max.idle.ms

空闲连接空闲时间超过该参数,会被关闭。

auto.commit.interval.ms

后台自动提交位移的时间间隔

消息轮询Poll

新版Consumer采用了类似Linux I/O模型Poll,使用一个线程管理多个socket连接,然后循环Poll消息。

poll方法返回的条件是要不获得了足够多的数据,或者超过了指定的超时时间。

位移管理

新版本的consumer位移已交由内部topic管理(_consumeroffsets),该Topic有多个分区,每个分区有多个副本(可以通过参数控制)。该内部Topic存在的唯一目的保存consumer提交的位移。

手动提交位移支持同步和异步,提交需要位移需要指定一个Map,key是TopicPartition,value是OffsetAndMetadata,里面存储了下一条待消费消息的offset。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员修炼笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 必需参数
    • bootstrap.servers
    • group.id
    • key.deserializer
    • value.desrializer
  • 主要参数
    • session.timeout.ms
    • max.poll.interval.ms
    • auto.offset.reset
    • enable.auto.commit
    • fetch.max.bytes
    • max.poll.records
    • heartbeat.interval.ms
    • connections.max.idle.ms
    • auto.commit.interval.ms
  • 消息轮询Poll
  • 位移管理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档