The spark-streaming-kafka-0-10 API allows a Spark application to commit the offsets of consumed messages back to Kafka: after Spark has processed a batch, it commits those offsets, so that on restart it resumes from the first unconsumed message. Strictly speaking, committing offsets after the output completes gives at-least-once delivery; true exactly-once additionally requires idempotent or transactional output (the pattern shown in the official database example further below).
The Spark official documentation provides code for obtaining offsets and committing them inside an application; consolidated here:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
// Set the log level to WARN
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams = Map[String, Object](
  // Kafka cluster address
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  // Consumer group name
  "group.id" -> "KafkaOffset",
  // If a partition has a committed offset, resume from it; otherwise start from the latest data
  "auto.offset.reset" -> "latest",
  // When true, the consumer's offsets are auto-committed in the background;
  // disable this so that offsets are committed manually after output completes
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
  // Get the offset ranges of this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Print the offset information for each partition
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  // Commit the offsets only after the output (save) operations have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
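`commitAsync` is fire-and-forget by default; `CanCommitOffsets` also has an overload that takes a Kafka `OffsetCommitCallback`, which lets you log whether each commit actually reached the broker. A minimal sketch, reusing the `stream` from the snippet above (the logging is illustrative, not part of the original article):

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... output (save) operations here ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges,
    new OffsetCommitCallback {
      override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
                              exception: Exception): Unit = {
        if (exception != null) {
          // A failed commit is not fatal: the same offsets are simply
          // re-committed with a later batch, at the cost of possible replay
          println(s"Offset commit failed: ${exception.getMessage}")
        } else {
          println(s"Offsets committed: $offsets")
        }
      }
    })
}
```

Because commits are asynchronous, a failure here only widens the replay window after a crash; it does not affect the current batch's output.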
For storing offsets in your own data store instead of Kafka, the official example code is quoted directly below:
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
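The commented transaction steps above can be sketched with plain JDBC. Everything specific here is an assumption for illustration: the JDBC URL, the `results` and `kafka_offsets` tables, and `yourCalculation` are hypothetical placeholders, and any transactional RDBMS would do:

```scala
import java.sql.DriverManager
import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = yourCalculation(rdd) // hypothetical computation

  // One transaction: results and offsets either both persist or both
  // roll back, which is what makes the output exactly-once.
  val conn = DriverManager.getConnection("jdbc:postgresql://localhost/demo") // assumed URL
  try {
    conn.setAutoCommit(false)
    val insert = conn.prepareStatement("INSERT INTO results(value) VALUES (?)")
    // ... bind and execute `results` with `insert` ...

    val advance = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? " +
      "WHERE topic = ? AND partition = ? AND until_offset = ?")
    for (o <- offsetRanges) {
      advance.setLong(1, o.untilOffset)
      advance.setString(2, o.topic)
      advance.setInt(3, o.partition)
      // Only advances if the stored end equals this batch's start,
      // i.e. "the end of existing offsets matches the beginning of this batch"
      advance.setLong(4, o.fromOffset)
      if (advance.executeUpdate() != 1)
        throw new IllegalStateException(s"Offset mismatch for $o; batch already processed?")
    }
    conn.commit()
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}
```

The `WHERE ... until_offset = ?` guard is what implements the "assert that offsets were updated correctly" step: if a replayed batch (or a second instance) has already written this range, the update matches no row and the transaction rolls back instead of duplicating output.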
Originality statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com for removal.