Spark Streamming+Kafka提交offset实现有且仅有一次

前言

本文讲Spark Streamming使用Direct方式读取Kafka,并在输出(存储)操作之后提交offset到Kafka里实现程序读写操作有且仅有一次,即程序重启之后之前消费并且输出过的数据不再重复消费,接着上次消费的位置继续消费Kafka里的数据。 Spark Streamming+Kafka官方文档:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

1、提交offset的程序

根据官方文档可知,在spark代码里可以获取对应的offset信息,并且可以提交offset存储到kafka中。 代码:

package com.dkl.leanring.spark.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.TaskContext

object KafkaOffsetDemo {
  def main(args: Array[String]) {

    //创建sparkConf
    val sparkConf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[2]")
    // 创建StreamingContext batch size 为 1秒
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ambari.master.com:6667", //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaOffsetDemo", //消费者组名
      "auto.offset.reset" -> "earliest", //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("top1") //消费主题
    //创建DStream,返回接收到的输入数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    // 打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印
    stream.foreachRDD(f => {

      if (f.count > 0) {
        println("=============================")
        println("打印获取到的kafka里的内容")
        f.foreach(f => {
          val value = f.value()
          println(value)

        })
        println("=============================")
        println("打印offset的信息")
        // offset
        val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges

        //打印offset
        f.foreachPartition { iter =>
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        println("=============================")
        // 等输出操作完成后提交offset
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

      }
    })
    //启动
    ssc.start()
    //等待停止
    ssc.awaitTermination()
  }
}

说明:

  • auto.offset.reset设置为earliest,即当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始,这样设置的目的是为了一开始可以获取到kafka对应主题下的所有的历史消息。
  • enable.auto.commit 设置为false,如果是true,则这个消费者的偏移量会在后台自动提交,这样设置目的是为了后面自己提交offset,因为如果虽然获取到了消息,但是后面的转化操作并将结果写到如hive中并没有完成程序就挂了的话,这样是不能将这次的offset提交的,这样就可以等程序重启之后接着上次失败的地方继续消费
  • group.id 是不能变得,也就是offset是和topic和group绑定的,如果换一个group的话,程序将从头消费所有的历史数据
  • 这个api是将offset存储到kakfa的一个指定的topic里,名字为__consumer_offsets,而不是zookeeper中2、测试程序 1、首先创建对应的topic 2、生产几条数据作为历史消息bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic top1

3、启动上面的程序 4、继续生产几条数据 接下来先看一下结果:

由图可得,这样可以将历史数据全部打印出来,并且后面实时增加的数据,也打印出来了,且可以看到offset是在增加的,最后一个offset是202,那么接下来测试一下程序重启之后是否会接着之前的数据继续消费呢 5、停止程序 6、生产几条数据 7、启动程序 看一下结果:

可以看出,程序确实是接着上次消费的地方消费的,为了证实这一点,我将earliest和offset圈了起来,从offset可以看到是从上次的202开始消费的。

3、关于offset过期时间

kafka offset默认的过期时间是一天,当上面的程序挂掉,一天之内没有重启,也就是一天之内没有保存新的offset的话,那么之前的offset就会被删除,再重启程序,就会从头开始消费kafka里的所有历史数据,这种情况是有问题的,所以可以通过设置offsets.retention.minutes自定义offset过期时间,该设置单位为分钟,默认为1440。 修改kafka的offset过期时间详细信息见:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/

4、自己保存offset

可以通过自己保存offset的信息到数据库里,然后需要时再取出来,根据得到的offset信息消费kafka里的数据,这样就不用担心offset的过期的问题了,因为没有自己写代码实现,所以先给出官网的示例代码:

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何在Redhat中配置R环境

R是一套完整的数据处理、计算和制图软件系统。其功能包括:数据存储和处理系统;数组运算工具(其向量、矩阵运算方面功能尤其强大);完整连贯的统计分析工具;优秀的统计...

7185
来自专栏分布式系统和大数据处理

使用源码编译Hadoop

Hadoop以两种方式发行,一种是binary格式的tar压缩包,一种是source源文件。考虑到暂时并无修改Hadoop源码的需求,简单起见,我都是直接使用编...

1992
来自专栏Hadoop实操

Cloudera Manager首页

当你从Cloudera Manager进入“主页 -> 状态”时,会看到如下页面,实际也就是Cloudera Manager的主页。

51811
来自专栏牛肉圆粉不加葱

YARN Capacity Scheduler(容量调度器)

以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限,同时,每个用户也可设定一定的资源使用上限以防止资源滥用。而当一个队列的资源有剩余时,可暂时...

1173
来自专栏葡萄城控件技术团队

Winform文件下载之WinINet

在C#中,除了webclient我们还可以使用一组WindowsAPI来完成下载任务。这就是Windows Internet,简称 WinINet。本文通过一个...

2068
来自专栏微服务生态

深入淘宝Diamond之客户端架构解析

diamond是淘宝内部使用的一个管理持久配置的系统,它的特点是简单、可靠、易用,目前淘宝内部绝大多数系统的配置,由diamond来进行统一管理。 diamo...

953
来自专栏Java帮帮-微信公众号-技术文章全总结

day07.HDFS学习【大数据教程】

分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析;

1504
来自专栏我是攻城师

Hadoop问题笔记之五问五答

3508
来自专栏分布式系统进阶

Kafka的日志管理模块--LogManagerKafka源码分析-汇总

a. 如果kafka进程是优雅干净地退出的,会创建一个名为.kafka_cleanshutdown的文件作为标识; b. 启动kafka时, 如果不存在该文件...

1191
来自专栏互联网大杂烩

操作系统-进程和线程

进程线程的区别 1、进程是什么? 是具有一定独立功能的程序、它是系统进行资源分配和调度的一个独立单位,重点在系统调度和单独的单位,也就是说进程是可以独立运行...

964

扫码关注云+社区