
PySpark Internals, Source Code Analysis, and Pros and Cons (1) ---- Architecture and the Java Interface

Author: 流川疯 | Published: 2021-12-06 | Column: 流川疯编写程序的艺术

Article outline

  • 01. PySpark's multi-process architecture
  • 02. How the Python driver calls the Java API
  • 03. RDD and SQL interfaces on the Python driver side
  • References

The Spark framework is implemented mainly in Scala, with a small amount of Java code, and Spark's primary user-facing API is also Scala. In data science, however, Python has long held an important position: a large number of data engineers rely on Python libraries for data processing and scientific computing, such as numpy, pandas, and scikit-learn, and Python's barrier to entry is significantly lower than Scala's. Spark therefore introduced PySpark, a set of Python interfaces on top of the Spark framework, to make Spark convenient for data scientists. This article analyzes how PySpark is implemented at the source-code level, covering the following aspects:

01. PySpark's multi-process architecture

PySpark uses a multi-process architecture in which Python and the JVM run in separate processes: on both the driver and the executors, a Python process and a JVM process exist side by side. When a PySpark script is submitted via spark-submit, the driver runs the Python script directly and launches the JVM from Python; any RDD or DataFrame operation invoked from Python is forwarded to the Java API through Py4j. On the executor side the relationship is reversed: the driver first launches the JVM executor process, and the JVM then spawns Python subprocesses to execute Python UDFs, using sockets for inter-process communication. The overall architecture is shown below:

[Figure: overall PySpark multi-process architecture (Python and JVM processes on both the driver and the executors)]
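As a concrete illustration (a minimal sketch, not taken from this article), consider a tiny PySpark job with a Python UDF: the DataFrame calls issued on the driver are forwarded to the JVM through Py4j, while the UDF body itself runs inside the Python worker subprocesses spawned by each executor's JVM.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("py4j-demo").getOrCreate()

@udf(returnType=IntegerType())
def plus_one(x):
    # Executed in the executor-side Python worker process.
    return x + 1

df = spark.range(5)                      # planned and executed in the JVM
df.select(plus_one(df["id"])).show()     # rows are shipped to the Python workers for the UDF

spark.stop()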

02. How the Python driver calls the Java API

As mentioned above, after a PySpark job is submitted via spark-submit, the driver first runs the user's Python script. Most of the APIs Spark provides, however, are Scala or Java, so there has to be a way to call Java interfaces from Python. PySpark uses the open-source library Py4j for this.
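Py4j itself is independent of Spark. As a minimal standalone sketch (assuming a Py4j GatewayServer has already been started on the JVM side and is listening on the default port), Python can instantiate Java objects and call their methods directly:

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()                    # connect to the JVM-side GatewayServer
random = gateway.jvm.java.util.Random()    # create a java.util.Random instance inside the JVM
print(random.nextInt(10))                  # the method runs in the JVM; the result is returned to Python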

When the Python-side SparkContext object is created, the JVM is actually launched and a Scala-side SparkContext object is created as well. The implementation lives in python/pyspark/context.py:

02.1 pyspark.SparkContext: context.py source walkthrough

GitHub source: https://github.com/apache/spark/blob/master/python/pyspark/context.py

Documentation (rendered source): http://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
        """
        Checks whether a SparkContext is initialized or not.
        Throws error if a SparkContext is already running.
        """
        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway(conf)
                SparkContext._jvm = SparkContext._gateway.jvm

            if instance:
                if (SparkContext._active_spark_context and
                        SparkContext._active_spark_context != instance):
                    currentMaster = SparkContext._active_spark_context.master
                    currentAppName = SparkContext._active_spark_context.appName
                    callsite = SparkContext._active_spark_context._callsite

                    # Raise error if there is already a running Spark context
                    raise ValueError(
                        "Cannot run multiple SparkContexts at once; "
                        "existing SparkContext(app=%s, master=%s)"
                        " created by %s at %s:%s "
                        % (currentAppName, currentMaster,
                            callsite.function, callsite.file, callsite.linenum))
                else:
                    SparkContext._active_spark_context = instance
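In practice this guard is what fires when user code tries to create a second SparkContext in the same driver, for example (a minimal sketch):

from pyspark import SparkContext

sc = SparkContext(appName="first")
try:
    SparkContext(appName="second")    # _ensure_initialized raises here
except ValueError as e:
    print(e)    # Cannot run multiple SparkContexts at once; existing SparkContext(...)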

In launch_gateway (python/pyspark/java_gateway.py), the JVM process is launched first, then a JavaGateway is created and a number of key classes are imported. With the JavaGateway object in hand, Java classes can be called through its jvm attribute, for example:
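For instance, the JVM-side JavaSparkContext is created through exactly this jvm attribute. A simplified excerpt from context.py (details may vary slightly across Spark versions):

def _initialize_context(self, jconf):
    """
    Initialize SparkContext in function to allow subclass specific initialization
    """
    # self._jvm is the gateway's jvm view; this instantiates
    # org.apache.spark.api.java.JavaSparkContext inside the JVM.
    return self._jvm.JavaSparkContext(jconf)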

It is through this mechanism that the SparkContext object inside the JVM is subsequently created. The full launch_gateway implementation is as follows:

def launch_gateway(conf=None, popen_kwargs=None):
    """
    launch jvm gateway
    Parameters
    ----------
    conf : :py:class:`pyspark.SparkConf`
        spark configuration passed to spark-submit
    popen_kwargs : dict
        Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    Returns
    -------
    ClientServer or JavaGateway
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
                raise Exception("Java gateway process exited before sending its port number")

            with open(conn_info_file, "rb") as info:
                gateway_port = read_int(info)
                gateway_secret = UTF8Deserializer().loads(info)
        finally:
            shutil.rmtree(conn_info_dir)

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway (or client server to pin the thread between JVM and Python)
    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
        gateway = ClientServer(
            java_parameters=JavaParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True),
            python_parameters=PythonParameters(
                port=0,
                eager_load=False))
    else:
        gateway = JavaGateway(
            gateway_parameters=GatewayParameters(
                port=gateway_port,
                auth_token=gateway_secret,
                auto_convert=True))

    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
    gateway.proc = proc

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.resource.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway
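Thanks to the java_import calls above, the imported classes can be referred to through gateway.jvm by their short names. A hedged sketch of what the returned gateway allows (roughly what pyspark.conf.SparkConf does when wrapping a JVM-side SparkConf):

gateway = launch_gateway()
jconf = gateway.jvm.SparkConf(False)          # org.apache.spark.SparkConf with loadDefaults=False
jconf.set("spark.app.name", "gateway-demo")   # method calls are executed on the JVM-side object
print(jconf.get("spark.app.name"))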

02.2 pyspark.sql.session: session.py source walkthrough

https://github.com/apache/spark/blob/master/python/pyspark/sql/session.py

Since Spark 2.0, SparkSession has been the recommended entry point for initialization; for compatibility with the 1.x- and 2.0-era APIs, the helper below returns the SparkSession, SparkContext, and SQLContext together, and of course they can be obtained from one another:

import os

from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext


def setup_spark_session(param_dict):
    """
    Description : Used to setup spark session

    Input : param_dict - parameter dictionary

    Output : Spark Session, Spark Context, and SQL Context
    """


    os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"

    try:
        spark.stop()
        print("Stopped a SparkSession")
    except Exception as e:
        print("No existing SparkSession")

    SPARK_DRIVER_MEMORY = param_dict["SPARK_DRIVER_MEMORY"]  # "10G"
    SPARK_DRIVER_CORE = param_dict["SPARK_DRIVER_CORE"]  # "5"
    SPARK_EXECUTOR_MEMORY = param_dict["SPARK_EXECUTOR_MEMORY"]  # "3G"
    SPARK_EXECUTOR_CORE = param_dict["SPARK_EXECUTOR_CORE"]  # "1"
    AWS_ACCESS_KEY = param_dict["AWS_ACCESS_KEY"]  
    AWS_SECRET_KEY = param_dict["AWS_SECRET_KEY"]
    AWS_S3_ENDPOINT = param_dict["AWS_S3_ENDPOINT"]

    conf = SparkConf().\
        setAppName(param_dict["APP_NAME"]).\
        setMaster('yarn-client').\
        set('spark.executor.cores', SPARK_EXECUTOR_CORE).\
        set('spark.executor.memory', SPARK_EXECUTOR_MEMORY).\
        set('spark.driver.cores', SPARK_DRIVER_CORE).\
        set('spark.driver.memory', SPARK_DRIVER_MEMORY).\
        set('spark.driver.maxResultSize', '0')
    spark = SparkSession.builder.\
        config(conf=conf).\
        getOrCreate()

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
    hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    sqlContext = SQLContext(sc)

    return spark, sc, sqlContext
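A hypothetical call might look like this (all values below are placeholders, not from the article):

param_dict = {
    "APP_NAME": "pyspark-demo",
    "SPARK_DRIVER_MEMORY": "10G",
    "SPARK_DRIVER_CORE": "5",
    "SPARK_EXECUTOR_MEMORY": "3G",
    "SPARK_EXECUTOR_CORE": "1",
    "AWS_ACCESS_KEY": "<access-key>",
    "AWS_SECRET_KEY": "<secret-key>",
    "AWS_S3_ENDPOINT": "s3.amazonaws.com",
}
spark, sc, sqlContext = setup_spark_session(param_dict)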

Let's look at the getOrCreate() source. The method first checks whether a valid global default SparkSession exists and, if so, returns it. If no valid global default SparkSession exists, it creates a new SparkSession and registers the newly created one as the global default.

Note that self._lock is a lock imported via from threading import RLock. An RLock is a reentrant lock: the same thread may acquire it multiple times, but it must ultimately be released by the thread that acquired it, and only after release has been called as many times as acquire does the lock become available to other threads. It is declared on the Builder class:

    class Builder(object):
        """Builder for :class:`SparkSession`.
        """

        _lock = RLock()
        _options = {}
        _sc = None
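To see the reentrancy in isolation, here is a toy example unrelated to PySpark itself:

from threading import RLock

lock = RLock()

def outer():
    with lock:        # first acquire by this thread
        inner()       # acquiring the same RLock again does not deadlock

def inner():
    with lock:        # second acquire by the same thread
        print("reentrant acquire succeeded")

outer()

getOrCreate() then takes this lock before checking whether a usable session already exists: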
        def getOrCreate(self):
            """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a
            new one based on the options set in this builder.
            .. versionadded:: 2.0.0
            Examples
            --------
            This method first checks whether there is a valid global default SparkSession, and if
            yes, return that one. If no valid global default SparkSession exists, the method
            creates a new SparkSession and assigns the newly created SparkSession as the global
            default.
            >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
            >>> s1.conf.get("k1") == "v1"
            True
            In case an existing SparkSession is returned, the config options specified
            in this builder will be applied to the existing SparkSession.
            >>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
            >>> s1.conf.get("k1") == s2.conf.get("k1")
            True
            >>> s1.conf.get("k2") == s2.conf.get("k2")
            True
            """
            with self._lock:
                from pyspark.context import SparkContext
                from pyspark.conf import SparkConf
                session = SparkSession._instantiatedSession
                if session is None or session._sc._jsc is None:
                    if self._sc is not None:
                        sc = self._sc
                    else:
                        sparkConf = SparkConf()
                        for key, value in self._options.items():
                            sparkConf.set(key, value)
                        # This SparkContext may be an existing one.
                        sc = SparkContext.getOrCreate(sparkConf)
                    # Do not update `SparkConf` for existing `SparkContext`, as it's shared
                    # by all sessions.
                    session = SparkSession(sc)
                for key, value in self._options.items():
                    session._jsparkSession.sessionState().conf().setConfString(key, value)
                return session
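The doctests above already hint at the behavior; a minimal sketch of how it plays out in user code:

from pyspark.sql import SparkSession

spark1 = SparkSession.builder.appName("demo").config("k1", "v1").getOrCreate()
spark2 = SparkSession.builder.config("k2", "v2").getOrCreate()

assert spark1 is spark2                 # the existing global default session is reused
assert spark1.conf.get("k2") == "v2"    # options from the second builder are applied to it
sc = spark1.sparkContext                # the underlying SparkContext shared by all sessions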

03. RDD and SQL interfaces on the Python driver side

In PySpark, after some further initialization of the Python and JVM environments, the Python-side SparkContext object is ready; it is essentially a wrapper around the JVM-side interfaces. Like the Scala API, the SparkContext object provides various interfaces for creating RDDs, corresponding almost one-to-one with the Scala API. Let's look at a few examples.
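For example (a small sketch, not from the original article), the Python-side objects are thin wrappers whose _j* attributes hold Py4j references to their JVM counterparts:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wrapper-demo").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(10))   # Python RDD object
print(sc._jsc)                    # JVM-side JavaSparkContext (a Py4j JavaObject)
print(rdd._jrdd)                  # JVM-side RDD backing the Python RDD

df = spark.range(5)               # the DataFrame API also delegates to the JVM
print(df._jdf)                    # JVM-side Dataset[Row] reference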

References

https://www.mobvista.com/cn/blog/2019-12-27-2/
https://blog.csdn.net/Pysamlam/article/details/115683424

PySpark startup internals: https://blog.csdn.net/wangyaninglm/article/details/114038572
