PySpark 的背后原理

Spark主要是由 Scala 语言开发,为了方便和其他系统集成而不引入 scala 相关依赖,部分实现使用 Java 语言开发,例如 External Shuffle Service 等。总体来说,Spark 是由 JVM 语言实现,会运行在 JVM 中。然而,Spark 除了提供 Scala/Java 开发接口外,还提供了 Python、R 等语言的开发接口,为了保证 Spark 核心实现的独立性,Spark 仅在外围做包装,实现对不同语言的开发支持,本文主要介绍 Python Spark 的实现原理,剖析 pyspark 应用程序是如何运行起来的。

Spark 运行时架构

首先我们先回顾下 Spark 的基本运行时架构,如下图所示,其中橙色部分表示为 JVM,Spark 应用程序运行时主要分为 Driver 和 Executor,Driver 负载总体调度及 UI 展示,Executor 负责 Task 运行,Spark 可以部署在多种资源管理系统中,例如 Yarn、Mesos 等,同时 Spark 自身也实现了一种简单的 Standalone(独立部署) 资源管理系统,可以不用借助其他资源管理系统即可运行。更多细节请参考 Spark Scheduler 内部原理剖析

用户的 Spark 应用程序运行在 Driver 上(某种程度上说,用户的程序就是 Spark Driver 程序),经过 Spark 调度封装成一个个 Task,再将这些 Task 信息发给 Executor 执行,Task 信息包括代码逻辑以及数据信息,Executor 不直接运行用户的代码。

PySpark 运行时架构

为了不破坏 Spark 已有的运行时架构,Spark 在外围包装一层 Python API,借助 Py4j实现 Python 和 Java 的交互,进而实现通过 Python 编写 Spark 应用程序,其运行时架构如下图所示。

其中白色部分是新增的 Python 进程,在 Driver 端,通过 Py4j 实现在 Python 中调用 Java 的方法,即将用户写的 PySpark 程序"映射"到 JVM 中,例如,用户在 PySpark 中实例化一个 Python 的 SparkContext 对象,最终会在 JVM 中实例化 Scala 的 SparkContext 对象;在 Executor 端,则不需要借助 Py4j,因为 Executor 端运行的 Task 逻辑是由 Driver 发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的 Python 函数或 Lambda 表达式,Py4j 并不能实现在 Java 里调用 Python 的方法,为了能在 Executor 端运行用户定义的 Python 函数或 Lambda 表达式,则需要为每个 Task 单独启一个 Python 进程,通过 socket 通信方式将 Python 函数或 Lambda 表达式发给 Python 进程执行。语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。

下面分别详细剖析 PySpark 的 Driver 是如何运行起来的以及 Executor 是如何运行 Task 的。

Driver 端运行原理

当我们通过 spark-submmit 提交 pyspark 程序,首先会上传 python 脚本及依赖,并申请 Driver 资源,当申请到 Driver 资源后,会通过 PythonRunner(其中有 main 方法) 拉起 JVM,如下图所示。

PythonRunner 入口 main 函数里主要做两件事:

  • 开启 Py4j GatewayServer
  • 通过 Java Process 方式运行用户上传的 Python 脚本

用户 Python 脚本起来后,首先会实例化 Python 版的 SparkContext 对象,在实例化过程中会做两件事:

  • 实例化 Py4j GatewayClient,连接 JVM 中的 Py4j GatewayServer,后续在 Python 中调用 Java 的方法都是借助这个 Py4j Gateway
  • 通过 Py4j Gateway 在 JVM 中实例化 SparkContext 对象

经过上面两步后,SparkContext 对象初始化完毕,Driver 已经起来了,开始申请 Executor 资源,同时开始调度任务。用户 Python 脚本中定义的一系列处理逻辑最终遇到 action 方法后会触发 Job 的提交,提交 Job 时是直接通过 Py4j 调用 Java 的 PythonRDD.runJob 方法完成,映射到 JVM 中,会转给 sparkContext.runJob 方法,Job 运行完成后,JVM 中会开启一个本地 Socket 等待 Python 进程拉取,对应地,Python 进程在调用 PythonRDD.runJob 后就会通过 Socket 去拉取结果。

把前面运行时架构图中 Driver 部分单独拉出来,如下图所示,通过 PythonRunner 入口 main 函数拉起 JVM 和 Python 进程,JVM 进程对应下图橙色部分,Python 进程对应下图白色部分。Python 进程通过 Py4j 调用 Java 方法提交 Job,Job 运行结果通过本地 Socket 被拉取到 Python 进程。还有一点是,对于大数据量,例如广播变量等,Python 进程和 JVM 进程是通过本地文件系统来交互,以减少进程间的数据传输。

Executor 端运行原理

为了方便阐述,以 Spark On Yarn 为例,当 Driver 申请到 Executor 资源时,会通过 CoarseGrainedExecutorBackend(其中有 main 方法) 拉起 JVM,启动一些必要的服务后等待 Driver 的 Task 下发,在还没有 Task 下发过来时,Executor 端是没有 Python 进程的。当收到 Driver 下发过来的 Task 后,Executor 的内部运行过程如下图所示。

Executor 端收到 Task 后,会通过 launchTask 运行 Task,最后会调用到 PythonRDD 的 compute 方法,来处理一个分区的数据,PythonRDD 的 compute 方法的计算流程大致分三步走:

  • 如果不存在 pyspark.deamon 后台 Python 进程,那么通过 Java Process 的方式启动 pyspark.deamon 后台进程,注意每个 Executor 上只会有一个 pyspark.deamon 后台进程,否则,直接通过 Socket 连接 pyspark.deamon,请求开启一个 pyspark.worker 进程运行用户定义的 Python 函数或 Lambda 表达式。pyspark.deamon 是一个典型的多进程服务器,来一个 Socket 请求,fork 一个 pyspark.worker 进程处理,一个 Executor 上同时运行多少个 Task,就会有多少个对应的 pyspark.worker 进程。
  • 紧接着会单独开一个线程,给 pyspark.worker 进程喂数据,pyspark.worker 则会调用用户定义的 Python 函数或 Lambda 表达式处理计算。
  • 在一边喂数据的过程中,另一边则通过 Socket 去拉取 pyspark.worker 的计算结果。

把前面运行时架构图中 Executor 部分单独拉出来,如下图所示,橙色部分为 JVM 进程,白色部分为 Python 进程,每个 Executor 上有一个公共的 pyspark.deamon 进程,负责接收 Task 请求,并 fork pyspark.worker 进程单独处理每个 Task,实际数据处理过程中,pyspark.worker 进程和 JVM Task 会较频繁地进行本地 Socket 数据通信。

总结

总体上来说,PySpark 是借助 Py4j 实现 Python 调用 Java,来驱动 Spark 应用程序,本质上主要还是 JVM runtime,Java 到 Python 的结果返回是通过本地 Socket 完成。虽然这种架构保证了 Spark 核心代码的独立性,但是在大数据场景下,JVM 和 Python 进程间频繁的数据通信导致其性能损耗较多,恶劣时还可能会直接卡死,所以建议对于大规模机器学习或者 Streaming 应用场景还是慎用 PySpark,尽量使用原生的 Scala/Java 编写应用程序,对于中小规模数据量下的简单离线任务,可以使用 PySpark 快速部署提交。

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

6 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

hadoop系列之基础系列

一、Hadoop基础 1、分布式概念 通过爬虫-->爬到网页存储-->查找关键字 一台机器存储是有限的 Google采用多台机器,...

2527
来自专栏Kubernetes

Linux Kernel Cgroups源码浅析

本文是我几个月前在研究linux kernel Cgroups时整理的。文中大部分的理论知识是从网上各种贴子solo的,源码分析部分,我是基于kernel 4....

4937
来自专栏岑玉海

Spark源码系列(四)图解作业生命周期

这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! ? 我们先回顾一下这个图,Driver Prog...

2664
来自专栏我是攻城师

spark总体概况

3146
来自专栏同步博客

PHP操作Memcached的方法汇总

memcached非关系型数据库安装、php中的memcache的扩展安装、以及php中的memcached的扩展安装可以参考:

522
来自专栏岑玉海

oozie 重新提交作业

  在oozie的运行过程当中可能会出现错误,比如数据库连接不上,或者作业执行报错导致流程进入suspend或者killed状态,这个时候我们就要分析了,如果确...

2789
来自专栏扎心了老铁

springboot高并发redis细粒度加锁(key粒度加锁)

本文探讨在web开发中如何解决并发访问带来的数据同步问题。 1、需求: 通过REST接口请求并发访问redis,例如:将key=fusor:${order_id...

5054
来自专栏Albert陈凯

Spark中Task,Partition,RDD、节点数、Executor数、core数目的关系

Spark中关于并发度涉及的几个概念File,Block,Split,Task,Partition,RDD以及节点数、Executor数、core数目的关系。 ...

3126
来自专栏IT技术精选文摘

Kafka剖析系列之高可用(下)

本文主要阐述了HA相关各种场景,如Broker failover、Controller failover、Topic创建/删除、Broker启动、Followe...

1966
来自专栏Jed的技术阶梯

zookeeper编程02-服务器上下线动态感知

NameNode判断DataNode是否下线的时间太长了,利用zookeeper实现服务器上下线动态感知

822

扫码关注云+社区