专栏首页张浩的专栏Spark 中 Kafka Offset 管理
原创

Spark 中 Kafka Offset 管理

前言

Spark在spark-streaming-kafka-0-10的API中实现了对Kafka Offset提交的API,在Spark消费过消息之后,提交消费过消息的Offset到Kafka里面,在Spark重启后,可以继续消费没有消费的消息,实现Exactly once的语义。

提交Offsets

Spark官方文档中提供了在Spark应用程序中获取Offset和提交Offset的代码,现整合如下:

val conf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))

//设置日志的级别为warn
ssc.sparkContext.setLogLevel("warn")

val kafkaParams = Map[String, Object](
  //kafka 集群地址
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  //消费者组名
  "group.id" -> "KafkaOffset",
  //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最新的数据开始消费
  "auto.offset.reset" -> "latest",
  //如果是true,则这个消费者的偏移量会在后台自动提交
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("mytopic")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD(f => {
  // 获取offsetRanges
  val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges
  //打印offset的信息
  f.foreachPartition(iter => {
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  })
  // 等输出(保存)操作完成后提交offset
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}
)

ssc.start()
ssc.awaitTermination()

说明:

  1. group.id:offset是和group.id以及topic相关联的,如果换了一个group.id,那么消息就会从最新的开始消费;
  2. auto.offset.reset:可以接收earliest和latest两个参数,latest是从最新的开始消费,earliest是从头开始消费;
  3. enable.auto.commit:设置为false,这样做是为了后面手动提交offset;
  4. 提交后的offset会在保存在Kafka的 __consumer_offsets 这个topic中。

自己保存Offset的数据

这里直接贴出官网示例代码

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}

参考文献:

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Scala 操作 HBase2.0 数据库

    ZHANGHAO
  • Sqoop在导入的时候生成的MR代码教你怎么指定路径去存放

    最近常用Sqoop导入一些数据到HDFS中。但是执行完成后,总是会在我执行sqoop命令的文件夹下生成一些MR代码。总是需要去手动清理。感觉比较困扰,然后就看S...

    ZHANGHAO
  • Spark Streaming基于网络的词频统计

    ZHANGHAO
  • go微服务框架go-micro深度学习(二) 入门例子

        上一篇帖子简单介绍了go-micro的整体框架结构,这一篇主要写go-micro使用方式的例子,中间会穿插一些go-micro的源码,和调用流程图,帮大...

    lpxxn
  • Java 环境变量 原

    Sun JDK - https://www.oracle.com/technetwork/java/javase/downloads/jdk8-download...

    HoneyMoose
  • winform下Textbox的AutoComplete功能

    本文转载:http://blog.csdn.net/xiaoxian8023/article/details/8511129

    跟着阿笨一起玩NET
  • 自定义View学习之路(四)————图片介绍框

      走往android的进阶之路,避不开自定义View的学习和绘制。这里以绘制一个可用的图片相框为例。开始系统的学习View的绘制。   麻雀虽小却五脏俱全,...

    饮水思源为名
  • war 包方式部署Jenkins

    密钥输入完成后,我们需要去安装插件,如果安装途中报红,则等他安装完其他插件,再次点击重新安装即可

    小手冰凉
  • CassandraAppender - distributed logging,分布式软件logback-appender

    农历年最后一场scala-meetup听刘颖分享专业软件开发经验,大受启发。突然意识到一直以来都没有完全按照任何标准的开发规范做事。诚然,在做技术调研和学...

    用户1150956
  • 这是一份你们需要的Windows版深度学习软件安装指南

    该配置版本最后更新的日期是今年七月,该更新版本允许本地使用 3 个不同的 GPU 加速后端,并添加对 MKL BLAS 库的支持。

    华章科技

扫码关注云+社区

领取腾讯云代金券