前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka的offset相关知识

kafka的offset相关知识

作者头像
小勇DW3
发布2019-11-07 19:42:37
1.6K0
发布2019-11-07 19:42:37
举报
文章被收录于专栏:小勇DW3小勇DW3小勇DW3

Offset存储模型

由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以 groupid-topic-partition -> offset 的方式保存。

如图所示:

Kafka在保存Offset的时候,实际上是将Consumer Group和partition对应的offset以消息的方式保存在__consumers_offsets这个topic中

__consumers_offsets默认拥有50个partition,可以通过

Math.abs(groupId.hashCode() % offsets.topic.num.partitions) 

的方式来查询某个Consumer Group的offset信息保存在__consumers_offsets的哪个partition中。

下图展示了__consumers_offsets中保存的offset消息的格式:

如图所示,一条offset消息的格式为groupid-topic-partition -> offset。

因此consumer poll消息时,已知groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的offset了。

Offset查询

前面我们已经描述过offset的存储模型,它是按照groupid-topic-partition -> offset的方式存储的。然而Kafka只提供了根据offset读取消息的模型,并不支持根据key读取消息的方式。那么Kafka是如何支持Offset的查询呢?

答案就是Offsets Cache!!

如图所示,Consumer提交offset时,Kafka Offset Manager会首先追加一条条新的commit消息到__consumers_offsets topic中,然后更新对应的缓存。读取offset时从缓存中读取,而不是直接读取__consumers_offsets这个topic。

Offset管理方式

通常由如下几种 Kafka Offset 的管理方式:

Spark Checkpoint:在 Spark Streaming 执行Checkpoint 操作时,将 Kafka Offset 一并保存到 HDFS 中。这种方式的问题在于:当 Spark Streaming 应用升级或更新时,以及当Spark 本身更新时,Checkpoint 可能无法恢复。
因而,不推荐采用这种方式。

HBASE、Redis 等外部 NOSQL 数据库:这一方式可以支持大吞吐量的 Offset 更新,但它最大的问题在于:用户需要自行编写 HBASE 或 Redis 的读写程序,并且需要维护一个额外的组件。

ZOOKEEPER:老版本的位移offset是提交到zookeeper中的,目录结构是 :/consumers/<group.id>/offsets/ <topic>/<partitionId> ,但是由于 ZOOKEEPER 的写入能力并不会随着 ZOOKEEPER 节点数量的增加而扩大,
因而,当存在频繁的 Offset 更新时,ZOOKEEPER 集群本身可能成为瓶颈。因而,不推荐采用这种方式。

KAFKA 自身的一个特殊 Topic(__consumer_offsets)中:这种方式支持大吞吐量的Offset 更新,又不需要手动编写 Offset 管理程序或者维护一套额外的集群,因而是迄今为止最为理想的一种实现方式。
另外几个与 Kafka Offset 管理相关的要点如下:

Kafka 默认是定期帮你自动提交位移的(enable.auto.commit=true)。

有时候,我们需要采用自己来管理位移提交,这时候需要设置 enable.auto.commit=false。


属性 auto.offset.reset 值含义解释如下:
earliest : 当各分区下有已提交的 Offset 时,从“提交的 Offset”开始消费;无提交的Offset 时,从头开始消费;
latest   : 当各分区下有已提交的 Offset 时,从提交的 Offset 开始消费;无提交的 Offset时,消费新产生的该分区下的数
none     : Topic 各分区都存在已提交的 Offset 时,从 Offset 后开始消费;只要有一个分区不存在已提交的 Offset,则抛出异常。 
kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中);

kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面);
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-11-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Offset存储模型
  • Offset查询
  • Offset管理方式
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档