Apache Spark 是用于大规模数据处理的统一分析引擎,它提供了 Java、Scala、Python 和 R 语言的高级 API,以及一个支持通用的执行图计算的优化引擎。
Spark Core 是 Spark 的核心模块,负责任务调度、内存管理等功能。Spark Core 的实现依赖于 RDD(Resilient Distributed Datasets,弹性分布式数据集)的程序抽象概念。
在 Spark Core 的基础上,Spark 提供了一系列面向不同应用需求的组件,包括使用 SQL 进行结构化数据处理的 Spark SQL、用于实时流处理的 Spark Streaming、用于机器学习的 MLlib 以及用于图处理的 GraphX。
Spark 本身并没有提供分布式文件系统,因而 Spark 的数据存储主要依赖于 HDFS,也可以使用 HBase 和 S3 等作为存储层。
Spark 有多种运行模式:
Driver 是 Spark 中的主控进程,负责执行应用程序的 main() 方法,创建 SparkContext 对象,负责与 Spark 集群进行交互,提交 Spark 作业,并将作业转化为 Task(一个作业由多个 Task 任务组成),然后在各个 Executor 进程间对 Task 进行调度和监控。
根据应用程序提交方式的不同,Driver 在集群中的位置也有所不同,应用程序提交方式主要有两种:Client 和 Cluster,默认是 Client,可以在向 Spark 集群提交应用程序时使用 --deploy-mode
参数指定提交方式。
Local 模式的部署方式比较简单,只需下载安装包并解压就可以使用了。具体可以参考上一章的 Spark 系列教程(1)Word Count 的介绍,本文就不再赘述了。
在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。
❯ bin/spark-shell
21/10/07 11:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://chengzw:4040
Spark context available as 'sc' (master = local[*], app id = local-1633578611004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()
res0: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
在 Spark Standalone 模式中,资源调度是由 Spark 自己实现的。 Spark Standalone 模式是 Master-Slaves 架构的集群模式,和大部分的 Master-Slaves 结构的集群一样,存在着 Master 单点故障的问题。对于单点故障的问题,Spark 提供了两种方案:
编辑 /etc/hosts 文件:
192.168.1.117 hadoop1
192.168.1.118 hadoop2
192.168.1.119 hadoop3
拷贝到其他两台机器上:
scp /etc/hosts root@hadoop2:/etc/hosts
scp /etc/hosts root@hadoop3:/etc/hosts
为了方便后续拷贝文件以及执行脚本,配置 SSH 免密登录。在 hadoop1 上生成 RSA 非对称密钥对:
[root@hadoop1 hadoop]# ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:wkMiPVpbBtjoZwBIpyvvluYtfQM9hQeHtgBFVfrwL1I root@hadoop1
The key's randomart image is:
+---[RSA 2048]----+
|+o.O+..o. |
|. *.o.+.. |
| o..=o*= |
| o+oOo+o |
|...o..+oE |
|.. . o+ . |
| .o .... . |
| .=.. o. . |
| +o... . |
+----[SHA256]-----+
将公钥拷贝到集群中的其他机器:
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop1
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop2
[root@hadoop1 hadoop]# ssh-copy-id root@hadoop3
进入 [Oracle 官网] (https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html) 下载并解压 JDK 安装包。设置环境变量,编辑 vim /etc/profile:
export JAVA_HOME=/software/jdk
export PATH=$PATH:$JAVA_HOME/bin
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -xzvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8 /software/zk
编辑 zk/conf/zoo.cfg 文件:
#用于配置 Zookeeper 中最小时间单位的长度,单位是毫秒
tickTime=2000
#该参数用于配置 Leader 服务器等待 Follower 启动,并完成数据同步的时间
#乘上 tickTime 得到具体时间:10 * 2000 = 20000 毫秒
initLimit=10
#Leader 与 Follower 心跳检测的超时时间。
#乘上 tickTime 得到具体时间:5 * 2000 = 10000 毫秒
syncLimit=5
#数据存放目录
dataDir=/software/zk/data
#客户端连接端口
clientPort=2181
#Zookeeper集群成员地址
#2888端口用于集群间通信,leader会监听此端口
#3888端口用于leader选举
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888
同步修改后的配置文件到集群的其他节点:
scp -r zk root@hadoop2:/software/
scp -r zk root@hadoop3:/software/
#在 hadoop1 节点上执行
echo 1 > /root/zookeeper-cluster/zk1/myid
#在 hadoop2 节点上执行
echo 2 > /root/zookeeper-cluster/zk2/myid
#在 hadoop3 节点上执行
echo 3 > /root/zookeeper-cluster/zk3/myid
分别在 3 台节点上执行以下命令启动 Zookeeper:
zk/bin/zkServer.sh start
分别在 3 台节点上查看 Zookeeper 状态,可以看到此时 hadoop2 节点为 Zookeeper 的 Master 节点。
hadoop1 节点:
[root@hadoop1 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
hadoop2 节点:
[root@hadoop2 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
hadoop3 节点:
[root@hadoop3 software]# zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /software/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
wget https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
tar -xzvf spark-3.1.2-bin-hadoop2.7.tgz
mv spark-3.1.2-bin-hadoop2.7 /software/spark
编辑 spark/conf/spark-env.sh 文件,由于 Spark HA 使用 Zookeeper 来协调主从,因此需要指定 Zookeeper 的地址和 Spark 在 Zookeeper 中使用的目录。
export JAVA_HOME=/software/jdk
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop1:2181,hadoop2:2181,hadoop3:2181 -Dspark.deploy.zookeeper.dir=/spark"
编辑 spark/conf/slaves 文件:
hadoop1
hadoop2
hadoop3
同步修改后的配置文件到集群的其他节点:
scp -r spark root@hadoop2:/software/
scp -r spark root@hadoop3:/software/
在 hadoop1 节点上启动 Spark 集群,执行 start-all.sh 脚本会在 hadoop1 节点上启动 Master 进程,并且在 spark/conf/slaves 文件中配置的所有节点上启动 Worker 进程。
[root@hadoop1 software]# spark/sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
hadoop2: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop2.out
hadoop1: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop1.out
hadoop3: starting org.apache.spark.deploy.worker.Worker, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-hadoop3.out
登录 hadoop2 节点,启动第二个 Master(Standby Master)。
[root@hadoop2 software]# spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop2.out
在各节点执行 jps 命令查看启动的 Java 进程。可以看到 Spark 的 Master 进程分别在 hadoop1 和 hadoop2 节点上运行,Worker 进程在所有节点上运行。QuorumPeerMain 是 Zookeeper 的进程。
hadoop1 节点:
[root@hadoop1 software]# jps
18528 Worker
18427 Master
23468 QuorumPeerMain
18940 Jps
hadoop2 节点:
[root@hadoop2 software]# jps
27824 Worker
29954 Jps
23751 QuorumPeerMain
28135 Master
hadoop3 节点:
[root@hadoop3 software]# jps
11696 Worker
12939 QuorumPeerMain
13021 Jps
可以看到此时 3 个 Spark 节点都注册到 Zookeeper 上了,并且此时 192.168.1.117 hadoop1 这个节点是 Master。
[zk: localhost:2181(CONNECTED) 33] ls /spark/master_status
[worker_worker-20210821150002-192.168.1.117-42360, worker_worker-20210821150002-192.168.1.118-39584, worker_worker-20210821150002-192.168.1.119-42991]
[zk: localhost:2181(CONNECTED) 34] get /spark/master_status
192.168.1.117
浏览器访问 http://hadoop1:8081 进入 Spark WebUI 界面,此时 hadoop1 节点 Master 的状态为 ALIVE。
浏览器访问 http://hadoop1:8082 ,可以看到 hadoop2 节点 Master 的状态为 STANDBY。
停止 hadoop1 节点上的 Master 进程。
[root@hadoop1 software]# spark/sbin/stop-master.sh
stopping org.apache.spark.deploy.master.Master
等待几秒以后 hadoop2 节点的状态从 STANDBY 切换为 ALIVE。
Spark HA 测试完成,重新启动 hadoop1 节点的 Master 进程。
[root@hadoop1 software]# spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /software/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop1.out
--master 参数的连接地址后可以指定多个 Master 的地址,当第一个 Master 无法连接时,会依次往后尝试连接其他的 Master。
[root@hadoop1 software]# spark/bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077
21/08/21 18:00:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop1:4040
Spark context available as 'sc' (master = spark://hadoop1:7077,hadoop2:7077, app id = app-20210821180100-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
Spark 可以使用 Yarn、Mesos、Kubernetes 作为底层资源调度系统,目前 Mesos 使用的已经比较少了,本文将介绍 Spark 使用 Yarn 和 Kubernetes 作为调度系统的应用。
Spark On Yarn 模式的搭建比较简单,仅需要在 Yarn 集群的一个节点上安装 Spark 客户端即可,该节点可以作为提交 Spark 应用程序到 Yarn 集群的客户端。Spark 本身的 Master 节点和 Worker 节点不需要启动。前提是我们需要准备好 Yarn 集群,关于 Yarn 集群的安装可以参考 Hadoop 分布式集群安装。
使用此模式需要修改 Spark 的配置文件 conf/spark-env.sh,添加 Hadoop 相关属性,指定 Hadoop 配置文件所在的目录:
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
修改完毕后,即可运行 Spark 应用程序,例如运行 Spark 自带的求圆周率的例子,并以 Spark On Yarn 的 Cluster 模式运行。
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/software/spark/examples/jars/spark-examples_2.12-3.1.2.jar
在 Yarn 的 ResourceManager 对应的 WebUI 界面中可以查看应用程序执行的详细信息。
在 Application 详情页中可以查看输出日志的详细信息。
可以看到最后计算 Pi 的输出结果。
目前基于 Kubernetes 的 Spark 的应用主要采用两种方式运行:
Spark Operator 定义了两个 CRD(Custom Resource Definitions,自定义资源定义)对象,SparkApplication 和 ScheduledSparkApplication。 这些 CRD 是 Spark 作业的抽象,使得在 Kubernetes 集群中可以使用 YAML 来定义这些作业。另外还提供了 [sparkctl] (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/sparkctl/README.md) 命令行工具方便我们操控 SparkApplication 和 ScheduledSparkApplication CRD 资源对象。
使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator 容器,用于将 SparkApplication 和 ScheduledSparkApplication 这些 CRD 资源对象转换为 Kubernetes 原生的资源对象,例如 Pod,Service 等等。
在 Spark On K8S 模式中,Spark 客户端需要与 Kubernetes API Server 直接通信来创建相关的 Kubernetes 资源。
Spark On K8S 和 Spark On K8S Operator 提交作业的方式如下图所示。
使用 Spark On K8S Operator 模式时,需要预先在 Kubernetes 集群中部署 Spark Operator。
添加 Spark On K8S Operator Helm 仓库并下载 Helm 资源文件。
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm pull spark-operator/spark-operator --untar
修改 values.yaml 文件中有以下两个地方需要修改:
helm 项目名-spark
。使用 helm install
命令安装 Spark Operator,spark-job 命名空间是之后提交 Spark 作业时使用的。
kubectl create namespace spark-job
helm install my-spark spark-operator \
--namespace spark-operator --create-namespace
确认 Spark Operator Pod 已经正常运行。
❯ kubectl get pod -n spark-operator
NAME READY STATUS RESTARTS AGE
my-spark-spark-operator-674cbc9d9c-8x22x 1/1 Running 0 5m24s
查看在 spark-job 命名空间创建的 ServiceAccount。
❯ kubectl get serviceaccounts -n spark-job
NAME SECRETS AGE
.....
my-spark-spark 1 2m33s
SparkApplications 资源对象中通常使用的 Cluster 模式来提交作业。在 YAML 文件中指定运行应由程序的 jar 包以及 main() 方法所在的类。
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-job
spec:
type: Scala
mode: cluster
image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: my-spark-spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
等待一会,查看 SparkApplications 状态,COMPLETED 表示已经执行完成该作业。
❯ kubectl get sparkapplications -n spark-job spark-piNAME STATUS ATTEMPTS START FINISH AGEspark-pi COMPLETED 1 2021-10-04T13:13:27Z 2021-10-04T13:13:48Z 8h
查看在 spark-job 命名空间创建的 Pod 的日志,可以看到本次作业执行的详情。
kubectl logs -n spark-job spark-pi-driver spark-kubernetes-driver
使用 Spark On K8S 模式提交作业时我们通常可以使用 spark-submit 或者 spark-shell 两种命令行工具,其中 spark-submit 支持 Cluster 和 Client 两种提交方式,而 spark-shell 只支持 Client 一种提交方式。
Cluster 模式
使用 spark-submit 的 Cluster 模式提交作业时,由于我们的 Kubernetes 集群的 API Server 是使用自签名的证书进行 HTTPS 加密的,因此需要使用 spark.kubernetes.authenticate.submission.caCertFile
参数指定 Kubernetes 集群的 CA 证书,让 Spark 客户端信任自签名证书。注意这里的 ServiceAccount 需要自行创建并且赋予以下权限,如果你是按照顺序完成实验的,那么在前面 Spark On K8S Operator 中已经创建了该 ServiceAccount,可以跳过这一步。
❯ kubectl get rolebindings -n spark-job spark -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
annotations:
meta.helm.sh/release-name: my-spark
meta.helm.sh/release-namespace: spark-operator
creationTimestamp: "2021-09-29T16:10:51Z"
labels:
app.kubernetes.io/instance: my-spark
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: spark-operator
app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
helm.sh/chart: spark-operator-1.1.6
name: spark
namespace: spark-job
resourceVersion: "204712527"
selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/rolebindings/spark
uid: 225970e8-472d-4ea5-acb5-08630852f76c
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: spark-role
subjects:
- kind: ServiceAccount
name: my-spark-spark
namespace: spark-job
❯ kubectl get role -n spark-job spark-role -o yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
annotations:
meta.helm.sh/release-name: my-spark
meta.helm.sh/release-namespace: spark-operator
creationTimestamp: "2021-09-29T16:10:51Z"
labels:
app.kubernetes.io/instance: my-spark
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: spark-operator
app.kubernetes.io/version: v1beta2-1.2.3-3.1.1
helm.sh/chart: spark-operator-1.1.6
name: spark-role
namespace: spark-job
resourceVersion: "204712525"
selfLink: /apis/rbac.authorization.k8s.io/v1/namespaces/spark-job/roles/spark-role
uid: 436afb3f-a304-4756-b64a-978d5836c3a2
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- '*'
- apiGroups:
- ""
resources:
- services
verbs:
- '*'
- apiGroups:
- ""
resources:
- configmaps
verbs:
- '*'
执行 spark-submit 命令向 Kubernetes 集群提交作业。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
关于证书不受信任这里也有个讨巧的方式,就是使用 kubectl proxy
命令将 API Server 的 HTTPS 转化为 HTTP。
❯ kubectl proxy
Starting to serve on 127.0.0.1:8001
然后通过 http://localhost:8001 和 API Server 进行交互,此时就无需指定 CA 证书了。
bin/spark-submit \
--master k8s://http://localhost:8001 \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
通过查看 Kubernetes 为本次 Spark 作业创建的 Pod 的日志,可以看到运行结果。
❯ kubectl logs -n spark-job spark-pi-submit-fc7b507c4be84351-driver
......
Pi is roughly 3.140075700378502
......
Client 模式
Client 模式无需指定 CA 证书,但是需要使用 spark.driver.host
和 spark.driver.port
指定提交作业的 Spark 客户端所在机器的地址,端口号默认就是 7078。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--deploy-mode client \
--name spark-pi-submit-client \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078 \
/home/chengzw/spark-3.1.2-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.1.2.jar
使用 Client 模式提交作业在终端就可以直接看到输出结果了。
spark-shell 只支持 Client 方式,使用以下命令连接 Kubernetes API Server 并打开 spark-shell 交互式界面。
bin/spark-shell \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode client \
--name spark-shell \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.driver.host=11.8.38.43 \
--conf spark.driver.port=7078
在 spark-shell 交互式界面执行一个简单的计算,取出 0~99 之间的值。
21/10/05 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://11.8.38.43:4040
Spark context available as 'sc' (master = k8s://https://11.16.0.153:6443, app id = spark-application-1633401878962).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val range = spark.range(100)
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> range.collect()
res1: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
在运行 Spark Application 的时候,Spark 会提供一个 WebUI 列出应用程序的运行时信息,但是一旦该应用程序执行完毕后,将无法查看应用程序执行的历史记录。Spark History Server 就是为了处理这种情况而诞生的,我们可以将 Spark 作业的日志提交到一个统一的地方,例如 HDFS,然后 Spark History Server 就可以通过读取 HDFS 目录中的文件来重新渲染生成 WebUI 界面来展示应用程序执行的历史信息。
使用以下资源文件部署一个 Spark History Server,并且通过 NodePort Service 的方式将服务暴露到集群外部,集群外部可以通过节点地址:NodePort 来访问 Spark History Server。前提是我们需要准备好 HDFS 集群,关于 HDFS 集群的安装可以参考 Hadoop 分布式集群安装。
apiVersion: apps/v1
kind: Deployment
metadata:
name: spark-history-server
namespace: spark-job
spec:
selector:
matchLabels:
run: spark-history-server
replicas: 1
template:
metadata:
labels:
run: spark-history-server
spec:
containers:
- image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
name: spark-history-server
args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"]
ports:
- containerPort: 18080
name: http
env:
- name: SPARK_HISTORY_OPTS
value: "-Dspark.history.fs.logDirectory=hdfs://11.8.36.125:8020/spark-k8s"
---
apiVersion: v1
kind: Service
metadata:
name: spark-history-server
namespace: spark-job
spec:
ports:
- name: http
nodePort: 30080
port: 18080
protocol: TCP
targetPort: 18080
selector:
run: spark-history-server
type: NodePort
设置 spark.eventLog.enabled
参数值为 true 启用记录 Spark 日志,spark.eventLog.dir
指定输出日志的目录为 HDFS 目录。
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-job
spec:
type: Scala
mode: cluster
image: "registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "hdfs://11.8.36.125:8020/spark-k8s"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: my-spark-spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
在集群外通过节点地址:30080 访问 Spark History Server,可以在应用程序执行完毕后看到详细的信息。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v3.1.1 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://11.8.36.125:8020/spark-k8s \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
上面的例子都是使用 Spark 官方自带的程序来提交作业,如果我们想要自定义一个程序可以使用 Spark 官网提供的脚本来构建镜像。
该项目使用 Maven 来管理依赖。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
程序代码如下,使用 Java 编写了一个 Word Count 程序。
package com.chengzw.wordcount;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* @description WordCount 示例
* @author chengzw
* @since 2021/7/25 8:39 下午
*/
public class MyJavaWordCount {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.OFF);
System.setProperty("spark.ui.showConsoleProgress","false");
//创建配置对象
//本地运行
//SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local");
//在Spark上运行
SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
//创建SparkContext对象
JavaSparkContext sc = new JavaSparkContext(conf);
//读取hdfs数据
//在本地运行
//JavaRDD<String> rdd1= sc.textFile("/tmp/data.txt");
//在Spark上运行
JavaRDD<String> rdd1= sc.textFile(args[0]);
//分词
JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String input) throws Exception {
return Arrays.asList(input.split(" ")).iterator();
}
});
//单词计数 word,1
JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//相同Key的值累加
JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
});
//触发计算
List<Tuple2<String, Integer>> result = rdd4.collect();
//打印
for(Tuple2<String,Integer> r : result){
System.out.println(r._1 + "\t" + r._2);
}
//释放资源
sc.stop();
}
}
点击 mvn package 将程序打成 jar 包。
将 jar 包放到 Spark 安装包的 examples/jars 目录中,进入 Spark 目录然后执行以下命令构建镜像。
bin/docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t my-spark:1.0.0 build
查看构建好的镜像。
❯ docker images | grep spark
registry.cn-hangzhou.aliyuncs.com/public-namespace/spark v1.0.0 372341ae930d 12 minutes ago 529MB
上传镜像。
./docker-image-tool.sh -r registry.cn-hangzhou.aliyuncs.com/public-namespace -t v1.0.0 push
使用自己构建的镜像执行 Word Count 程序。
bin/spark-submit \
--master k8s://https://11.16.0.153:6443 \
--conf spark.kubernetes.authenticate.submission.caCertFile=/Users/chengzhiwei/software/spark/spark-3.1.2-bin-hadoop3.2/certs/ca.crt \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=my-spark-spark \
--deploy-mode cluster \
--name spark-pi-submit \
--class com.chengzw.wordcount.MyJavaWordCount \
--conf spark.kubernetes.namespace=spark-job \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/public-namespace/spark:v1.0.0 \
local:///opt/spark/examples/jars/spark-lab-1.0-SNAPSHOT.jar /etc/security/limits.conf
查看执行结果:
kubectl logs -n spark-job spark-pi-submit-37945f7c4f24e729-driver
#返回结果
......
rss 2
space 2
priority 4
4 1
this 1
"soft" 1
max 14
cpu 1
memlock 1
apply 1
......