有奖捉虫:办公协同&微信生态&物联网文档专题 HOT
PySpark 组件为使用 Python 的 Spark 用户提供服务,用户通过 Python 编写 Spark 应用程序,通过 PySpark 组件完成部署。更多详细介绍可参考 Spark 官方 Python API 文档

PySpark 包含标准 Spark 的功能,同时支持上传 Python 脚本、实时修改脚本和 SQL 功能,更加灵活,推荐您使用 PySpark 进行数据预处理。

操作步骤

1. 添加组件
从左侧菜单栏中,选择组件算子 > 机器学习列表下的 PySpark 节点,并将其拖拽至画布中。
2. 算法 IO 参数
高级设置内,主要包含输入数据和输出数据的设置,您可以通过自定义路径设定或通过拖拽数据源、输出算子连接到组件算子上进行设置。
输入数据:平台会默认把容器本地路径的数据上传到 COS 源路径下,您可以打开自定义路径开关修改本地路径;COS 场景下支持 COSN 直连方式,输入数据 0 对应的环境变量是 P_INPUT0 ,例如:P_INPUT0=cosn://${cos_bucket}/${cos_path}
输出数据:COS 场景下请使用 COSN 直连方式,输出数据 0 对应的环境变量是 P_OUTPUT0 ,例如:P_OUTPUT0=cosn://${cos_bucket}/${project_path}/${flow_id}/${node_run_id}/output0
3. 算法参数
*代码包:从指定对象存储 COS 存储桶中选择文件夹。
*启动命令:算子执行时启动的 Python 脚本。
*调优参数:填写的超参数 JSON 会保存为 /opt/ml/input/config/hyperparameters.json 文件,您的代码需自行解析。
4. 资源参数
*框架版本:使用的 Spark 框架版本。
*训练模式:默认为 SPARK 。
*计费模式:有以下两种选择:
按量计费:
*Driver 节点算力规格。
*Executor 节点算力规格。
*Executor 节点数量。
包年包月:
*资源组:选择您拥有的资源组。
*资源申请:
*Driver-cores :Driver 节点 CPU 核数。
*Driver-memory :Driver 节点内存大小。
*Executor-cores :Excutor 节点 CPU 核数。
*Executor-memory :Excutor 节点内存大小。
*Executors:Executor 节点数量。
5. 运行
单击保存并运行工作流。
6. 查看 PySpark 日志
在 PySpark 节点上单击右键菜单,可查看详细日志。

Demo

下面演示如何通过 PySpark 组件算子运行 Spark 官方样例计算圆周率。
1. 准备好代码及数据包,您可以点击 链接 下载。解压后将文件夹上传到您的 COS 存储桶中,使其可以在组件算子中被导入。
2. 代码的核心内容如下,主要逻辑就是通过 COSN 直连的方式从 COS 存储桶加载数据需要聚类的数据,通过 Spark 提供的 KMeans 算法,求出聚类结果后再将结果通过 COSN 直连的方式写回 COS 存储桶:
if __name__ == "__main__":
spark = SparkSession \\
.builder \\
.appName("KMeansExample") \\
.getOrCreate()

# Loads data via cosn.
dataset = spark.read.format("libsvm").load(os.environ["P_INPUT0"] + "/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Write the label and prediction back via cosn.
predictions.drop("features").write.option("header", "true").csv(os.environ["P_OUTPUT0"])

spark.stop()
可以看到,上述代码中在对数据的读取和写入,填写的路径都使用到了环境变量,以读取 COSN 路径,分别是 P_INPUT0 和 P_OUTPUT0 。而数据的构成则比较简单,为 6 个数据点,其中前 3 个点和后 3 个点为显著的两部分,因此在代码中 KMeans 的 K 值也是设置为了 2 :
0 1:0.1 2:0.1 3:0.1
1 1:0.2 2:0.2 3:0.2
2 1:0.3 2:0.3 3:0.3
3 1:9.0 2:9.0 3:9.0
4 1:9.1 2:9.1 3:9.1
5 1:9.2 2:9.2 3:9.2
3. 从控制台中拽拖出数据源 > 数据输入 > COS 算子以及组件算子 > 机器学习 > PySpark 算子,并进行连接如下:



4. 进行参数配置,首先配置 COS 算子的 cos 数据路径,由于输入数据已经存放在代码包中,直接选择您上传代码包的所在位置。即为:存储桶列表 / 存储桶名 / ... / pyspark :



然后是 PySpark 算子的代码包路径,同上。而启动命令则填写 kmeans_example.py :



为了在 COS 存储桶中获得输出数据,还需要在 PySpark 算子的高级设置中,对输出数据 0 设置自定义路径,数据源类型为 COS 。由于此处只做展示,目标路径仍选择代码包路径,您在使用时可选取希望数据输出到的路径:



资源参数则如下:
*框架版本:spark2.4.5-py3.7-cpu
*训练模式:SPARK
*计费模式:按量付费
*Driver 节点算力规格:2C4G
*Executor 节点算力规格:2C4G
*Executor 节点数量:1
5. 运行工作流,等待运行结束后,查看 COS 存储桶中输出数据对应到的路径,可以看到已经多出一个文件夹(以下界面为 COSBrowser 的界面):



打开这个文件夹,并打开里面的 output 文件夹,可以看到有两个文件,分别为 _SUCCESS 以及一个以 part 开头的文件。打开后者,可以看到结果为:
label,prediction
0.0,0
1.0,0
2.0,0
3.0,1
4.0,1
5.0,1
您还可以通过在平台中右键点击组件算子,选择查看数据中的查看中间结果 0 ,可以看到:



前三个点被预测为 0 类,后三个点被预测为 1 类。符合对结果的预测。

使用建议

使用 PySpark 的目的是更好地借助其分布式计算的优势,以解决单机完成不了的计算。如果您在 PySpark 中仍然是调用常规的 Python 库做单机计算,那就失去了使用 PySpark 的意义。下面举例说明如何编写 PySpark 分布式计算代码。

使用 Spark 的 DataFrame,而不要使用 Pandas 的 DataFrame

PySpark 本身就具有类似 pandas.DataFrame 的 DataFrame,所以直接使用 PySpark 的 DataFrame 即可,基于 PySpark的DataFrame 的操作都是分布式执行的,而 pandas.DataFrame 是单机执行的,例如:
...
df = spark.read.json("examples/src/main/resources/people.json")
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
pandas_df = df.toPandas() # 将 PySpark 的 DataFrame 转换成 pandas.DataFrame,并获取'age'列
age = pandas_df['age']
...

df.toPandas() 操作会将分布在各节点的数据全部收集到 Driver上,再转成单机的 pandas.DataFrame 数据结构,适用于数据量很小的场景,如果数据量较大时,则此方法不可取。 PySpark的DataFrame 本身支持很多操作,直接基于它实现后续的业务逻辑即可,例如上述代码可以改成age = df.select('age')

在 Task 里使用 Python 库,而不是在 Driver上 使用 Python 库

下面有段代码,将数据全部 collect 到 Driver 端,然后使用 sklearn 进行预处理。
from sklearn import preprocessing
data = np.array(rdd.collect(), dtype=np.float)
normalized = preprocessing.normalize(data)
上述代码实际上已退化为单机程序,如果数据量较大的话,collect 操作会把 Driver 的内存填满,甚至 OOM(超出内存),通常基于 RDD 或 DataFrame 的 API 可以满足大多数需求,例如标准化操作:
from pyspark.ml.feature import Normalizer
df = spark.read.format("libsvm").load(path)
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
如果 RDD 或 DataFrame 没有满足您要求的 API,您也可以自行写一个处理函数,针对每条记录进行处理:
# record -> other record
def process_fn(record):
# your process logic
# for example
# import numpy as np
# x = np.array(record, type=np.int32)
# ...
# record -> True or Flase
def judge_fn(record):
# return True or Flase
processed = rdd.map(process_fn).map(lambda x: x[1:3])
filtered = processed.filter(judge_fn)
process_fn 或 judge_fn 会分发到每个节点上分布式执行,您可以在 process_fn 或 judge_fn 中使用任何 Python 库(如 numpy、scikit-learn 等)。