专栏首页牛肉圆粉不加葱揭开Spark Streaming神秘面纱④ - job 的提交与执行

揭开Spark Streaming神秘面纱④ - job 的提交与执行

前文揭开Spark Streaming神秘面纱③ - 动态生成 job 我们分析了 JobScheduler 是如何动态为每个 batch生成 jobs,本文将说明这些生成的 jobs 是如何被提交的。

在 JobScheduler 生成某个 batch 对应的 Seq[Job] 之后,会将 batch 及 Seq[Job] 封装成一个 JobSet 对象,JobSet 持有某个 batch 内所有的 jobs,并记录各个 job 的运行状态。

之后,调用JobScheduler#submitJobSet(jobSet: JobSet)来提交 jobs,在该函数中,除了一些状态更新,主要任务就是执行

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

即,对于 jobSet 中的每一个 job,执行jobExecutor.execute(new JobHandler(job)),要搞懂这行代码干了什么,就必须了解 JobHandler 及 jobExecutor。

JobHandler

JobHandler 继承了 Runnable,为了说明与 job 的关系,其精简后的实现如下:

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    _eventLoop.post(JobStarted(job))
    PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
      job.run()
    }
    _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobCompleted(job))
    }
  }

}

JobHandler#run 方法主要执行了 job.run(),该方法最终将调用到 揭开Spark Streaming神秘面纱③ - 动态生成 job 中的『生成该 batch 对应的 jobs的Step2 定义的 jobFunc』,jonFunc 将提交对应 RDD DAG 定义的 job。

JobExecutor

知道了 JobHandler 是用来执行 job 的,那么 JobHandler 将在哪里执行 job 呢?答案是 jobExecutor,jobExecutor为 JobScheduler 成员,是一个线程池,在JobScheduler 主构造函数中创建,如下:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

JobHandler 将最终在 线程池jobExecutor 的线程中被调用,jobExecutor的线程数可通过spark.streaming.concurrentJobs配置,默认为1。若配置多个线程,就能让多个 job 同时运行,若只有一个线程,那么同一时刻只能有一个 job 运行。

以上,即 jobs 被执行的逻辑。


本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 为什么 Spark Streaming + Kafka 无法保证 exactly once?

    结合文章 揭开Spark Streaming神秘面纱④ - job 的提交与执行我们画出了如下 job 调度执行流程图:

    codingforfun
  • [Spark源码剖析] JobWaiter

    来创建容纳job结果的数据,数组的每个元素对应与之下标相等的partition的计算结果;并将结果处理函数(index, res) => results(ind...

    codingforfun
  • IDEA本地执行&调试Spark Application方法

    对于一些比较简单的application,我们可以在IDEA编码并直接以local的方式在IDEA运行。有两种方法:

    codingforfun
  • 【Python】APScheduler简介

    APScheduler,全称是_Advanced Python Scheduler_,具体的介绍可以看PyPI或者readthedocs的文档介绍,这篇 blo...

    用户5522200
  • 左手用R右手Python系列——多进程/线程数据抓取与网页请求

    这一篇涉及到如何在网页请求环节使用多进程任务处理功能,因为网页请求涉及到两个重要问题:一是多进程的并发操作会面临更大的反爬风险,所以面临更严峻的反爬风险,二是抓...

    数据小磨坊
  • python apscheduler job处理

    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one') scheduler.remo...

    用户5760343
  • job controller 源码分析

    job 在 kubernetes 中主要用来处理离线任务,job 直接管理 pod,可以创建一个或多个 pod 并会确保指定数量的 pod 运行完成。kuber...

    田飞雨
  • Jenkins-API使用(python)

    jenkinsapi、python-jenkins、pbr、multi-key-dict

    泽阳
  • Python数据可视化:浅谈数据挖掘岗

    没找到如何用Python创建PG数据库,所以数据库的创建在Navicat for PostgreSQL中完成。

    数据森麟
  • Python-jenkins模块获取jobs的执行状态操作

    在job执行结束前使用server_1.get_build_console_output(‘2019/get_node_list’,7).split(‘\n’)...

    砸漏

扫码关注云+社区

领取腾讯云代金券