Kudu 数据迁移

最近更新时间:2025-08-01 17:55:02

我的收藏
如果您需要将自建 Kudu 集群 的原始数据迁移至腾讯云 EMR,支持以下两种方式,第一种是通过 Kudu 的 table copy 命令迁移,第二种是通过 Kudu 的备份恢复工具进行数据迁移。其中 table copy 方式适用于数据量小一些的表(建议不超过500GB)超过500GB甚至TB级别的表建议使用 Kudu 备份恢复工具进行迁移。两种方法可以搭配使用,对于大表初始全量迁移使用 Kudu 备份恢复工具,增量迁移使用 kudu table copy 方式。由于 Kudu 通常会搭配 Impala 来使用,因此完整的 Kudu 数据迁移需要包含 Impala 表的迁移。

前置步骤

网络打通

将源集群 Kudu 数据迁移到 EMR 集群需要有专线打通两个集群的网络。

提交机准备

在迁移过程中需要准备一个提交机用来执行迁移的命令和脚本,可以是 EMR 集群的 router 点。下文中的命令和脚本无特殊说明均指在此提交机上运行。

Impala 表元数据迁移(可选)

首先下载 Impala 元数据迁移批量操作脚本:
#切换到hadoop用户
su - hadoop

wget https://cnb.cool/tencent/POC-MigrateX/scripts-copy-kudu-table/-/git/raw/main/scripts-copy-kudu-table.tar

#解压
tar xvf scripts-copy-kudu-table.tar

cd scripts-copy-kudu-table-ddl
1. 编辑 env.sh 设置源端和目标端的 impalad 地址。
2. source env.sh 初始化环境变量。
3. 将要迁移的表按照 每行一个 db.tablename 的格式写到txt文件中,例如 all-tables.txt。
4. nohup ./show-and-create-multi-tables.sh all-tables.txt & 后台执行。

方式1:Kudu table copy 方式

该方式使用 Kudu 的 table copy 命令,将 Kudu 数据从源表拷贝到目标表,并且支持添加过滤条件。示例命令如下:
kudu table copy {src_kudu_master} default.usertable {dst_kudu_master} -num_threads=8 -write_type=upsert -predicates='["AND",["=","key","value"]]'
局限性:
1. copy table 仅支持处理新增和更新,delete 场景需要在目标端也同步删除。
2. copy table 命令中的过滤条件只支持且的逻辑即 AND,如果需要按多个条件或来过滤需要将每个条件单独写一条 copy table 命令来执行。

方式2:Kudu 备份恢复工具迁移

从 Kudu 1.10.0 开始,Kudu 可以通过 Spark 备份工具来做全量和增量表备份。此外,它还支持通过 Spark 恢复工具从备份中恢复表。在迁移过程中专线带宽通常会是瓶颈,因此这里我们采用在源集群备份,然后将备份文件拷贝到目标集群的方式来降低对专线带宽的占用。如带宽资源充足,也可直接将文件备份到目标集群的 HDFS 中。

步骤1:下载 Kudu 备份恢复工具

根据 Kudu 版本和 Spark 版本在 mvn 仓库下载适合的工具包。
下载地址:

步骤2:备份表数据到源集群 HDFS

新建 kudu-backup.sh 文件将如下脚本内容粘贴到文件中。
#!/bin/bash

input_file=$1 #脚本的入参是一个文件路径,文件中每一行是一个待备份的Kudu表的完整名称。

executors_num=10

tbls=`cat $input_file | tr '\\n' ' '`

CMD="spark-submit --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 4G --executor-cores 1 --num-executors $executors_num --class org.apache.kudu.backup.KuduBackup kudu-backup2_2.11-1.13.0.jar --kuduMasterAddresses <kudu-master-ips> --rootPath hdfs://<nameservice of source cluster>/kudu-backups $tbls"

echo $CMD

bash -c "$CMD"
批量执行备份的操作如下,由于 Kudu 备份我们使用的是源集群的资源,这里使用的是默认的1个并发来执行,即同一时间只有一张表在备份。
su - hadoop
vi kudu-backup.sh #将前面的脚本贴进来
chmod +x kudu-backup.sh
./kudu-backup.sh all-tables.txt

步骤3:将备份目录从源集群拷贝到目标集群

在腾讯云 EMR 集群执行 hadoop distcp 命令来跨集群拷贝文件。
脚本 dist-copy-single.sh:
#!/bin/bash

DIR_TO_COPY=$1

bandwidthMBPerMap=15
mapNumber=20
SRC_NAMESERVICE="hdfs://<nameservice-of-source-cluster>"
DST_NAMESERVICE="hdfs://<nameservice-of-target-cluster>"


FROM="${SRC_NAMESERVICE}/kudu-backups/${DIR_TO_COPY}"
TO="${DST_NAMESERVICE}/kudu-backups/${DIR_TO_COPY}"

hadoop distcp -Ddistcp.dynamic.recordsPerChunk=50 -Ddistcp.dynamic.max.chunks.tolerable=10000 -Dmapreduce.map.memory.mb=4096 -Dmapreduce.map.java.opts="-Xms3500m -Xmx3500m" -prbugpt -i -strategy dynamic -numListstatusThreads 100 -skipcrccheck -update -delete -m $mapNumber -bandwidth $bandwidthMBPerMap $FROM $TO
dist-copy.sh:
#!/bin/bash

total=`wc -l $1`
idx=0
while read line
do
DIR=$line
((idx++))
echo "processing $DIR (${idx}/${total})"
./dist-copy-single.sh $DIR
if [ $? -eq 0 ];then
echo "$DIR was successfully copied."
else
echo "ERROR:failed to copy $DIR."
fi
done < $1
新建上述2个脚本,并添加执行权限,将待迁移的目录放到 table-backups.txt 文件中,执行如下命令批量拷贝。
nohup ./dist-copy.sh table-backups.txt > dist-copy.log &

步骤4:在目标集群从备份文件中恢复 Kudu 表的数据

新建 kudu-restore.sh 文件将如下脚本内容粘贴到文件中。
#!/bin/bash

input_file=$1

executors_num=20

tbls=`cat $input_file | tr '\\n' ' '`

CMD="spark-submit --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 4G --executor-cores 1 --num-executors $executors_num --conf spark.task.maxFailures=10 --conf spark.yarn.maxAppAttempts=0 --class org.apache.kudu.backup.KuduRestore kudu-backup2_2.11-1.13.0.jar --kuduMasterAddresses <kudu-master-ips-of-target> --createTables false --rootPath hdfs://<nameservice-of-target-cluster>//kudu-backups $tbls"

echo $CMD

$CMD

批量执行恢复的操作如下:
su - hadoop
vi kudu-restore.sh #将前面的脚本贴进来
chmod +x kudu-restore.sh
./kudu-restore.sh all-tables.txt
大表 restore 前先跑一次副本均衡,遇到分区数据倾斜,可以对目标端尝试增加 hash 分区数量,任务进行中也能进行 rebalance。
kudu cluster rebalance <kudu-master-ips> --enable_range_rebalancing -tables=table1,table2

增量数据迁移

增量迁移使用 kudu table copy 命令,详情请参见方式一,通过 -predicates 参数来减少对源表的扫描和对目标表的写入。因此需要维护增量字段。例如我们按照创建时间和更新时间做增量迁移可以参考下面的命令。
# 由于predicates参数不支持OR,因此多个条件或的场景使用多条命令来实现
kudu table copy ${src_kudu_master} $table_name ${dst_kudu_master} -num_threads=$num_threads -write_type=upsert -predicates='[">=","create_time","2024-07-01 00:00:00"]'
kudu table copy ${src_kudu_master} $table_name ${dst_kudu_master} -num_threads=$num_threads -write_type=upsert -predicates='[">=","update_time","2024-07-01 00:00:00"]'

需要注意 kudu table copy 不会同步 schema 变更,在迁移期间尽可能不要去添加列和删除列等元数据变更操作,会导致 kudu table copy 失败,需要手动在目标端做相应的元数据变更。如果源端有删除的动作,也需要额外在目标端删除一次。

数据验证

如果执行过程没有报错,代表迁移动作执行成功,可以对表和分区粒度使用 count 计数来进行简单的迁移结果验证。

常见问题

1. 创建目标端表时报错 NonRecoverableException: the requested number of tablet replicas is over the maximum permitted at creation time (720), additional tablets may be added by adding range partitions to the table post-creation

原因:建表时指定的分区太多,这里主要是针对 Range 分区,可以先把表建出来,然后再通过alter table add partition来把需要的分区添加上去。

2. 在执行 Spark 备份任务时报错 snapshot scan start timestamp is earlier than the ancient history mark. Consider increasing the value of the configuration parameter --tablet_history_max_age_sec.

原因:2次备份的间隔时间太久,增量备份的起始时间小于最早历史快照的时间(7天前)解决办法:删除备份目录重新跑一次全量备份,后续保持至少2~3天备份一次。

3. 在执行 Spark 恢复任务和 Kudu table copy 任务时,Kudu 的写入频率慢,同时 TServer 的 CPU 和磁盘负载不高。

调整参数 --maintenance_manager_num_threads(建议值为物理磁盘数量,如 4)以优化 flush 性能。