[spark streaming] 动态生成 Job 并提交执行

前言

Spark Streaming Job的生成是通过JobGenerator每隔 batchDuration 长时间动态生成的,每个batch 对应提交一个JobSet,因为针对一个batch可能有多个输出操作。

概述流程:

  • 定时器定时向 eventLoop 发送生成job的请求
  • 通过receiverTracker 为当前batch分配block
  • 为当前batch生成对应的 Jobs
  • 将Jobs封装成JobSet 提交执行

入口

在 JobGenerator 初始化的时候就创建了一个定时器:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

每隔 batchDuration 就会向 eventLoop 发送 GenerateJobs(new Time(longTime))消息,eventLoop的事件处理方法中会调用generateJobs(time)方法:

      case GenerateJobs(time) => generateJobs(time)
private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

为当前batchTime分配Block

首先调用receiverTracker.allocateBlocksToBatch(time)方法为当前batchTime分配对应的Block,最终会调用receiverTracker的Block管理者receivedBlockTrackerallocateBlocksToBatch方法:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
  }

可以看到是从streamIdToUnallocatedBlockQueues中获取到所有streamId对应的未分配的blocks,该队列的信息是supervisor 存储好Block后向receiverTracker上报的Block信息,详情可见 ReceiverTracker 数据产生与存储

获取到所有streamId对应的未分配的blockInfos后,将其放入了timeToAllocatedBlocks:Map[Time, AllocatedBlocks]中,后面生成RDD的时候会用到。

为当前batchTime生成Jobs

调用DStreamGraphgenerateJobs方法为当前batchTime生成job:

 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

一个outputStream就对应一个job,遍历所有的outputStreams,为其生成job:

# ForEachDStream
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

先获取到time对应的RDD,然后将其作为参数再调用foreachFunc方法,foreachFunc方法是通过构造器传过来的,我们来看看print()输出的情况:

def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

这里的构造的foreachFunc方法就是最终和rdd一起提交job的执行方法,也即对rdd调用take()后并打印,真正触发action操作的是在这个func函数里,现在再来看看是怎么拿到rdd的,每个DStream都有一个generatedRDDs:Map[Time, RDD[T]]变量,来保存time对应的RDD,若获取不到则会通过compute()方法来计算,对于需要在executor上启动Receiver来接收数据的ReceiverInputDStream来说:

 override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

会通过receiverTracker来获取该batch对应的blocks,前面已经分析过为所有streamId分配了对应的未分配的block,并且放在了timeToAllocatedBlocks:Map[Time, AllocatedBlocks]中,这里底层就是从这个timeToAllocatedBlocks获取到的blocksInfo,然后调用了createBlockRDD(validTime, blockInfos)通过blockId创建了RDD。

最后,将通过此RDD和foreachFun构建jobFunc,并创建Job返回。

封装jobs成JobSet并提交执行

每个outputStream对应一个Job,最终就会生成一个jobs,为这个jobs创建JobSet,并通过jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))来提交这个JobSet:

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

然后通过jobExecutor来执行,jobExecutor是一个线程池,并行度默认为1,可通过spark.streaming.concurrentJobs配置,即同时可执行几个批次的数据。

处理类JobHandler中调用的是Job.run(),执行的是前面构建的 jobFunc 方法。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LhWorld哥陪你聊算法

Hive篇---Hive使用优化

本节主要描述Hive的优化使用,Hive的优化着重强调一个 把Hive SQL 当做Mapreduce程序去优化 二.主要优化点

5811
来自专栏伦少的博客

SparkStreaming+Kafka 实现统计基于缓存的实时uv

2273
来自专栏24k

Spark wordcount开发并提交到单机(伪分布式)运行

1654
来自专栏Java帮帮-微信公众号-技术文章全总结

数据库连接池、dbutil_知识点全掌握

数据库连接池、dbutil ? 数据库连接池 1 数据库连接池的概念 用池来管理Connection,这可以重复使用Connection。有了池,所以我们就不用...

2965
来自专栏Java技术栈

Spring Boot 2.x 启动全过程源码分析(全)

上篇《Spring Boot 2.x 启动全过程源码分析(一)入口类剖析》我们分析了 Spring Boot 入口类 SpringApplication 的源码...

1.3K6
来自专栏xingoo, 一个梦想做发明家的程序员

【Spring实战】—— 3 使用facotry-method创建单例Bean总结

如果有这样的需求:   1 不想再bean.xml加载的时候实例化bean,而是想把加载bean.xml与实例化对象分离。   2 实现单例的bean ...

1985
来自专栏Spark学习技巧

spark调优系列之内存和GC调优

本文基于spark1.6讲解。 一,基本概述 调优内存的使用主要有三个方面的考虑:对象的内存占用量(你可能希望整个数据集都适合内存),访问这些数据的开销,垃圾...

7679
来自专栏CDA数据分析师

专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spa...

2149
来自专栏大数据

Zzreal的大数据笔记-SparkDay04

Spark SQL SparkSQL的前身是Shark,它抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columna...

1999
来自专栏个人分享

SparkConf加载与SparkContext创建(源码阅读一)

即日起开始spark源码阅读之旅,这个过程是相当痛苦的,也许有大量的看不懂,但是每天一个方法,一点点看,相信总归会有极大地提高的。那么下面开始:

1551

扫码关注云+社区

领取腾讯云代金券