PySpark如何设置worker的python命令

前言

因为最近在研究spark-deep-learning项目,所以重点补习了下之前PySpark相关的知识,跟着源码走了一遍。希望能够对本文的读者有所帮助。

问题描述

关于PySpark的基本机制我就不讲太多,你google搜索“PySpark原理”就会有不少还不错的文章。我这次是遇到一个问题,因为我原先安装了python2.7, python3.6。 后面为了方便我在我的电脑上使用virtualenv来做环境隔离,这个时候就发生一个比较诡异的事情:

在driver端能够正常使用PIL图片处理模块,但是executor端则不行。那显然是我在~/.bash_profile的配置 在executor 启动python worker时没有生效,程序依然走了我早先安装的 python2.7,而早先的2.7里我没有安装PIL。那么应该怎么解决这个问题呢?

Python里的RDD 和 JVM的RDD如何进行关联

要解答上面的问题,核心是要判定JVM里的PythonRunner启动python worker时,python的地址是怎么指定的。

我们以python rdd里的map作为起点,

def map(self, f, preservesPartitioning=False):       
        def func(_, iterator):
            return map(f, iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

进入self.mapPartitionsWithIndex:

  def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
      
        return PipelinedRDD(self, f, preservesPartitioning)

可以看到PipelinedRDD,进入PipelinedRDD._jrdd里,可以看到:

       wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
                                      self._jrdd_deserializer, profiler)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
                                             self.preservesPartitioning)
        self._jrdd_val = python_rdd.asJavaRDD()

这里和JVM里的PythonRDD建立了联系。进入_wrap_function:

pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec, sc.pythonVer, broadcast_vars, sc._javaAccumulator)

我们看到了sc.pythonExec对象,这个是传入到PythonRDD里的python命令。为了看的更清楚,我们看看sc.pythonExec的申明:

self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

也就是你在很多文档中看到的,通过设置PYSPARK_PYTHON变量来设置启用哪个python。那么pythonExec是JVM里是怎么用的呢?

private[spark] class PythonRDD(
    parent: RDD[_],
    func: PythonFunction,
    preservePartitoning: Boolean)
  extends RDD[Array[Byte]](parent) {

PythonRDD是在python中通过_jvm对象在JVM里创建的,里面哟给重要的对象是PythonFunction,这个PythonFunction就是wrapped_func,wrapped_func里包含了env,pythonExec等。PythonRDD的compute方法里会调用PythonRunner的compute方法:

val runner = PythonRunner(func, bufferSize, reuse_worker)
    runner.compute(firstParent.iterator(split, context), split.index, context)

上面的func其实就是PythonFunction,在PythonRunner里你可以看到:

 // All the Python functions should have the same exec, version and envvars.
  private val envVars = funcs.head.funcs.head.envVars
  private val pythonExec = funcs.head.funcs.head.pythonExec
  private val pythonVer = funcs.head.funcs.head.pythonVer

三个变量的申明,具体使用在这:

val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)

这里通过pythonRunner运行启动python worker。

额外福利:Python如何启动JVM,从而启动Spark

建议配置一套spark的开发环境,然后debug进行跟踪。Python启动时,首先启动SparkContext(context.py),在init 方法里会_ensure_initialized 方法确保Java 里的SparkContext被初始化:

@classmethod
    def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway(conf)
                SparkContext._jvm = SparkContext._gateway.jvm

初始时会调用lauch_gateway(java_gateway.py),该方法首先会到环境变量里找SPARK_HOME,然后使用里面的./bin/spark-submit 进行Spark的启动,通过环境变量中的PYSPARK_SUBMIT_ARGS获取一些参数,默认是pyspark-shell,最后通过Popen 启动Spark进程,返回一个JavaGateWay,之后持有这个对象,就可以向JVM发送指令了。

解决问题

有了上面的铺垫后,问题就变得很好解决了,下面的单元测试原先是跑步过去的

def test_readImages(self):
        # Test that reading
        imageDF = imageIO._readImages("some/path", 2, self.binaryFilesMock)
        self.assertTrue("image" in imageDF.schema.names)
        self.assertTrue("filePath" in imageDF.schema.names)

        # The DF should have 2 images and 1 null.
        self.assertEqual(imageDF.count(), 3)
        validImages = imageDF.filter(col("image").isNotNull())
        self.assertEqual(validImages.count(), 2)

        img = validImages.first().image
        self.assertEqual(img.height, array.shape[0])
        self.assertEqual(img.width, array.shape[1])
        self.assertEqual(imageIO.imageType(img).nChannels, array.shape[2])
        self.assertEqual(img.data, array.tobytes())

现在我该如何让他通过呢?可以在setUp的时候添加

import os
os.environ["PYSPARK_PYTHON"] = "your-python-path" 

即可。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏移动开发面面观

ProgressiveJpeg介绍与在Android中的使用

28340
来自专栏向治洪

React Native调用Android相机图库

概述 在很多的React Native开发中,我们需要调用原生的api实现调用相机和图库的功能,网上用的最多的开源库如:react-native-image-p...

26850
来自专栏生信技能树

下载TCGA所有癌症的maf文件做signature分析

才sanger研究所已经做好了这个分析,但是值得我们重复一下,效果如下: ? TCGA所有癌症的mutation signature 首先TCGA所有癌症的ma...

917120
来自专栏程序员的SOD蜜

用事实说话,成熟的ORM性能不是瓶颈,灵活性不是问题:EF5.0、PDF.NET5.0、Dapper原理分析与测试手记

[本文篇幅较长,可以通过目录查看您感兴趣的内容,或者下载格式良好的PDF版本文件查看]  目录 一、ORM的"三国志"    2 1,PDF.NET诞生历程...

52090
来自专栏程序员互动联盟

android apk 防反编译技术第一篇-加壳技术

做android framework方面的工作将近三年的时间了,现在公司让做一下android apk安全方面的研究,于是最近就在网上找大量的资料来学习。现在...

68080
来自专栏我是攻城师

ElasticSearch之Java Api聚合分组实战

50860
来自专栏lgp20151222

整理代码,将一些曾经用过的功能整合进一个spring-boot

由于本人的码云太多太乱了,于是决定一个一个的整合到一个springboot项目里面。

27830
来自专栏Kubernetes

kube-proxy源码分析

##kube-proxy介绍 请参考我的另一篇博文:kube-proxy工作原理 ##源码目录结构分析 cmd/kube-proxy //负责kub...

88650
来自专栏一个会写诗的程序员的博客

第13章 Kotlin 集成 SpringBoot 服务端开发(2)

其中,ON DUPLICATE KEY UPDATE 这句表明当遇到重复的键值的时候,执行更新 gmt_modified = now() 的操作。这里nativ...

14610
来自专栏GIS讲堂

Arcgis4js实现链家找房的效果

买房的各位亲们不知是否留意过链家的"地图找房",这样的功能对于使用者来说,是非常方便的,大家可通过连接(https://bj.lianjia.com/ditu/...

15120

扫码关注云+社区

领取腾讯云代金券