如果您需要将自有 HDFS 的原始数据迁移至腾讯云 EMR,可以通过两种方式进行数据迁移,第一种是通过腾讯云对象存储(COS)进行数据中转迁移,第二种是通过 Hadoop 自带文件迁移工具 DistCp 进行数据迁移。本文主要介绍通过 DistCp 进行数据迁移。
DistCp 是 Hadoop 自带的文件迁移工具。它使用 MapReduce 来实现其分发、错误处理和恢复、报告的功能。它将文件和目录的列表扩展为映射任务的输入,每个任务将复制源列表中指定的文件的分区。使用 DistCp 需要确保自建集群和 EMR 集群的网络互通。
使用 DistCP 数据迁移步骤如下:
步骤1:网络打通
本地自建 HDFS 文件迁移到 EMR
本地自建 HDFS 文件迁移到 EMR 集群需要有专线或者 VPN 打通网络,可以参考以下文档:
CVM 上的自建 HDFS 文件迁移到 EMR
如果 CVM 的所属网络和 EMR 集群的所属网络在同一 VPC 下,则可以直接传输文件。
如果 CVM 的所属网络和 EMR 集群的所属网络在不同 VPC 下,则需要使用对等连接将网络打通。
使用对等连接
网段1示例:广州的 VPC1 中的子网 A 192.168.1.0/24。
网段2示例:北京的 VPC2 中的子网 B 10.0.1.0/24。
1. 登录 私有网络控制台 > 对等连接,在列表上方选择地域广州,选择私有网络 VPC1,然后单击+新建。


2. 进入建立对等连接页。
名称:对等连接的名称,例如 PeerConn。
本端地域:填写本地端地域,例如广州。
本端网络:填写本端网络,例如 VPC1。
对端账户类型:填写对端网络所属账户,如果广州和北京两个网络在同一账户下,选择我的账户,如果不在同一账户,则要选择其它账户。
说明:
如果本端网络和对端网络都在同一地域,例如广州,通信是免费的,也不需要选择带宽上限;如果不在同一地域,就要进行收费,同时带宽上限可选。
对端地域:填写对端地域,例如北京。
对端网络:填写对端网络,例如 VPC2。

3. 同账户内私有网络进行连接,新建后对等连接立即生效;与其它账户私有网络创建对等连接,需要对端接受此对等连接后生效。参见 同账号创建对等连接通信 和 跨账号创建对等连接通信。
4. 为对等连接配置本端和对端路由表。
登录 私有网络控制台 ,单击左侧目录中的子网,进入管理页面。单击对等连接本端指定子网(例如广州的子网 VPC1)的关联路由表 ID,进入路由表详情页。

单击新增路由策略。

目的端中填入对端 CIDR(例如北京的 VPC2 的 CIDR 是10.0.1.0/24),下一跳类型选择对等连接,下一跳选择已建立的对等连接(PeerConn)。

以上步骤是配置广州 VPC1 到北京 VPC2 的路由表,还需要配置北京 VPC2 到广州 VPC1 的配置,配置过程同上。
路由表配置完成后,不同私有网络的网段之间即可进行通信。
步骤2:环境准备
1. hostname 配置。
如果您源集群节点配置了 hostname,在进行数据迁移时,会使用 hostname 进行通信,所以 emr 集群节点需要能识别这些 hostname 与 ip 的映射关系,可以通过配置 DNS 或者在 EMR 机器节点上 hosts 文件里面添加映射。
2. 源集群 HA 配置。
如果在迁移数据时,使用源集群的 nameservice 来进行 hdfs 访问,则需在 EMR 的 hdfs-site.xml 中新增源集群 nameservice 的 HA 配置。
<property><name>dfs.nameservices</name><value>nameservice1,nameservice2</value></property><property><name>dfs.ha.namenodes.nameservice2</name><value>namenode1,namenode2</value></property><property><name>dfs.namenode.rpc-address.nameservice2.namenode1</name><value>xxx-hadoop-test02.xxx.com:8020</value></property><property><name>dfs.namenode.rpc-address.nameservice2.namenode2</name><value>xxx-hadoop-test03.xxx.com:8020</value></property><property><name>dfs.client.failover.proxy.provider.nameservice2</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
说明:
1. 源集群 nameservice 值可在 hdfs-site.xml 通过 dfs.nameservices 查找。
2. namenode1、namenode2代表源集群 namenode 节点的 ip/hostname 信息。
3. 源集群 namenode 通信地址,可通过 dfs.namenode.rpc-address.xxx 参数查找。
4. 上述源集群 HA 信息,建议只在客户端节点配置,目标集群的服务节点请勿配置。
步骤3:执行拷贝
1. 初始数据全量迁移。
# 控制台输出info日志export HADOOP_CLIENT_OPTS="-Dhadoop.root.logger=INFO,console $HADOOP_CLIENT_OPTS"# HA distcp指令hadoop distcp -prbugp -i -strategy dynamic -numListstatusThreads 100 -skipcrccheck -update -m 20 -bandwidth 100 hdfs://${source_nameservice}/apps/hive/warehouse/xxx.db/xxx_info_h /apps/hive/warehouse/xxx.db/xxxx_info_h# 非HA distcp指令hadoop distcp -prbugp -i -strategy dynamic -numListstatusThreads 100 -skipcrccheck -update -m 20 -bandwidth 100 hdfs://nn1:8020/apps/hive/warehouse/xxx.db/xxx_info_h hdfs://nn2:4007/apps/hive/warehouse/xxx.db/xxxx_info_h
2. 增量迁移。
使用-update 参数,仅同步源端和目标端文件大小或校验不一致的文件,以达到增量迁移效果。
# 控制台输出info日志export HADOOP_CLIENT_OPTS="-Dhadoop.root.logger=INFO,console $HADOOP_CLIENT_OPTS"# HA distcp指令hadoop distcp -prbugp -i -strategy dynamic -numListstatusThreads 100 -skipcrccheck -update -m 20 -bandwidth 100 hdfs://${source_nameservice}/apps/hive/warehouse/xxx.db/xxx_info_h /apps/hive/warehouse/xxx.db/xxxx_info_h# 非HA distcp指令hadoop distcp -prbugp -i -strategy dynamic -numListstatusThreads 100 -skipcrccheck -update -m 20 -bandwidth 100 hdfs://nn1:8020/apps/hive/warehouse/xxx.db/xxx_info_h hdfs://nn2:4007/apps/hive/warehouse/xxx.db/xxxx_info_h
3. 数据校验。
# 使用count 命令查询目录文件个数和大小hadoop fs -count /apps/hive/warehouse/xxx.db/xxxx_info_h# 使用hive库表进行数据count /明细查询比对select count(1) from ${db}.${table} where pt =${pt}
注意:
如果另一个客户端仍然在写入源文件,则该拷贝可能会失败;如果一个文件正在被拷贝到目的端,试图重写该文件的操作可能会失败;如果源文件在被复制之前被移动,那么拷贝可能将失败,报错信息为 FileNotFoundException。
在进行数据增量迁移时,需要使用 update 参数,但是在指定 update 参数时,只会更新源位置和目标位置的文件的大小,校验和不同的文件,但是针对源位置已删除的数据,在目标端无法同步删除,这时可以结合 delete 删除这类数据,但是在使用delete参数时,需格外注意源目录和目标目录的位置,以防因为目录错误导致数据误删除。
以上示例中的参数,可以根据实际情况来进行调整。
命令参数说明:
-p[rbugpcaxtq] 当同时使用-update选项时,即使被拷贝文件的内容没有被更新,它的状态信息也会被更新r:副本数,b:块大小,u:所属用户,g:所属用户组,p:许可,c:校验和类型,a:访问控制,t:时间戳,q:Quota信息-i 拷贝过程中忽略失败-log <logdir> 指定日志路径-v 指定日志中的额外信息-m <num_maps> 最大的同时运行的执行拷贝的任务数-numListstatusThreads 构建被拷贝文件的文件列表时所用的线程数,该选项会提高distcp的运行速度-overwrite 覆盖目标位置的文件-update 如果源位置和目标位置的文件的大小,校验和不同,则更新目标位置的文件-append 当同时使用-update选项时,追加源位置的文件内容到目标位置的文件-f <urilist_uri> 将<urilist_uri>文件的内容作为需要拷贝的文件列表-filters 指定一个本地文件,其文件内容是多条正则表达式。当被拷贝的文件与某条正则表达式匹配时,则该文件不会被拷贝-async 异步运行distcp命令-atomic {-tmp <tmp_dir>} 指定一次原子性的拷贝,可以添加一个临时目录的选项,作为拷贝过程中的暂存目录-bandwidth 指定每个拷贝任务的传输带宽,单位MB/s-delete 删除掉目标位置中存在,但源位置不存在的文件。该选项通常会和-update配合使用,表示将源位置和目标位置的文件同步,删除掉目标位置多余的文件(使用 -delete 需谨慎,可能导致数据误删,建议先测试并备份)-diff <oldSnapshot> <newSnapshot> 将新旧版本之间的差异内容,拷贝到目标位置的旧版本文件中-skipcrccheck 是否跳过源文件和目标文件之间的CRC校验-strategy {dynamic|uniformsize} 指定拷贝任务的拷贝策略,默认策略是uniformsize,即每个拷贝任务复制相同的字节数
参数样例
hadoop distcp OPTIONS [source_path...] <target_path>示例1(原集群/source目录迁移到目标集群target目录):hadoop distcp hdfs://sourceip:/source hdfs://targetip:ipc_port:/target示例2(原集群/source1和/source2目录迁移到目标集群target目录):hadoop distcp hdfs://sourceip:/source1 hdfs://sourceip:/source2 hdfs://targetip:ipc_port:/target示例3(使用-f指定源文件/文件夹路径):hadoop distcp -f /home/source hdfs://targetip:ipc_port:/target[hadoop@9 /root]$ cat /home/hadoop/sourcehdfs://source_ip:ipc_port/source1hdfs://source_ip:ipc_port/source2/hadoop.loghdfs://source_ip:ipc_port/source2/a示例4(增量拷贝)hadoop distcp -update hdfs://sourceip:/source hdfs://targetip:ipc_port:/target含义是如果假如原集群/source/a文件在目标集群已经存在(/target/source/a),则-update参数会忽略该文件,只拷贝目标集群没有的文件示例5(忽略错误)hadoop distcp -i hdfs://sourceip:/source hdfs://targetip:ipc_port:/target如果发生读写异常,导致某个文件拷贝失败,该失败不会导致DistCp任务终止示例6(指定/限制拷贝速率)hadoop distcp -m 100 -bandwidth 200 hdfs://sourceip:/source hdfs://targetip:ipc_port:/target其中-m(默认20个)指定作业的map数量,-bandwidth(默认100MB)指定了每个map所能使用的最大的带宽,需要根据实际需要合理的调整带宽已达到限速/加速目的示例7(附带拷贝源文件的其他文件属性)hadoop distcp -prbugpt hdfs://sourceip:/source hdfs://targetip:ipc_port:/target-p指定了需要保留的源文件的属性,参数说明如下:r:源文件副本数b:源文件块大小u:源文件用户g:源文件用户组p:源文件权限t:源文件创建时间示例8(过滤部分文件,该功能在284以上版本才支持,273版本没有该功能)hadoop distcp -filter /home/filter.txt hdfs://sourceip:/source hdfs://targetip:ipc_port:/target对源文件中匹配到filter指定正则的文件进行过滤,上述命令表示对以log结尾的文件不进行拷贝[hadoop@9 /root]$ cat /home/filter.txtlog$
数据迁移报错排查方法
数据迁移任务本质上是使用 mapreduce 任务进行数据迁移的,所以可以通过 EMR 的 YARN UI 查看任务的状态以及日志。
1. 在 EMR 控制台 的集群服务界面,单击 YARN UI 链接。

2. 登录 YARN UI 后,能看到如下界面,包含多个 distcp 任务。

3. 通过单击对应的 applicationId,可以查看详情。

4. 点击上图的 tracking url,可以进一步查看 logs,包含成功或失败的日志。


Distcp 常见错误
1.拷贝出现Mismatch in length....原因分析:源文件在open状态,并且持续写入,distcp在写入完成后会校验源和目的文件长度是否一致,如果不一致就会出现该错误解决办法:手动关闭正在写入的文件,并重新进行拷贝,关闭文件租约命令:hdfs debug recoverLease -path /path2.oom问题当拷贝数据量较大时,可能会出现内存溢出,可以修改mapred-site.xml中的mapreduce.map.memory.mb调大(默认1024,推荐4096),或者在使用如下方式指定map内存大小:hadoop distcp -Dmapreduce.map.memory.mb=4096 /source hdfs://ip:port/target3.出现无法解析的域名错误一般出现在非EMR集群与EMR集群之间的拷贝,在读取非emr集群的数据时,NN向客户端返回了DN的机器名,机器名在EMR集群上并不能被识别,解决方案:在EMR集群core节点上的/etc/hosts中添加自建集群的所有DN节点的ip hostname映射关系4.大目录问题概率出现错误:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for emr/hadoop-yarn/staging/hanoi/.staging/_distcp2128238759/intermediate.1目前解决方式,递归拆分大目录进行拷贝5.小文件拷贝出现大量小文件,拷贝效率低a.调大map数,通过-m指定更大的map数,加速处理b.增加参数-numListstatusThreads 30,多线程统计文件信息,加速AM切分任务的速度c.建议采用-strategy dynamic模式进行文件切分6.-m 指定400报错问题Too many chunks created with splitRatio:2, numMaps:400. Reduce numMaps or decrease split-ratio to proceed.解决方案:mapred-site.xml中添加,参数值可以根据实际情况来调整:distcp.dynamic.max.chunks.tolerable 800