有奖捉虫:行业应用 & 管理与支持文档专题 HOT
Spark 组件是面向使用 Scala / Java 的 Spark 用户,用户编写 Spark 应用程序并编译打包成 jar 后,可通过 Spark 组件完成部署。更多详细介绍可参考 Spark 官方文档

操作步骤

1. 添加组件
从左侧菜单栏中,选择组件算子 > 机器学习列表下的 Spark 节点,并将其拖拽至画布中。
2. 算法 IO 参数
高级设置内,主要包含输入数据和输出数据的设置,您可以通过自定义路径设定或通过拖拽数据源、输出算子连接到组件算子上进行设置。请注意本地路径,您的程序可能需要在对应路径进行读写。
输入数据:平台会默认把容器本地路径的数据上传到 COS 源路径下,您可以打开自定义路径开关修改本地路径;COS 场景下支持 COSN 直连方式,输入数据 0 对应的环境变量是 P_INPUT0 ,例如:P_INPUT0=cosn://${cos_bucket}/${cos_path}
输出数据:COS 场景下请使用 COSN 直连方式,输出数据 0 对应的环境变量是 P_OUTPUT0 ,例如:P_OUTPUT0=cosn://${cos_bucket}/${project_path}/${flow_id}/${node_run_id}/output0
3. 算法参数
*代码包:从指定对象存储 COS 存储桶中选择文件夹。
*启动命令:算子启动时执行的类,填写为“包文件名@类名”。
调优参数:填写的超参数 JSON 会保存为 /opt/ml/input/config/hyperparameters.json 文件,您的代码需自行解析。
4. 资源参数
*框架版本:使用的 spark 框架版本。
*训练模式:默认为 SPARK 。
*计费模式:有以下两种选择:
按量计费:
*Driver 节点算力规格。
*Executor 节点算力规格。
*Executor 节点数量。
包年包月:
*资源组:选择您拥有的资源组。
*资源申请:
*Driver-cores :Driver 节点 CPU 核数。
*Driver-memory :Driver 节点内存大小。
*Executor-cores :Excutor 节点 CPU 核数。
*Executor-memory :Excutor 节点内存大小。
*Executors:Executor 节点数量。
5. 运行
单击保存并运行工作流。
6. 查看 Spark 控制台和日志
在 Spark 节点上单击右键菜单,可查看任务状态和详细日志。

Demo

下面演示如何通过 Spark 组件算子运行 KMeans 算法做数据聚类处理,并通过 COSN 对数据进行读写。
1. 准备好代码及数据包(代码已打包为 JAR 文件),您可以点击 链接 下载。解压后将文件夹上传到您的 COS 存储桶中,使其可以在组件算子中被导入。
2. 代码的核心内容如下,主要逻辑就是通过 COSN 直连的方式从 COS 存储桶加载数据需要聚类的数据,通过 Spark 提供的 KMeans 算法,求出聚类结果后再将结果通过 COSN 直连的方式写回 COS 存储桶:
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaKMeansExample")
.getOrCreate();

// Loads data via cosn.
Dataset<Row> dataset = spark.read().format("libsvm").load(System.getenv("P_INPUT0") + "/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Write the label and prediction back via cosn.
predictions.drop("features").write().option("header", "true").csv(System.getenv("P_OUTPUT0"));

spark.stop();
}
可以看到,上述代码中在对数据的读取和写入,填写的路径都使用到了环境变量,以读取 COSN 路径,分别是 P_INPUT0 和 P_OUTPUT0 。而数据的构成则比较简单,为 6 个数据点,其中前 3 个点和后 3 个点为显著的两部分,因此在代码中 KMeans 的 K 值也是设置为了 2 :
0 1:0.1 2:0.1 3:0.1
1 1:0.2 2:0.2 3:0.2
2 1:0.3 2:0.3 3:0.3
3 1:9.0 2:9.0 3:9.0
4 1:9.1 2:9.1 3:9.1
5 1:9.2 2:9.2 3:9.2
3. 从控制台中拽拖出数据源 > 数据输入 > COS算子以及组件算子 > 机器学习 > Spark 算子,并进行连接如下:



4. 进行参数配置,首先配置 COS 算子的 cos 数据路径,由于输入数据已经存放在代码包中,直接选择您上传代码包的所在位置。即为:存储桶列表 / 存储桶名 / ... / pyspark :



然后是 Spark 算子的代码包路径,同上。而启动命令则填写 spark.jar@org.spark.example.JavaKMeansExample :



为了在 COS 存储桶中获得输出数据,还需要在 PySpark 算子的高级设置中,对输出数据 0 设置自定义路径,数据源类型为 COS 。由于此处只做展示,目标路径仍选择代码包路径,您在使用时可选取希望数据输出到的路径:



资源参数则如下:
*框架版本:spark2.4.5-cpu
*训练模式:SPARK
*计费模式:后付费
*Driver 节点算力规格:2C4G
*Executor 节点算力规格:2C4G
*Executor 节点数量:1
5. 运行工作流,等待运行结束后,查看 COS 存储桶中输出数据对应到的路径,可以看到已经多出一个文件夹(以下界面为 COSBrowser 的界面):



打开这个文件夹,并打开里面的 output0 文件夹,可以看到有两个文件,分别为 _SUCCESS 以及一个以 part 开头的文件。打开后者,可以看到结果为:
label,prediction
0.0,0
1.0,0
2.0,0
3.0,1
4.0,1
5.0,1
您还可以通过在平台中右键点击组件算子,选择查看数据中的查看中间结果 0 ,可以看到:



前三个点被预测为 0 类,后三个点被预测为 1 类。符合对结果的预测。