前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >必会:关于SparkStreaming checkpoint那些事儿

必会:关于SparkStreaming checkpoint那些事儿

作者头像
Spark学习技巧
发布2018-08-20 09:57:35
1.1K0
发布2018-08-20 09:57:35
举报
文章被收录于专栏:Spark学习技巧

spark Streaming的checkpoint是一个利器,帮助在driver端非代码逻辑错误导致的driver应用失败重启,比如网络,jvm等,当然也仅限于支持自动重启的集群管理器,比如yarn。由于checkpoint信息包含序列化的Scala / Java / Python对象,尝试使用新的修改类反序列化这些对象可能会导致错误。

本文主要讲解checkpoint使用的一些注意事项。

系统学习spark ,深入spark源码,大数据相关问题,spark源码视频及优质文章。请点击阅读原文,加入浪尖知识星球。

checkpoint简介

流应用程序必须7*24小时运行,因此必须能够适应与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)。 为了实现这一点,Spark Streaming需要将足够的信息checkpoint到容错存储系统,以便它可以从故障中恢复。 checkpoint有两种类型的数据:

1. 元数据checkpoint

将定义流式计算的信息保存到容错存储(如HDFS)。这用于从运行流应用程序的driver节点的故障中恢复(稍后详细讨论)。元数据包括:

  • 配置 - 用于创建流应用程序的配置。
  • DStream操作 - 定义流应用程序的DStream操作集。
  • 未完成的批次 - 未完成的批次的job队列。

2. 数据checkpoint

将生成的RDD保存到可靠的存储。在一些跨多个批次组合数据的有状态转换中,这是必需的。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间增加。 为了避免恢复时间的无限增加(故障恢复时间与依赖链成比例),有状态转换的中RDD周期性地checkpoint到可靠存储(例如HDFS)以切断依赖链。

总而言之,元数据checkpoint主要用于从driver故障中恢复,而如果使用有状态转换操作,也需要数据或RDD 进行checkpoint。

何时使能checkpoint

必须为具有以下任何要求的应用程序启用checkpoint:

1. 有状态的转换算子

如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow),则必须提供checkpoint目录以允许定期RDD checkpoint。

2. 从driver故障中恢复

元数据checkpoint用于使用进度信息进行恢复。

请注意,可以在不启用checkpoint的情况下运行没有上述有状态转换的简单流应用程序。 在这种情况下,driver故障的恢复也不完整(某些已接收但未处理的数据可能会丢失)。 这通常是可以接受的,并且有许多以这种方式运行Spark Streaming应用程序。 对非Hadoop环境的支持希望将在未来得到改善。

如何配置 checkpoint

可以通过在容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用checkpoint,在目录中将保存checkpoint信息。 通过使用streamingContext.checkpoint(checkpointDirectory)来完成设置。 此外,如果要使应用程序从driver故障中恢复,则应重写流应用程序以使其具有以下行为。

  • 当程序第一次启动时,它将创建一个新的StreamingContext,设置所有流然后调用start()。
  • 在失败后重新启动程序时,它将从checkpoint目录中的checkpoint数据重新创建StreamingContext。
代码语言:javascript
复制
object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
    : StreamingContext = {

    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = s"Counts at time $time $counts"
      println(output)
      println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
      println(s"Appending to ${outputFile.getAbsolutePath}")
      Files.append(output + "\n", outputFile, Charset.defaultCharset())
    }
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(s"Your arguments were ${args.mkString("[", ", ", "]")}")
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |     <output-file>. <hostname> and <port> describe the TCP server that Spark
          |     Streaming would connect to receive data. <checkpoint-directory> directory to
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
          |     word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}

如果checkpointDirectory存在,则将从checkpoint数据重新创建上下文。如果该目录不存在(即,第一次运行),则将调用函数functionToCreateContext以创建新上下文并设置DStream。

除了使用getOrCreate之外,还需要确保driver进程在失败时自动重新启动。这只能通过应用程序部署的集群管理器来完成,比如yarn。

请注意,RDD的checkpoint会导致写入可靠存储的开销。这可能导致RDD被checkpoint的那些批次的处理时间增加。因此,需要谨慎设置checkpoint的间隔。 在小批量(例如1秒)下,每批次checkpoint可能会显着降低操作吞吐量。相反,checkpoint太过不频繁会导致血统链增长和任务大小增加,这可能会产生不利影响。 对于需要RDDcheckpoint的有状态转换,默认时间间隔是批处理间隔的倍数,至少为10秒。可以使dstream.checkpoint(checkpointInterval)进行设置。 通常推荐,checkpoint间隔设置为DStream的5-10个滑动间隔(不是仅限于batch,还有windows的滑动间隔).

累加器,广播变量

spark streaming中的广播变量和累加器无法从checkpoint中恢复。如果启用了checkpoint并使用累加器或广播变量,则必须为累加器和广播变量创建lazy实例化的单例实例, 以便在driver重新启动失败后重新实例化它们。

代码语言:javascript
复制
/**
 * Use this singleton to get or register a Broadcast variable.
 */
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

/**
 * Use this singleton to get or register an Accumulator.
 */
object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

升级应用程序代码

如果需要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能的机制:

方法1

升级的Spark Streaming应用程序启动并与现有应用程序并行运行。一旦新的程序(接收与旧的数据相同的数据)已经预热并准备好最合适的时间,旧应用可以被下架了。 请注意,这仅可以用于数据源支持同时将数据发送到两个地放(即早期和升级的应用程序)。

方法2

温柔地关闭现有应用程序(StreamingContext.stop或JavaStreamingContext.stop这两个API文档里有温柔停止应用程序的参数详解),以确保在关闭之前完全处理已接收的数据。 然后可以启动升级的应用程序,该应用程序将从早期应用程序停止的同一位置开始处理。请注意,这只能通过支持源端缓冲的输入源(如Kafka和Flume)来完成,因为在前一个应用程序关闭且升级的应用程序尚未启动时需要缓冲数据。 并且无法从早期checkpoint中重新启动升级前代码的信息。checkpoint信息包含序列化的Scala / Java / Python对象,尝试使用新的修改类反序列化这些对象可能会导致错误。 在这种情况下,要么使用不同的checkpoint目录启动升级的应用程序,要么删除以前的checkpoint目录。

方法如下:

代码语言:javascript
复制
def
stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
 Permalink
Stop the execution of the streams, with option of ensuring all received data has been processed.

stopSparkContext
if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.

stopGracefully
if true, stops gracefully by waiting for the processing of all received data to be completed
当然,一个配置参数也可以设置:

spark.streaming.stopGracefullyOnShutdown  false  If true, Spark shuts down the StreamingContext gracefully on JVM shutdown rather than immediately.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档