必会:关于SparkStreaming checkpoint那些事儿

spark Streaming的checkpoint是一个利器,帮助在driver端非代码逻辑错误导致的driver应用失败重启,比如网络,jvm等,当然也仅限于支持自动重启的集群管理器,比如yarn。由于checkpoint信息包含序列化的Scala / Java / Python对象,尝试使用新的修改类反序列化这些对象可能会导致错误。

本文主要讲解checkpoint使用的一些注意事项。

系统学习spark ,深入spark源码,大数据相关问题,spark源码视频及优质文章。请点击阅读原文,加入浪尖知识星球。

checkpoint简介

流应用程序必须7*24小时运行,因此必须能够适应与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)。 为了实现这一点,Spark Streaming需要将足够的信息checkpoint到容错存储系统,以便它可以从故障中恢复。 checkpoint有两种类型的数据:

1. 元数据checkpoint

将定义流式计算的信息保存到容错存储(如HDFS)。这用于从运行流应用程序的driver节点的故障中恢复(稍后详细讨论)。元数据包括:

  • 配置 - 用于创建流应用程序的配置。
  • DStream操作 - 定义流应用程序的DStream操作集。
  • 未完成的批次 - 未完成的批次的job队列。

2. 数据checkpoint

将生成的RDD保存到可靠的存储。在一些跨多个批次组合数据的有状态转换中,这是必需的。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间增加。 为了避免恢复时间的无限增加(故障恢复时间与依赖链成比例),有状态转换的中RDD周期性地checkpoint到可靠存储(例如HDFS)以切断依赖链。

总而言之,元数据checkpoint主要用于从driver故障中恢复,而如果使用有状态转换操作,也需要数据或RDD 进行checkpoint。

何时使能checkpoint

必须为具有以下任何要求的应用程序启用checkpoint:

1. 有状态的转换算子

如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow),则必须提供checkpoint目录以允许定期RDD checkpoint。

2. 从driver故障中恢复

元数据checkpoint用于使用进度信息进行恢复。

请注意,可以在不启用checkpoint的情况下运行没有上述有状态转换的简单流应用程序。 在这种情况下,driver故障的恢复也不完整(某些已接收但未处理的数据可能会丢失)。 这通常是可以接受的,并且有许多以这种方式运行Spark Streaming应用程序。 对非Hadoop环境的支持希望将在未来得到改善。

如何配置 checkpoint

可以通过在容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用checkpoint,在目录中将保存checkpoint信息。 通过使用streamingContext.checkpoint(checkpointDirectory)来完成设置。 此外,如果要使应用程序从driver故障中恢复,则应重写流应用程序以使其具有以下行为。

  • 当程序第一次启动时,它将创建一个新的StreamingContext,设置所有流然后调用start()。
  • 在失败后重新启动程序时,它将从checkpoint目录中的checkpoint数据重新创建StreamingContext。
object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
    : StreamingContext = {

    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = s"Counts at time $time $counts"
      println(output)
      println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
      println(s"Appending to ${outputFile.getAbsolutePath}")
      Files.append(output + "\n", outputFile, Charset.defaultCharset())
    }
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(s"Your arguments were ${args.mkString("[", ", ", "]")}")
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |     <output-file>. <hostname> and <port> describe the TCP server that Spark
          |     Streaming would connect to receive data. <checkpoint-directory> directory to
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
          |     word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}

如果checkpointDirectory存在,则将从checkpoint数据重新创建上下文。如果该目录不存在(即,第一次运行),则将调用函数functionToCreateContext以创建新上下文并设置DStream。

除了使用getOrCreate之外,还需要确保driver进程在失败时自动重新启动。这只能通过应用程序部署的集群管理器来完成,比如yarn。

请注意,RDD的checkpoint会导致写入可靠存储的开销。这可能导致RDD被checkpoint的那些批次的处理时间增加。因此,需要谨慎设置checkpoint的间隔。 在小批量(例如1秒)下,每批次checkpoint可能会显着降低操作吞吐量。相反,checkpoint太过不频繁会导致血统链增长和任务大小增加,这可能会产生不利影响。 对于需要RDDcheckpoint的有状态转换,默认时间间隔是批处理间隔的倍数,至少为10秒。可以使dstream.checkpoint(checkpointInterval)进行设置。 通常推荐,checkpoint间隔设置为DStream的5-10个滑动间隔(不是仅限于batch,还有windows的滑动间隔).

累加器,广播变量

spark streaming中的广播变量和累加器无法从checkpoint中恢复。如果启用了checkpoint并使用累加器或广播变量,则必须为累加器和广播变量创建lazy实例化的单例实例, 以便在driver重新启动失败后重新实例化它们。

/**
 * Use this singleton to get or register a Broadcast variable.
 */
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

/**
 * Use this singleton to get or register an Accumulator.
 */
object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

升级应用程序代码

如果需要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能的机制:

方法1

升级的Spark Streaming应用程序启动并与现有应用程序并行运行。一旦新的程序(接收与旧的数据相同的数据)已经预热并准备好最合适的时间,旧应用可以被下架了。 请注意,这仅可以用于数据源支持同时将数据发送到两个地放(即早期和升级的应用程序)。

方法2

温柔地关闭现有应用程序(StreamingContext.stop或JavaStreamingContext.stop这两个API文档里有温柔停止应用程序的参数详解),以确保在关闭之前完全处理已接收的数据。 然后可以启动升级的应用程序,该应用程序将从早期应用程序停止的同一位置开始处理。请注意,这只能通过支持源端缓冲的输入源(如Kafka和Flume)来完成,因为在前一个应用程序关闭且升级的应用程序尚未启动时需要缓冲数据。 并且无法从早期checkpoint中重新启动升级前代码的信息。checkpoint信息包含序列化的Scala / Java / Python对象,尝试使用新的修改类反序列化这些对象可能会导致错误。 在这种情况下,要么使用不同的checkpoint目录启动升级的应用程序,要么删除以前的checkpoint目录。

方法如下:

def
stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
 Permalink
Stop the execution of the streams, with option of ensuring all received data has been processed.

stopSparkContext
if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.

stopGracefully
if true, stops gracefully by waiting for the processing of all received data to be completed
当然,一个配置参数也可以设置:

spark.streaming.stopGracefullyOnShutdown  false  If true, Spark shuts down the StreamingContext gracefully on JVM shutdown rather than immediately.

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-08-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

hadoop系列之基础系列

一、Hadoop基础 1、分布式概念 通过爬虫-->爬到网页存储-->查找关键字 一台机器存储是有限的 Google采用多台机器,...

3817
来自专栏Spark学习技巧

Spark调优系列之序列化方式调优

由于大多数的spark计算是基于内存的的天性,spark应用的瓶颈一般受制于集群的CPU,网络带宽,内存。大部分情况下,如果内存适合当前数据量的计算,那么瓶颈往...

3179
来自专栏美图数据技术团队

快速、安全、可靠!Yarn!| MTdata小讲堂

Yarn 的全称是 Yet Anther Resource Negotiator(另一种资源协商者)。它作为 Hadoop 的一个组件,官方对它的定义是一个工作...

1282
来自专栏cloudskyme

hadoop使用(五)

第1章 引言 1.1 编写目的 对关于hadoop的文档及资料进行进一步的整理。 1.2 相关网站    毋庸置疑 http://hadoop.apache.o...

3495
来自专栏星回的实验室

打造自己的MapReduce[一]:Hadoop集群搭建

那就是我在这里的每一篇文开头都必然是:最近工作好忙,又断更很久了……Anyway,这也不能成为偷懒的理由。我可能对记录技术有些固执的误解,总认为是要待到整理出一...

1071
来自专栏大数据-Hadoop、Spark

2018-08-08

1、spark程序停-启,实时数据量一下子太多,如何处理 2、spark程序数据丢失,如何处理?duration是多少?

852
来自专栏about云

spark streaming知识总结[优化]

问题导读 1.DStreams的含义是什么? 2.DStreams提供哪两种类型的操作? 3.Transformations操作分为哪两种类型? 4.本文说...

5154
来自专栏Hadoop实操

Python3环境通过JDBC访问非Kerberos环境的Hive

在前面Fayson介绍了在Python2的环境下《如何使用Python Impyla客户端连接Hive和Impala》,本篇文章Fayson主要介绍在Pytho...

2231
来自专栏赵俊的Java专栏

Python 版 WordCount

2153
来自专栏Hadoop实操

Impala的Short-Circuit Reads

我们知道读取HDFS的数据需要通过DataNode。当客户端找DataNode读取数据时,DataNode会首先从磁盘中读取文件,然后通过TCP socket将...

5658

扫码关注云+社区

领取腾讯云代金券