Spark

最近更新时间:2023-11-27 18:01:02

我的收藏
注意:
需要在 EMR 集群中启动 Hive、Spark 组件服务,需要有 COS 权限,资源文件需要在 COS 中存储。
示例:以下示例中用户在 EMR 集群有权限。

功能说明

向 WeData 的工作流调度平台提交一个 Spark 任务执行。


参数说明

参数
说明
spark 程序 zip 包
用户直接上传编写的 spark 程序代码文件,需要打包为 jar 后,将所有自定义的依赖打包为一个 zip 文件,不要打包目录,直接打包文件本身。
执行参数
spark 程序的执行参数,无需用户写 spark-submit,无需指定提交用户,无需指定提交队列,无需指定提交模式(默认为 yarn)。参数格式如:--class mainClass run.jar args 或 wordcount.py input output。
应用参数
spark 的应用参数。

SparkJar 示例:

提交一个统计单词个数即 wordcount 的任务,需要提前在 COS 中上传需要统计的文件。

步骤一:本地编写 Spark Jar 任务

创建工程

1. 以 maven 为例,创建一个工程并引入 spark 依赖。
说明:
这里 groupId 和 artifactId 需要替换为的 groupId 和 artifactId。
这里 spark 依赖范围设置为 scope,表示 spark 仅在编译和打包过程中需要提供依赖,运行时依赖由平台提供。
# 生成maven工程,也可以通过ide操作
mvn archetype:generate -DgroupId=com.example -DartifactId=my-spark -DarchetypeArtifactId=maven-archetype-quickstart
2. 生成的目录结构如:



3. 引入依赖:
# pom.xml中引入spark依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.7</version>
<scope>provided</scope>
</dependency>
</dependencies>

编写代码

1. 在 src/main/java/com/example 目录下新建一个 JavaClass,输入的 Class 名,这里使用 WordCount,在 Class 添加样例代码如下:
package com.example;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
public static void main(String[] args) {
// create SparkConf object
SparkConf conf = new SparkConf().setAppName("WordCount");
// create JavaSparkContext object
JavaSparkContext sc = new JavaSparkContext(conf);
// read input file to RDD
JavaRDD<String> lines = sc.textFile(args[0]);
// split each line into words
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
// count the occurrence of each word
JavaPairRDD<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((x, y) -> x + y);
// save the word counts to output file
wordCounts.saveAsTextFile(args[1]);
}
}
说明:
这里 spark 依赖范围设置为 scope,表示 spark 仅在编译和打包过程中需要提供依赖,运行时依赖由平台提供。
2. 将代码打包成 jar 文件,并在 maven 中加入以下打包插件:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3. 然后在项目根目录执行:
mvn package
4. 在 target 目录可以看到包含依赖的 jar 文件。这里为 my-spark-1.0-SNAPSHOT-jar-with-dependencies.jar。

数据准备

由于 WeData 数据开发只支持 zip 文件,因此首先需要将 jar 包打成 zip 文件,还需要执行以下操作获得 zip 文件。如果有其他依赖的配置文件等也可以一并打成 zip 包。
zip spark-wordcount.zip my-spark-1.0-SNAPSHOT-jar-with-dependencies.jar

步骤二:上传 SparkJar 的任务包

1. 资源管理中新建资源文件,上传资源文件包。

2. 新建资源配置:




步骤三:创建 SparkJar 任务并配置调度

1. 在编排空间中新建一个工作流,在工作流中创建 Spark 任务。

2. 填写任务参数。

3. 执行参数格式示例:
--class mainClass run.jar args或wordcount.py input output
4. 在示例中完整的格式如下:
--class com.example.WordCount my-spark-1.0-SNAPSHOT-jar-with-dependencies.jar cosn://wedata-demo-1314991481/wordcount.txt
cosn://wedata-demo-1314991481/result/output
注意:
其中 cosn://wedata-demo-1314991481/wordcount.txt 是需要处理的文件的 COS 路径
cosn://wedata-demo-1314991481/result/output 是计算结果的输出的 COS 路径,这个文件夹目录事先不能被创建,不然运行会失败。
5. wordcount.txt 的示例文件如下:
hello WeData
hello Spark
hello Scala
hello PySpark
hello Hive
6. 调试运行后,查看计算的结果如下:

7. 发布 Spark 任务,开启调度。提交 SparkJar 任务:

8. SparkJar 任务运维如下图所示: