前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 中 Kafka Offset 管理

Spark 中 Kafka Offset 管理

原创
作者头像
ZHANGHAO
修改2019-03-20 09:55:02
1.8K0
修改2019-03-20 09:55:02
举报
文章被收录于专栏:张浩的专栏张浩的专栏

前言

Spark在spark-streaming-kafka-0-10的API中实现了对Kafka Offset提交的API,在Spark消费过消息之后,提交消费过消息的Offset到Kafka里面,在Spark重启后,可以继续消费没有消费的消息,实现Exactly once的语义。

提交Offsets

Spark官方文档中提供了在Spark应用程序中获取Offset和提交Offset的代码,现整合如下:

代码语言:javascript
复制
val conf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))

//设置日志的级别为warn
ssc.sparkContext.setLogLevel("warn")

val kafkaParams = Map[String, Object](
  //kafka 集群地址
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  //消费者组名
  "group.id" -> "KafkaOffset",
  //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最新的数据开始消费
  "auto.offset.reset" -> "latest",
  //如果是true,则这个消费者的偏移量会在后台自动提交
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("mytopic")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD(f => {
  // 获取offsetRanges
  val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges
  //打印offset的信息
  f.foreachPartition(iter => {
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  })
  // 等输出(保存)操作完成后提交offset
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}
)

ssc.start()
ssc.awaitTermination()

说明:

  1. group.id:offset是和group.id以及topic相关联的,如果换了一个group.id,那么消息就会从最新的开始消费;
  2. auto.offset.reset:可以接收earliest和latest两个参数,latest是从最新的开始消费,earliest是从头开始消费;
  3. enable.auto.commit:设置为false,这样做是为了后面手动提交offset;
  4. 提交后的offset会在保存在Kafka的 __consumer_offsets 这个topic中。

自己保存Offset的数据

这里直接贴出官网示例代码

代码语言:javascript
复制
// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}

参考文献:

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 提交Offsets
    • 说明:
    • 自己保存Offset的数据
    • 参考文献:
    • Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档