Java 程序 更详细的代码工程,可以参考我的GitHub 消费者获取分区列表,并获取分区最新的OFFSET import java.util.ArrayList; import java.util.Collection...Consumer consumer = new KafkaConsumer(props); return consumer; } // 获取某个...Topic的所有分区以及分区最新的Offset public static void getPartitionsForTopic() { final Consumer<Long,...consumer.seekToEnd(tp); System.out.println("Partition " + str.partition() + " 's latest offset...record.key(), record.value(), record.partition(), record.offset
版本信息 Kafka 0.8.2,JDK1.7 问题现象 最近我们在生产环境执行删除无用的kafka topic的操作时,因为错误的按照8.2版本之前的删除方式操作8.2.2版本的kafka,导致删除过程异常...端又配置了auto.offset.reset=smallest[^offset.reset],所以当offset信息丢失、没有初始化或者出现异常时,consumer会自动从最小的offset处开始消费,...总结反思 出现这种问题一是因为我们缺少kafka运维经验,之前并没有操作过删除kafka topic的经历;二是测试不充分。...反馈建议 参考资料 [^offset.reset]: auto.offset.reset定义了consumer在zooKeeper中发现没有初始的offset时或者发现offset非法时定义comsumer...的行为,常见的配置有smallest:自动把offset设为最小的offset;largest:自动把offset设为最大的offset;anything else:抛出异常。
一、Kafka 消费者如何管理 offset 我之前有写一篇kafka Consumer — offset的控制 如果你对于这方面的知识还不太清楚, 建议你去看一下, 毕竟理解了Kafka的消费者...二、Spark Streaming On Kafka 如何管理 offset 1....2.1 使用 首先确保 enable.auto.commit=false, 当我们从kafka拉取到数据, 就不会再自动提交offset了, 这时候的offset就可以任由我们自己控制, 一个很典型的方式就是..., 当Spark处理完一个批次的数据, 我们把这个offset 提交到 kafka。...2.2 手动提交容易出现的问题 我们可以想象,当我们处理完数据后, 我们才对offset进行了提交, 这也意味着如果数据处理失败, 我们可以选择不提交offset, 下次我们还是可以从kafka
在 Kafka 中有三个重要概念,分别是 topic、partition 和 offset。...offset 的作用和意义 offset 是 Kafka 为每条消息分配的一个唯一的编号,它表示消息在分区中的顺序位置。...消费者端 消费者在消费 Kafka 消息时,需要维护一个当前消费的 offset 值,以及一个已提交的 offset 值。...offset 的提交和重置 提交 offset 是消费者在消费完一条消息后,将当前消费的 offset 值更新到 Kafka broker 中的操作。...earliest 表示从最早的可用消息开始消费;latest 表示从最新的可用消息开始消费;none 表示如果没有可用的 offset,则抛出异常。
在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。...在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。 更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。...topic的所有分区调整位移 --topic t1:0,1,2:为指定的topic分区调整位移 重置策略 --to-earliest:把位移调整到分区当前最小位移 --to-latest:把位移调整到分区当前最新位移...--all-topics --to-earliest --execute 更新到指定的offset位置 bin/kafka-consumer-groups.sh --bootstrap-server localhost...--all-topics --to-current --execute offset位置按设置的值进行位移 bin/kafka-consumer-groups.sh --bootstrap-server
前言 在N久之前,曾写过kafka 生产者使用详解, 今天补上关于 offset 相关的内容。...那么本文主要涉及: Kafka 消费者的两个大版本 消费者的基本使用流程 重点:offset 的控制 消費者版本 开源之初使用Scala 语言编写的客户端, 我们可以称之为旧消费者客户端(Old Consumer...生产者使用详解 Offset 提交 这里指的是消费者消费的位移, 而不是Kafka端储存的消息的 offset, 这其中的区别希望读者清楚,不要混淆了。...对于offset 的提交, 我们要清楚一点 如果我们消费到了 offset=x 的消息 那么提交的应该是 offset=x+1, 而不是 offset=x kafka的提交方式分为两种: 自动提交...来不及提交, 这时候我们需要在监听到再均衡发生的时候进行一次offset提交: //该对象需要保存该消费者消费的分区的最新的 offset //本段代码中没有体现,可以在消费数据之后 进行更新该对象
Kafka中是持久化的,不用担心数据丢失问题,但由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复以后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费的...Kafka0.9版本之前,consumer默认将 offset 保存在zookeeper中,0.9版本之后,consumer默认将offset保存在kafka一个内置的topic中,该topic为__consumer_offsets...、消息大小、消息体……消费者如何通过offset获取消息:利用 segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!...由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到, 这里同样利用二分法查找相对offset小于或者等于指定的相对...offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的...然而Kafka只提供了根据offset读取消息的模型,并不支持根据key读取消息的方式。那么Kafka是如何支持Offset的查询呢? 答案就是Offsets Cache!! ?...Offset管理方式 通常由如下几种 Kafka Offset 的管理方式: Spark Checkpoint:在 Spark Streaming 执行Checkpoint 操作时,将 Kafka Offset...另外几个与 Kafka Offset 管理相关的要点如下: Kafka 默认是定期帮你自动提交位移的(enable.auto.commit=true)。...(offest保存在zk中); kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的
前言 为了让Spark Streaming消费kafka的数据不丢数据,可以创建Kafka Direct DStream,由Spark Streaming自己管理offset,并不是存到zookeeper...因此本文将手动存储offset到zookeeper,完全自我掌控offset。...从ZK获取offset 创建ZKClient,API有好几个,最后用带序列化参数的,不然保存offset的时候容易出现乱码。 ?...注意红色线框部分,在zookeeper里存储的offset有可能在kafka里过期了,所以要拿kafka最小的offset和zookeeper里的offset比较一下。...接下来就可以创建Kafka Direct DStream了,前者是从zookeeper拿的offset,后者是直接从最新的开始(第一次消费)。 ? 最后就是处理RDD,保存Offset。 ? ?
为什么要写这个小工具 在之前的文章 Kafka重置消费的Offset 介绍过可以利用librdkafka 来写一个重置offset的小工具; librdkafka有个小问题,在当前的版本里作者限制了提交最早的...offset, 可以看这个issue: Allow re-Committing offsets; 当kafka集群里有一台broker机器坏掉无法修复,对于一个没有复本的topic, 针对这台坏掉的broker...上的partition, 将无法继续提交offset, 需要停掉consumer, 重置offset,然后再重启consumer; 如果线上有大量这样的topic和对应的consumer, 重启所有consumer...不是一个好的办法 :( 获取这个工具 github地址: KafkaOffsetTools 使用前需要编译 使用方法: Usage: --broker_list arg kafka broker...; 线上已运行的consumer不需要停止; 由于kafka rebalance的特点, 这个工具也不是百分百的每次都有效, 但在我的测试中成功率还是相当高, 相比手动重置再重启consumer要省时省力得多
checkpoint中最新的offset,以此为思路,来解决生产中的实际问题。...spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"1"}} 2400000001667289 最终获取最新...,然后再通过消费者程序消费kafka中的数据保存到存储系统中,如delta,通过offset信息对比来校验,binlog到kafka的延迟(如,通过获取binlog中的offset信息与流程序同步到kafka...时进行checkpoint的offset做对比)、kafka到存储系统中的延迟。...在实际进行offset比对时,要以此为基准再去获取offsets目录下的offsets信息。
前言 Spark在spark-streaming-kafka-0-10的API中实现了对Kafka Offset提交的API,在Spark消费过消息之后,提交消费过消息的Offset到Kafka里面,在...提交Offsets Spark官方文档中提供了在Spark应用程序中获取Offset和提交Offset的代码,现整合如下: val conf = new SparkConf().setAppName("...时,从提交的offset开始消费;无提交的offset时,从最新的数据开始消费 "auto.offset.reset" -> "latest", //如果是true,则这个消费者的偏移量会在后台自动提交...是和group.id以及topic相关联的,如果换了一个group.id,那么消息就会从最新的开始消费; auto.offset.reset:可以接收earliest和latest两个参数,latest...是从最新的开始消费,earliest是从头开始消费; enable.auto.commit:设置为false,这样做是为了后面手动提交offset; 提交后的offset会在保存在Kafka的 __consumer_offsets
序 本文主要讨论一下kafka consumer offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JMX...ConsumerOffsetChecker 在0.8.2.2版本如下 kafka_2.10-0.8.2.2-sources.jar!.../kafka/tools/ConsumerOffsetChecker.scala object ConsumerOffsetChecker extends Logging { private val...本身写入的JMX的数据,就不用额外在去像ConsumerOffsetChecker去自己连接再去获取。...doc kafka官方JMX+Reporters
序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常。...查看consumer消费情况 Group Topic Pid Offset logSize Lag Owner demo-group...9683393 9864518 181125 xxx-service-q7vch-1510557399475-b1d7d22c-1 发现消费者的offset...正常的情况 Group Topic Pid Offset logSize Lag Owner demo-group...小结 使用kafka消费数据的时候,需要对offset的lag值进行实时监控,以确认消费速度是否ok 调用KafkaStream的iterator消费线程必须catch住异常,否则抛出了异常,就停止消费了
offset 0 INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/...- Starting Kafka hadoop001.icccuat.com:2 from offset 0 这个时候在zookeeper的/kafka-offset下没有生成名为onetest的目录...,kafkaspout会先用偏移量6去拉去,发现拉去不到,就到broker中获取该主题对应分区的最大偏移量。。...Starting Kafka hadoop002.icccuat.com:0 from offset 7 Read partition information from: /kafka-offset...这样从新部署都会从最新的偏移量开始运行。 下面的是我当初自己学习kafka时,思考自己写kafka时,该如何解决kafka的消费者和消费组之间对数据消费时的判断。
在调用该方法时,会先创建 val kc = new KafkaCluster(kafkaParams) KafkaCluster 这个类是真实负责和Kafka 交互的类,该类会获取Kafka的partition...此时会获取每个Topic的每个Partition的offset。如果配置成smallest 则拿到最早的offset,否则拿最近的offset。...到了计算周期后,对应的DirectKafkaInputDStream .compute方法会被调用,此时做下面几个操作: 获取对应Kafka Partition的untilOffset。...这样就确定过了需要获取数据的区间,同时也就知道了需要计算多少数据了 构建一个KafkaRDD实例。...4、使用Java来管理offset // 注意:一定要存在这个包下面 package org.apache.spark.streaming.kafka; import kafka.common.TopicAndPartition
放弃不难,但坚持很酷~ kafka_2.11-1.1.0 Kafka 手动异步提交 offset 的步骤大概分为以下几步,如下图所示: ?...2、订阅 topic consumer.subscribe(Arrays.asList("topic name")); 3、获取 topic 各分区当前读取到的最后一条记录的offset 首先定义一个全局变量...)) { List> partitionRecords = records.records(partition); // 获取当前读取到的最后一条记录的...offset long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 提交offset...offsets.put(partition, lastOffset + 1); } 至于为什么消费者提交 offsets 时要 +1,在《Kafka消费者 之 如何提交消息的偏移量》中的概述章节里面也给出了答案
,包括调用kafka-consumer-offset-checker.sh脚本写的lag监控,kafkaoffsetmonitor开源监控以及kafka-manager管理系统。...最近发现kafka-consumer-offset-checker.sh脚本在原本运行正常的情况下一直出现"Exiting due to:null"的错误,这个问题会导致脚本直接退出无法获取完整的partition...为了搞明白问题,直接把kafka-consumer-offset-checker.sh脚本调用的kafka类ConsumerOffsetChecker拿出来进行研究,发现最后输出lag结果的方法如下...logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head 获取...后续发现kafkaoffsetmonitor以及kafka-manager出现的lag查询页面出现的分区显示不全或者数据为空的情况都通过补全host解决了。 吐槽一下kafka对于host的强依赖。
对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。...对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。 单词“offset”可以翻译为“偏移量”,也可以翻译为“位移”。...在每次调用 poll() 方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在 Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。...而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。...KafkaConsumer 类提供了 position(TopicPartition) 和 committed(TopicPartition) 两个方法来分别获取上面所说的 position 和 committed
领取专属 10元无门槛券
手把手带您无忧上云