本文为 Spark 2.0 源码分析笔记,由于源码只包含 standalone 模式下完整的 executor 相关代码,所以本文主要针对 standalone 模式下的 executor 模块,文中内容若不特意说明均为...,它在接收到 driver 回应的 RegisteredExecutor 消息后,会创建一个 Executor。...至此,Executor 创建完毕(Executor 在 Mesos、YARN、Standalone 模式下都是相同的,不同的只是资源的分配方式) driver 端调用 CoarseGrainedSchedulerBackend.DriverEndpoint...进程退出后,向 worker 发送 ExecutorStateChanged(Executor 状态变更为 EXITED) 消息通知其 Executor 退出 其中,在创建、启动或等待 CoarseGrainedExecutorBackend...方法来结束 CoarseGrainedExecutorBackend 进程 至此,我们完成了对 executor 启动过程的分析。
今天在给 Executor 配置环境变量的时候,以为 Executor 环境变量跟 Driver 一样是通过 spark.kubernetes.driverEnv.XXX=YYY,最后发现其实是 spark.executorEnv...,要注意 Env 的大小写,都是细节… 可以看到下图,其实 Spark on Kubernetes 的文档,并没有写,所以 Executor 环境变量就是普通的参数即可。
,轮询可用的work 分配给 Executor 所需的 CPU 核数,即你指定的--executor-cores , 以及内存,即你指定的--executor-memory, 如果 spark.deploy.spreadOut...才会继续去寻找下一个可用work 重复 1 到 3.直到满足该任务需要的资源,或者集群资源消耗完。 5.spark 1.4.2 资源分配的一个bug?...在某一集群中有4 个 Worker 节点,每个节点拥有16个 CPU 核数, 其中设置了 spark.cores.max = 48 和 spark.executor.cores = 16, 如果...48, 但却没有满足executor启动的最小cores 16, 所以将没有 Executor 能够启动,参见 SPARK -8881问题说明。...通过以上几个问题,大概也能了解到 executor 在worker端启动的整个流程了, 本文主要是从源码角度挖掘的信息,如有不对的地方,麻烦指出,谢谢!
本文为 Spark 2.0 源码分析笔记,由于源码只包含 standalone 模式下完整的 executor 相关代码,所以本文主要针对 standalone 模式下的 executor 模块,文中内容若不特意说明均为...standalone 模式内容 在 executor 模块中,最重要的几个类(或接口、trait)是: AppClient:在 Standalone 模式下的实现是 StandaloneAppClient...RegisteredApplication:application 已成功注册 ApplicationRemoved:application 已移除 ExecutorAdded:有新增加的 Executor...为执行 Application 的 tasks 申请资源 KillExecutors:StandaloneAppClient 通过 ClientEndpoint 向 master 发送消息来 kill executor...:接收到 executor 心跳信息 def executorLost(executorId: String, reason: ExecutorLossReason):处理 executor lost
本文为 Spark 2.0 源码分析笔记,由于源码只包含 standalone 模式下完整的 executor 相关代码,所以本文主要针对 standalone 模式下的 executor 模块,文中内容若不特意说明均为...standalone 模式内容 前一篇文章简要介绍了 Spark 执行模块中几个主要的类以及 AppClient 是如何被创建的,这篇文章将详细的介绍 AppClient 向 Master 注册...其中 appDescription: ApplicationDescription 成员描述了要注册并启动一个怎么样的 Application(主要包含属性及资源信息),其定义如下: private[spark...在这个基本目录下,Spark为每个 Application 创建一个子目录。各个应用程序记录日志到相应的目录。...")) { override def toString: String = "ApplicationDescription(" + name + ")" } private[spark
内部创建了一个 ExecutorRunner ,把启动 Executor 这件事交给它来处理 点进去 start() 方法可以看到启动了一个线程来启动 Executor: 主要逻辑在 fetchAndRunExecutor...指粗粒度的 Executor 的后台进程,在服务器上的进程名字就是这个,而不是 Executor。...run 方法中,向 Driver 发送了一个消息,来获取 spark 的配置 然后用这个配置为 Executor 创建了SparkEnv,并且启动了 CoarseGrainedExecutorBackend...消息,表示注册 Executor 成功。...三、Executor 启动后,通知各个组件 来到 ExecutorRunner 类中,Executor 启动了之后,给 Worker 发送了 消息 看下 Worker 的处理,Worker 把这个消息发送给
Spark Operator 中的 executor 和 driver 是通过 driver 的 service 来通信的,如果 Kubernetes 集群的 dns 组件有问题,那么 executor...driver/executor 容器是否有设置 http_proxy 等环节变量 Kubernetes 的 dns 组件是否正常,是否需要扩容
Spark为了执行任务,会将RDD的操作分解为多个task,并且这些task是由executor执行的。...闭包函数从产生到在executor执行经历了什么? 首先,对RDD相关的操作需要传入闭包函数,如果这个函数需要访问外部定义的变量,就需要满足一定条件(比如必须可被序列化),否则会抛出运行时异常。...闭包函数在最终传入到executor执行,需要经历以下步骤: 1.driver通过反射,运行时找到闭包访问的变量,并封装成一个对象,然后序列化该对象 2.将序列化后的对象通过网络传输到worker节点...executor是真正执行task地方,而task执行离不开具体的数据,这些task运行的结果可以是shuffle中间结果,也可以持久化到外部存储系统。一般都是将结果、状态等汇集到driver。...但是,目前executor之间不能互相通信,只能借助第三方来实现数据的共享或者通信。 编写的Spark程序代码,运行在driver端还是executor端呢?
有人说spark的代码不优雅,这个浪尖就忍不了了。实际上,说spark代码不优雅的主要是对scala不熟悉,spark代码我觉得还是很赞的,最值得阅读的大数据框架之一。...今天这篇文章不是为了争辩Spark 代码优雅与否,主要是讲一下理解了spark源码之后我们能使用的一些小技巧吧。...spark 使用的时候,总有些需求比较另类吧,比如有球友问过这样一个需求: 浪尖,我想要在driver端获取executor执行task返回的结果,比如task是个规则引擎,我想知道每条规则命中了几条数据...这样就可以在executor端将结果累加然后在driver端使用,不过具体实现也是很麻烦。大家也可以自己琢磨一下下~ 那么,浪尖就给大家介绍一个比较常用也比较骚的操作吧。...浪尖在这里直接上案例了: import org.apache.spark.
spark版本是1.3+ Woker启动Executor过程并向Driver注册时序图: 1.launchExecutor Master发送消息让Worker启动Executor 2.Worker...new() Master 发送给Worker的消息,让Worker启动Execitor,LaunchExecutor是一个Case Class,里面封装以后要启动的Executor的信息 new...ExecutorRunner 创建ExcutorRunner,将参数都放到其中,然后在通过他启动Executor 注册ExecutorID -> Executor放到一个map中,对应关系 executors...) makeOffers() 16.CoarseGrainedExecutorBackend new() 创建一个Executor实例,用来执行业务逻辑 executor = new Executor...= Utils.newDaemonCachedThreadPool("Executor task launch worker") 18.Executor Executor向DriverActor
如果配置项spark.dynamicAllocation.enabled为true,并且满足以下两条件之一:配置项spark.dynamicAllocation.testing为true,或者当前不是本地模式...", s"${Integer.MAX_VALUE}s") private val tasksPerExecutor = conf.getInt("spark.executor.cores"...、s.d.initialExecutors、spark.executor.instances三个参数的较大值。...tasksPerExecutor:每个Executor执行的Task数的近似值,由spark.executor.cores与spark.task.cpus两个参数共同决定。...localityAwareTasks, hostToLocalTaskCount) } 可见,ExecutorAllocationManager启动时,会先将ExecutorAllocationListener注册到LiveListenerBus
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:) 什么是 Spark?...Spark 提供两种方式进行故障恢复:通过数据的血缘关系再执行一遍前面的处理;Checkpoint 将数据集存储到持久存储中。...Spark 应用程序的入口负责调度各个运算资源,协调各个 Worker Node上 的 Executor。...根据用户输入的参数会产生若干个 workr,workr 节点运行若干个 executor,一个 executor 是一个进程,运行各自的 task,每个 task 执行相同的代码段处理不同的数据。...图 7 描述了一个 Spark 程序,从 HDFS 上读取数据产生 RDD-A 然后 flatmap 操作到 RDD-B,读取另一部分数据的到RDD-C,然后 map 操作的到 RDD-D,RDD-D
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:) 往期直通车:Hello...Worker 是 Spark 的工作节点,向 Master 汇报自身的资源、Executeor 执行状态的改变,并接受 Master 的命令启动 Executor 或 Driver。...Executor 是 Spark 的工作进程,由 Worker 监管,负责具体任务的执行。...之后 App Master 申请 Container 并启动,Spark Driver 在 Container 上启动 Spark Executor,并调度 Spark Task 在 Spark Executor...App Master 申请完 Container 之后同样也是由 Spark Driver 去启动 Spark Executor,执行任务。 那为什么使用 Yarn 作为 Spark 的资源管理呢?
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:) 往期直通车:Hello...由 Receiver 的总指挥 ReceiverTracker 分发多个 job,到多个 executor 上分别启动 ReceiverSupervisor 实例; 每个 ReceiverSupervisor...Spark Streaming 对源头块数据的保障,分为 4 个层次,全面、相互补充,又可根据不同场景灵活设置: 热备:热备是指在存储块数据时,将其存储到本 executor、并同时 replicate...到另外一个 executor 上去。...*1.5.2 update 这已经是默认了 冷备:冷备是每次存储块数据前,先把块数据作为 log 写出到 WriteAheadLog 里,再存储到本 executor。
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:) / 什么是 Spark...Spark 提供两种方式进行故障恢复:通过数据的血缘关系再执行一遍前面的处理;Checkpoint 将数据集存储到持久存储中。...Spark 应用程序的入口负责调度各个运算资源,协调各个 Worker Node上 的 Executor。...根据用户输入的参数会产生若干个 workr,workr 节点运行若干个 executor,一个 executor 是一个进程,运行各自的 task,每个 task 执行相同的代码段处理不同的数据。...图 7 图 7 描述了一个 Spark 程序,从 HDFS 上读取数据产生 RDD-A 然后 flatmap 操作到 RDD-B,读取另一部分数据的到RDD-C,然后 map 操作的到 RDD-D,RDD-D
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:) 往期直通车:Hello...的输入参数向 NameNode 请求包含这些文件数据块的 DataNode 节点列表; 4.JobTracker 确定 Job 的执行计划:确认 Map、Reduce 的 Task 数量,并分配 Task 到离数据块最近的节点上执行...Executor 是 Spark 的工作进程,由 Worker 监管,负责具体任务的执行。...之后 App Master 申请 Container 并启动,Spark Driver 在 Container 上启动 Spark Executor,并调度 Spark Task 在 Spark Executor...App Master 申请完 Container 之后同样也是由 Spark Driver 去启动 Spark Executor,执行任务。 那为什么使用 Yarn 作为 Spark 的资源管理呢?
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。...到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为: type DataFrame = Dataset[Row] ?...所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。...安装部署 /1 开启 hive 的 metastore bin/hive --service metastore /2 将配置文件复制到spark/conf/目录下 /3 thriftserver sbin...LocalRelation(output, data, _) => LocalTableScanExec(output, data):: Nil case _ => Nil } } /3 注册到
Spark中关于并发度涉及的几个概念File,Block,Split,Task,Partition,RDD以及节点数、Executor数、core数目的关系。...spark-learning 输入可能以多个文件的形式存储在HDFS上, 每个File都包含了很多块,称为Block。...当Spark读取这些文件作为输入时, 会根据具体数据格式对应的InputFormat进行解析, 一般是将若干个Block合并成一个输入分片,称为InputSplit, 注意InputSplit不能跨越文件...随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。 每个节点可以起一个或多个Executor。...而 Task被执行的并发度 = Executor数目 * 每个Executor核数。
领取专属 10元无门槛券
手把手带您无忧上云