有奖捉虫:办公协同&微信生态&物联网文档专题 HOT
Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 Doris 大数据量的导入性能并且节省 Doris 集群的计算资源。主要用于初次迁移,大数据量导入 Doris 的场景。
Spark Load 是利用了 Spark 集群的资源对要导入的数据进行了排序,Doris BE 直接写文件,这样能大大降低 Doris 集群的资源使用,对于历史海量数据迁移降低 Doris 集群资源使用及负载有很好的效果。
如果用户在没有 Spark 集群这种资源的情况下,又想方便、快速的完成外部存储历史数据的迁移,可以使用 Broker Load 。相对 Spark Load 导入,Broker Load 对 Doris 集群的资源占用会更高。
Spark Load 是一种异步导入方式,用户需要通过 MySQL 协议创建 Spark 类型导入任务,并通过 SHOW LOAD 查看导入结果。

适用场景

源数据在 Spark 可以访问的存储系统中,如 HDFS。
数据量在 几十 GB 到 TB 级别。

基本原理

用户通过 MySQL 客户端提交 Spark 类型导入任务,FE 记录元数据并返回用户提交成功。
Spark Load 任务的执行主要分为以下 5 个阶段。
1. FE 调度提交 ETL 任务到 Spark 集群执行。
2. Spark 集群执行 ETL 完成对导入数据的预处理,包括全局字典构建( Bitmap 类型)、分区、排序、聚合等。
3. ETL 任务完成后,FE 获取预处理过的每个分片的数据路径,并调度相关的 BE 执行 Push 任务。
4. BE 通过 Broker 读取数据,转化为 Doris 底层存储格式。
5. FE 调度生效版本,完成导入任务。
+
| 0. User create spark load job
+----v----+
| FE |---------------------------------+
+----+----+ |
| 3. FE send push tasks |
| 5. FE publish version |
+------------+------------+ |
| | | |
+---v---+ +---v---+ +---v---+ |
| BE | | BE | | BE | |1. FE submit Spark ETL job
+---^---+ +---^---+ +---^---+ |
|4. BE push with broker | |
+---+---+ +---+---+ +---+---+ |
|Broker | |Broker | |Broker | |
+---^---+ +---^---+ +---^---+ |
| | | |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
| HDFS +-------> Spark cluster |
| <-------+ |
+---------------------------------+ +-----------------------------+

配置环境

使用前需要配置 FE Follower 节点的环境。只需在 Follwer 节点配置,Observer 节点不需要。
说明:
配置环境部分操作需要腾讯云技术人员支持,请在使用时 提交工单 与我们联系。

打通 Doris 到 EMR 集群的网络

最好保证同 VPC 同子网同安全组,这样网络联通问题最少。如果不满足,需要联系腾讯云网络运维共同解决。最终需通过 ping / telnet 确认连通性。

配置 Hadoop 客户端

FE 底层通过执行 Yarn 命令去获取正在运行的 Application 的状态以及杀死 Application,因此需要为 FE 配置 Yarn 客户端。建议使用 2.5.2 或以上的版本。
需要客户提供要连接的 Yarn 集群的 Hadoop 客户端,如此可保证和 EMR 集群版本一致,避免出现兼容性问题,且其中包含必需的 hdfs-site.xml,yarn-site.xm,core-site.xml 等配置文件。
获取到 Hadoop 客户端后,让其放置到 /usr/local/service下,改名为 Hadoop 或 建一个指向此目录的名为 Hadoop 的软链。
修改路径/usr/local/service/hadoop/etc/hadoop的权限,保证 Doris FE 启动用户有写权限: chown -R doris:doris /usr/local/service/hadoop/etc/hadoop
说明:
如果要使用的 Spark 是在腾讯云 EMR,请 提交工单 联系 EMR 运维,将 Follower FE 的 IP 加入体外客户端白名单。
使用 Doris FE 启动用户(1.2版本之前为 root,1.2及之后为 doris)执行 hdfs dfs -ls / 命令,验证 Hadoop 客户端是否安装成功。

配置 Spark 客户端

FE 底层通过执行 spark-submit 的命令去提交 Spark 任务,因此需要为 FE 配置 Spark 客户端,建议使用 2.4.5 或以上的 Spark2 官方版本。
最好让客户提供要使用的yarn集群的 spark 客户端,这样版本一致,且其中包含必需的 spark-defaults.conf, hive-site.xml 等配置文件。
获取到 Spark 客户端后,让其放置到/usr/local/service下,改名为 spark 或 建一个名为 spark 的软链指向其。
然后执行以下命令:
cd /usr/local/service/spark/jars/
zip spark_jars.zip *.jar
修改 spark 的日志级别(log4j.rootLogger),改为 INFO。
vim /usr/local/service/spark/conf/log4j.properties
使用 doris FE 启动用户(1.2版本之前为 root,1.2及之后为 doris)执行以下命令,验证 Hadoop 及 Spark 客户端是否安装成功。
spark-submit --queue default --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /usr/local/service/spark/examples/jars/spark-examples_*.jar 10

配置 Doris

配置 spark load 任务的目录,每个任务生成一个日志文件,Doris 通过监控日志文件获取任务 ID 和状态。
mkdir -p /usr/local/service/doris/log/spark_launcher_log;
chmod 777 /usr/local/service/doris/log/spark_launcher_log
通过管控,在 FE.conf 中添加配置:
spark_home_default_dir=/usr/local/service/spark
spark_resource_path=/usr/local/service/spark/jars/spark_jars.zip
yarn_client_path=/usr/local/service/hadoop/bin/yarn
yarn_config_dir=/usr/local/service/hadoop/etc/hadoop
spark_dpp_version=1.2-SNAPSHOT
配置完毕后重启 Master FE。
说明:
添加配置中,若需要腾讯云技术人员支持,请在使用时 提交工单 与我们联系。

使用 Spark load

创建 Spark Resource

一个典型的创建语句模板如下:
CREATE EXTERNAL RESOURCE spark_resource_xxx
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.yarn.queue" = "<xxx_queue>",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.address.rm1" = "<rm1_host>:<rm1_port>",
"spark.hadoop.yarn.resourcemanager.address.rm2" = "<rm2_host>:<rm2_port>",
"spark.hadoop.fs.defaultFS" = "hdfs://<hdfs_defaultFS>",
"spark.hadoop.dfs.nameservices" = "<hdfs_defaultFS>",
"spark.hadoop.dfs.ha.namenodes.<hdfs_defaultFS>" = "nn1,nn2",
"spark.hadoop.dfs.namenode.rpc-address.<hdfs_defaultFS>.nn1" = "<nn1_host>:<nn1_port>",
"spark.hadoop.dfs.namenode.rpc-address.<hdfs_defaultFS>.nn2" = "<nn2_host>:<nn2_port>",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://<hdfs_defaultFS>/doris/spark_load",
"broker" = "<doris_broker_name>",
"broker.username" = "hadoop",
"broker.password" = "",
"broker.dfs.nameservices" = "<hdfs_defaultFS>",
"broker.dfs.ha.namenodes.<hdfs_defaultFS>" = "nn1, nn2",
"broker.dfs.namenode.rpc-address.HDFS4001273.nn1" = "<nn1_host>:<nn1_port>",
"broker.dfs.namenode.rpc-address.HDFS4001273.nn2" = "<nn2_host>:<nn2_port>",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
各参数具体含义请参见 Spark Load > 创建 resource 参数
上述配置适用于绝大多数 EMR 集群,即开启了 RM HA 和 HDFS HA 的集群。Doris 也支持使用非 HA 集群、或开启了 Kerberos 认证的集群,具体的配置方法参见 Spark Load > 支持 Kerberos 认证
说明:
腾讯云 EMR 的常见端口:<rm1_port>: 5000、<nn1_port>: 4007。

授权对 Spark Resource 的使用

普通账户无法创建 resource,只能看到自己有 USAGE_PRIV 使用权限的资源。因此如果有普通账户需要使用某资源,需要给他授权。授予权限后也可撤销。具体命令如下:
-- 授予spark0资源的使用权限给用户user0
GRANT USAGE_PRIV ON RESOURCE "spark_resource_xxx" TO "user0"@"%";

-- 授予spark0资源的使用权限给角色role0
GRANT USAGE_PRIV ON RESOURCE "spark_resource_xxx" TO ROLE "role0";

-- 授予所有资源的使用权限给用户user0
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";

-- 授予所有资源的使用权限给角色role0
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";

-- 撤销用户user0的spark0资源使用权限
REVOKE USAGE_PRIV ON RESOURCE "spark_resource_xxx" FROM "user0"@"%";

执行 Spark load 任务

LOAD LABEL test_label_01 --label
(
DATA INFILE ("hdfs://HDFS4001234/warehouse/ods.db/user_events/ds=2023-04-15/*" --file path)
INTO TABLE user_events --doris table
FORMAT AS "parquet" --data format
( event_time, user_id, op_code) --columns in file
COLUMNS FROM PATH AS ( `ds` ) --partition column
SET
( --column mapping
ds = ds,
event_time = event_time,
user_id = user_id,
op_code = op_code
)
)
WITH RESOURCE 'spark_resource_xxx'
( --spark job params
"spark.executor.memory" = "4g",
"spark.default.parallelism" = "400",
"spark.executor.cores" = '5',
"spark.executor.instances" = '10'
)
PROPERTIES
( --doris load task params
"timeout" = "259200"
);
需要根据实际情况修改 label,file path,doris table,分区列、列映射关系、spark 任务参数等的值。一个 label 如果任务成功则不可重复使用。
更详细配置参见:Broker Load 手册

管理任务

需要先进入到导入目标表所在的库:use xxx_db;

查看任务

show load where label='test_label_01';
重点关注结果中的 STATE 和 PROGRESS 列。作业的状态转换路径为:
- State
导入任务当前所处的阶段。任务提交之后状态为 PENDING,提交 Spark ETL 之后状态变为 ETL,ETL 完成之后 FE 调度 BE 执行 push 操作状态变为 LOADING,push 完成并且版本生效后状态变为 FINISHED。
导入任务的最终阶段有两个:CANCELLED 和 FINISHED,当 Load Job 处于这两个阶段时导入完成。其中 CANCELLED 为导入失败,FINISHED 为导入成功。
- Progress
导入任务的进度描述。分为两种进度:ETL 和 LOAD,对应了导入流程的两个阶段 ETL 和 LOADING。
LOAD 的进度范围为:0~100%。
`LOAD 进度 = 当前已完成所有 Replica 导入的 Tablet 个数 / 本次导入任务的总 Tablet 个数 * 100%`
**如果所有导入表均完成导入,此时 LOAD 的进度为 99%** 导入进入到最后生效阶段,整个导入完成后,LOAD 的进度才会改为 100%。
导入进度并不是线性的。所以如果一段时间内进度没有变化,并不代表导入没有在执行。

取消任务

如果任务未结束,可执行命令将其结束:cancel load where label='test_label_01';

问题排查

一个 spark load 任务失败可能是 doris 的问题,也可能是外部使用的 yarn 集群的问题。因此两边都需要排查。
先从 Doris 侧排查:
1. 使用show load 命令根据 label 查看load 任务信息。
2. 每个任务会在目录/usr/local/service/doris/log/spark_launcher_log下生成一个日志文件,默认保留3天。进入此目录后执行 ls *<load_task_label>* 进行模糊搜索。
3. 如果找不到上述任务日志文件或文件为空,需要查看 doris fe 日志 fe.log / fe.warn.log: cd /data/cdw/doris/fe/log/
4. 如果日志显示 Spark Job 执行失败,则需让用户查看 Spark Job 日志确认具体报错。