PySpark 的背后原理

Spark主要是由 Scala 语言开发,为了方便和其他系统集成而不引入 scala 相关依赖,部分实现使用 Java 语言开发,例如 External Shuffle Service 等。总体来说,Spark 是由 JVM 语言实现,会运行在 JVM 中。然而,Spark 除了提供 Scala/Java 开发接口外,还提供了 Python、R 等语言的开发接口,为了保证 Spark 核心实现的独立性,Spark 仅在外围做包装,实现对不同语言的开发支持,本文主要介绍 Python Spark 的实现原理,剖析 pyspark 应用程序是如何运行起来的。

Spark 运行时架构

首先我们先回顾下 Spark 的基本运行时架构,如下图所示,其中橙色部分表示为 JVM,Spark 应用程序运行时主要分为 Driver 和 Executor,Driver 负载总体调度及 UI 展示,Executor 负责 Task 运行,Spark 可以部署在多种资源管理系统中,例如 Yarn、Mesos 等,同时 Spark 自身也实现了一种简单的 Standalone(独立部署) 资源管理系统,可以不用借助其他资源管理系统即可运行。更多细节请参考 Spark Scheduler 内部原理剖析

用户的 Spark 应用程序运行在 Driver 上(某种程度上说,用户的程序就是 Spark Driver 程序),经过 Spark 调度封装成一个个 Task,再将这些 Task 信息发给 Executor 执行,Task 信息包括代码逻辑以及数据信息,Executor 不直接运行用户的代码。

PySpark 运行时架构

为了不破坏 Spark 已有的运行时架构,Spark 在外围包装一层 Python API,借助 Py4j实现 Python 和 Java 的交互,进而实现通过 Python 编写 Spark 应用程序,其运行时架构如下图所示。

其中白色部分是新增的 Python 进程,在 Driver 端,通过 Py4j 实现在 Python 中调用 Java 的方法,即将用户写的 PySpark 程序"映射"到 JVM 中,例如,用户在 PySpark 中实例化一个 Python 的 SparkContext 对象,最终会在 JVM 中实例化 Scala 的 SparkContext 对象;在 Executor 端,则不需要借助 Py4j,因为 Executor 端运行的 Task 逻辑是由 Driver 发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的 Python 函数或 Lambda 表达式,Py4j 并不能实现在 Java 里调用 Python 的方法,为了能在 Executor 端运行用户定义的 Python 函数或 Lambda 表达式,则需要为每个 Task 单独启一个 Python 进程,通过 socket 通信方式将 Python 函数或 Lambda 表达式发给 Python 进程执行。语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。

下面分别详细剖析 PySpark 的 Driver 是如何运行起来的以及 Executor 是如何运行 Task 的。

Driver 端运行原理

当我们通过 spark-submmit 提交 pyspark 程序,首先会上传 python 脚本及依赖,并申请 Driver 资源,当申请到 Driver 资源后,会通过 PythonRunner(其中有 main 方法) 拉起 JVM,如下图所示。

PythonRunner 入口 main 函数里主要做两件事:

  • 开启 Py4j GatewayServer
  • 通过 Java Process 方式运行用户上传的 Python 脚本

用户 Python 脚本起来后,首先会实例化 Python 版的 SparkContext 对象,在实例化过程中会做两件事:

  • 实例化 Py4j GatewayClient,连接 JVM 中的 Py4j GatewayServer,后续在 Python 中调用 Java 的方法都是借助这个 Py4j Gateway
  • 通过 Py4j Gateway 在 JVM 中实例化 SparkContext 对象

经过上面两步后,SparkContext 对象初始化完毕,Driver 已经起来了,开始申请 Executor 资源,同时开始调度任务。用户 Python 脚本中定义的一系列处理逻辑最终遇到 action 方法后会触发 Job 的提交,提交 Job 时是直接通过 Py4j 调用 Java 的 PythonRDD.runJob 方法完成,映射到 JVM 中,会转给 sparkContext.runJob 方法,Job 运行完成后,JVM 中会开启一个本地 Socket 等待 Python 进程拉取,对应地,Python 进程在调用 PythonRDD.runJob 后就会通过 Socket 去拉取结果。

把前面运行时架构图中 Driver 部分单独拉出来,如下图所示,通过 PythonRunner 入口 main 函数拉起 JVM 和 Python 进程,JVM 进程对应下图橙色部分,Python 进程对应下图白色部分。Python 进程通过 Py4j 调用 Java 方法提交 Job,Job 运行结果通过本地 Socket 被拉取到 Python 进程。还有一点是,对于大数据量,例如广播变量等,Python 进程和 JVM 进程是通过本地文件系统来交互,以减少进程间的数据传输。

Executor 端运行原理

为了方便阐述,以 Spark On Yarn 为例,当 Driver 申请到 Executor 资源时,会通过 CoarseGrainedExecutorBackend(其中有 main 方法) 拉起 JVM,启动一些必要的服务后等待 Driver 的 Task 下发,在还没有 Task 下发过来时,Executor 端是没有 Python 进程的。当收到 Driver 下发过来的 Task 后,Executor 的内部运行过程如下图所示。

Executor 端收到 Task 后,会通过 launchTask 运行 Task,最后会调用到 PythonRDD 的 compute 方法,来处理一个分区的数据,PythonRDD 的 compute 方法的计算流程大致分三步走:

  • 如果不存在 pyspark.deamon 后台 Python 进程,那么通过 Java Process 的方式启动 pyspark.deamon 后台进程,注意每个 Executor 上只会有一个 pyspark.deamon 后台进程,否则,直接通过 Socket 连接 pyspark.deamon,请求开启一个 pyspark.worker 进程运行用户定义的 Python 函数或 Lambda 表达式。pyspark.deamon 是一个典型的多进程服务器,来一个 Socket 请求,fork 一个 pyspark.worker 进程处理,一个 Executor 上同时运行多少个 Task,就会有多少个对应的 pyspark.worker 进程。
  • 紧接着会单独开一个线程,给 pyspark.worker 进程喂数据,pyspark.worker 则会调用用户定义的 Python 函数或 Lambda 表达式处理计算。
  • 在一边喂数据的过程中,另一边则通过 Socket 去拉取 pyspark.worker 的计算结果。

把前面运行时架构图中 Executor 部分单独拉出来,如下图所示,橙色部分为 JVM 进程,白色部分为 Python 进程,每个 Executor 上有一个公共的 pyspark.deamon 进程,负责接收 Task 请求,并 fork pyspark.worker 进程单独处理每个 Task,实际数据处理过程中,pyspark.worker 进程和 JVM Task 会较频繁地进行本地 Socket 数据通信。

总结

总体上来说,PySpark 是借助 Py4j 实现 Python 调用 Java,来驱动 Spark 应用程序,本质上主要还是 JVM runtime,Java 到 Python 的结果返回是通过本地 Socket 完成。虽然这种架构保证了 Spark 核心代码的独立性,但是在大数据场景下,JVM 和 Python 进程间频繁的数据通信导致其性能损耗较多,恶劣时还可能会直接卡死,所以建议对于大规模机器学习或者 Streaming 应用场景还是慎用 PySpark,尽量使用原生的 Scala/Java 编写应用程序,对于中小规模数据量下的简单离线任务,可以使用 PySpark 快速部署提交。

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 zhuanlan_guanli@qq.com 删除。

编辑于

扫描关注云+社区

涂小刚的专栏

7 篇文章18 人订阅

我来说两句

5 条评论
登录 后参与评论

相关文章

来自专栏Java专栏

Java 接口——面向对象的精髓

接口有何用?面试宝典上背下来的总结,你真的明白吗? 接口&工厂方法 其实很简单>。<~ 什么是接口 先看看生活中的接口,比如USB接口。

70
来自专栏Java专栏

Java抽象类与oop三大特征

在了解抽象类之前,先来了解一下抽象方法。抽象方法是一种特殊的方法:它 只有声明,而没有具体的实现 。抽象方法的声明格式为:

30
来自专栏Java专栏

Java 8 Streams 中的数据库 CRUD 操作

接触一个新工具的时候,刚开始要克服的最大障碍就是如何让你自己先尝试做出一个小东西来。现在你也许对 Java 8 中新的 Stream API 的运作方式在理解上...

60
来自专栏Java架构沉思录

十分钟搞懂负载均衡

我们知道负载均衡层的作用是“将来源于外部的处理压力通过某种规律/手段分摊到内部各个处理节点上”,那么不同的业务场景需要的负载均衡方式又是不一样的,架构师还要考虑...

80
来自专栏Java专栏

Java程序员必备的6款最佳开发工具

工欲善其事,必先利其器。每一个Java程序员都有其惯用的工具组件。对于Java程序员,各种有用的软件和工具泛滥成灾。初级开发人员要么找不到合适的工具,要么在寻找...

100
来自专栏Java专栏

如何学好java语言?

IT行业的朋友,应为本事就有编程语言的技术,学习Java语言不是困难的事情。所谓的懂一门语言就会其他语言。为什么这么说,应他们有一些编程的基本知识,他们知道如何...

50

扫描关注云+社区