前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Effective PySpark(PySpark 常见问题)

Effective PySpark(PySpark 常见问题)

作者头像
用户2936994
发布2018-08-27 14:37:48
2.1K0
发布2018-08-27 14:37:48
举报
文章被收录于专栏:祝威廉祝威廉

构建PySpark环境

首先确保安装了python 2.7 ,强烈建议你使用Virtualenv方便python环境的管理。之后通过pip 安装pyspark

代码语言:javascript
复制
pip install pyspark

文件比较大,大约180多M,有点耐心。

下载 spark 2.2.0,然后解压到特定目录,设置SPARK_HOME即可。

其实如果通过spark-submit 提交程序,并不会需要额外安装pyspark, 这里通过pip安装的主要目的是为了让你的IDE能有代码提示。

PySpark worker启动机制

PySpark的工作原理是通过Spark里的PythonRDD启动一个(或者多个,以pythonExec, 和envVars为key)Python deamon进程,然后一旦有task过来了,就通过python deamon进程fork一个新的python worker。 python worker是可以复用的,并不会用完就立马销毁。一个task过来的流程为, 看看worker里有清闲的么,如果有,就直接返回。没有就fork一个新的worker.

PySpark 如何实现某个worker 里的变量单例

从前面PySpark worker启动机制里,我们可以看到,一个Python worker是可以反复执行任务的。在NLP任务中,我们经常要加载非常多的字典,我们希望字典只会加载一次。这个时候就需要做些额外处理了。做法如下:

代码语言:javascript
复制
class DictLoader(object):
    clf = None

   def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():            
            DictLoader.load_dic(baseDir)

    @staticmethod
    def load_dic(baseDir):
        globPath = baseDir + "/dic/*.dic"
        dicts = glob.glob(globPath)
        for dictFile in dicts:
            temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
            jieba.load_userdict(temp)
        jieba.cut("nice to meet you")
        DictLoader.clf = "SUCCESS"

    @staticmethod
    def is_loaded():
        return DictLoader.clf is not None

定义一个cls对象,并且使用staicmethod annotation,这样就可以模拟类似Java的静态方法了。之后你可以随心所欲的loader = DictLoader ()

如何加载资源文件

在NLP处理了,字典是少不了,前面我们避免了一个worker多次加载字典,现在还有一个问题,就是程序如何加载字典。通常我们希望能够把字典打成一个zip包,代码也打成一个zip包,然后通过下面的命令进行提交:

代码语言:javascript
复制
./bin/spark-submit \
--py-files dist/jobs.zip \
--files dist/dics.zip \
--master "local[*]"  python/src/batch.py

自己开发的模块可以打包成jobs.zip,对应的spark任务单独成一个batch.py文件,然后字典打包成dics.zip.

那么程序中如何读取dics.zip里的文件呢? 在Spark standalone 和 local模式下,dics.zip在各个worker的工作目录里并不会被解压,所以需要额外处理下:

代码语言:javascript
复制
   def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():  
            for zr in zipResources:
                if not archive_auto_extract:
                    with zipfile.ZipFile(SparkFiles.getRootDirectory() + '/' + zr, 'r') as f:
                        f.extractall(".")          
            DictLoader(baseDir)

archive_auto_extract 判定是不是会自动解压(yarn模式下回自动解压),判断的方法为:

代码语言:javascript
复制
archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")

zipResources 则是所有需要解压的zip包的名字,对应获取的方法为:

代码语言:javascript
复制
zipfiles = [f.split("/")[-1] for f in spark.conf.get("spark.files").split(",") if f.endswith(".zip")]

对应的zipfiles所在的目录你可以这样拼接:

代码语言:javascript
复制
SparkFiles.getRootDirectory() + '/' + zfilename

所以如果你不是运行在yarn模式的情况下,你需要先解压,然后进行加载。获取路径的方式建议如下:

代码语言:javascript
复制
temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)

这样可以兼容IDE里运行,local/standalone/yarn 模式运行。

前面的jobs.zip文件里面全部是python文件,并不需要压缩就可以直接读到。

主动定义schema,避免spark auto inference schema

我之前写过这么一段代码:

代码语言:javascript
复制
oldr = df.rdd.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

然后我需要把oldr 变回为rdd,这个时候我这么用:

代码语言:javascript
复制
resultDf = spark.createDataFrame(oldr)
resultDf.mode("overwrite").format(...).save(...

这会导致oldr被执行两次,一次是为了做schema推测,一次是为了做实际的计算。 我们可以这么写:

代码语言:javascript
复制
from pyspark.sql.types import StructType, IntegerType, ArrayType, StructField, StringType, MapType

fields = [StructField("ids", ArrayType(IntegerType())), StructField("mainId", IntegerType()),
          StructField("tags", MapType(StringType(), IntegerType()))]
resultDf = spark.createDataFrame(resultRdd, StructType(fields=fields)

这样显示的为rdd定义schema,就可以避免额外的推测了。

lambda 和 函数的选择

lambda可以定义匿名函数,但是表现力有限:

代码语言:javascript
复制
.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

我们也可以定义函数:

代码语言:javascript
复制
def create_new_row(row):
    return Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"])

然后直接使用:

代码语言:javascript
复制
.map(create_new_row).....

如何定义udf函数/如何避免使用Python UDF函数

先定义一个常规的python函数:

代码语言:javascript
复制
# 自定义split函数
def split_sentence(s):
    return s.split(" ")

转化为udf函数并且使用。

代码语言:javascript
复制
from pyspark.sql.functions import udf
from pyspark.sql.types import *

ss = udf(split_sentence, ArrayType(StringType()))
documentDF.select(ss("text").alias("text_array")).show()

唯一麻烦的是,定义好udf函数时,你需要指定返回值的类型。

使用Python 的udf函数,显然效率是会受到损伤的,我们建议使用标准库的函数,具体这么用:

代码语言:javascript
复制
from pyspark.sql import functions as f
documentDF.select(f.split("text", "\\s+").alias("text_array")).show()

pyspark.sql. functions 引用的都是spark的实现,所以效率会更高。

另外,在使用UDF函数的时候,发现列是NoneType 或者null,那么有两种可能:

在PySpark里,有时候会发现udf函数返回的值总为null,可能的原因有:

  1. 忘了写return
代码语言:javascript
复制
def abc(c):
    "yes"
  1. 返回的类型不匹配。

比如你明明是一个FloatType,但是你定义的时候说是一个ArrayType,这个时候似乎不会报错,而是udf函数执行会是null. 这个问题之前在处理二进制字段时遇到了。我们理所当然的认为二进制应该是类型 ArrayType(Byte(),True) ,但实际上是BinaryType.

dataframe.show 问题

详细问题可参看: https://stackoverflow.com/questions/39662384/pyspark-unicodeencodeerror-ascii-codec-cant-encode-character

主要是python方面的问题。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.10.23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 构建PySpark环境
  • PySpark worker启动机制
  • PySpark 如何实现某个worker 里的变量单例
  • 如何加载资源文件
  • 主动定义schema,避免spark auto inference schema
  • lambda 和 函数的选择
  • 如何定义udf函数/如何避免使用Python UDF函数
  • dataframe.show 问题
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档