目标:掌握Sqoop常用命令的使用
路径
实施
语法
sqoop import | export \
--数据库连接参数
--HDFS或者Hive的连接参数
--配置参数
数据库参数
导入参数
导出参数
其他参数
连接Oracle语法
--connect jdbc:oracle:thin:@OracleServer:OraclePort:OracleSID
测试采集Oracle数据
进入
docker exec -it sqoop bash
测试
sqoop import \
--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
--username ciss \
--password 123456 \
--table CISS4.CISS_BASE_AREAS \
--target-dir /test/full_imp/ciss4.ciss_base_areas \
--fields-terminated-by "\t" \
-m 1
查看结果
小结
目标:实现YARN的资源调度配置
实施
常用端口记住:排错
YARN调度策略
YARN面试题
问题1:程序已提交YARN,但是无法运行,报错:Application is added to the scheduler and is not activated. User’s AM resource limit exceeded.
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
问题2:程序提交,运行失败,报错:无法申请Container
yarn.scheduler.minimum-allocation-mb=512
问题3:怎么提高YARN集群的并发度?
物理资源、YARN资源、Container资源、进程资源
YARN资源配置
yarn.nodemanager.resource.cpu-vcores=8
yarn.nodemanager.resource.memory-mb=8192
Container资源
yarn.scheduler.minimum-allocation-vcores=1
yarn.scheduler.maximum-allocation-vcores=32
yarn.scheduler.minimum-allocation-mb=1024
yarn.scheduler.maximum-allocation-mb=8192
MR Task资源
mapreduce.map.cpu.vcores=1
mapreduce.map.memory.mb=1024
mapreduce.reduce.cpu.vcores=1
mapreduce.reduce.memory.mb=1024
Spark Executor资源
--driver-memory #分配给Driver的内存,默认分配1GB
--driver-cores #分配给Driver运行的CPU核数,默认分配1核
--executor-memory #分配给每个Executor的内存数,默认为1G,所有集群模式都通用的选项
--executor-cores #分配给每个Executor的核心数,YARN集合和Standalone集群通用的选项
--total-executor-cores NUM #Standalone模式下用于指定所有Executor所用的总CPU核数
--num-executors NUM #YARN模式下用于指定Executor的个数,默认启动2个
实现:修改问题1中的配置属性
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jgRIa2kT-1673426702988)(Day2_数仓设计及数据采集.assets/image-20210822085238536.png)]
目标:了解MR的Uber模式的配置及应用
实施
Spark为什么要比MR要快
只有Map和Reduce阶段,每个阶段的结果都必须写入磁盘
MapReduce程序处理是进程级别:MapTask进程、ReduceTask进程
问题:MR程序运行在YARN上时,有一些轻量级的作业要频繁的申请资源再运行,性能比较差怎么办?
功能:Uber模式下,程序只申请一个AM Container:所有Map Task和Reduce Task,均在这个Container中顺序执行
配置:${HADOOP_HOME}/etc/hadoop/mapred-site.xml
mapreduce.job.ubertask.enable=true
#必须满足以下条件
mapreduce.job.ubertask.maxmaps=9
mapreduce.job.ubertask.maxreduces=1
mapreduce.job.ubertask.maxbytes=128M
yarn.app.mapreduce.am.resource.cpu-vcores=1
yarn.app.mapreduce.am.resource.mb=1536M
特点
小结
目标:掌握Sqoop采集数据时的问题
路径
实施
现象
step1:查看Oracle中CISS_SERVICE_WORKORDER表的数据条数
select count(1) as cnt from CISS_SERVICE_WORKORDER;
step2:采集CISS_SERVICE_WORKORDER的数据到HDFS上
sqoop import –connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin –username ciss –password 123456 –table CISS4.CISS_SERVICE_WORKORDER –delete-target-dir –target-dir /test/full_imp/ciss4.ciss_service_workorder –fields-terminated-by “\001” -m 1
- step3:Hive中建表查看数据条数
- 进入Hive容器
```
docker exec -it hive bash
```
- 连接HiveServer
```
beeline -u jdbc:hive2://hive.bigdata.cn:10000 -n root -p 123456
```
- 创建测试表
```sql
create external table test_text(
line string
)
location '/test/full_imp/ciss4.ciss_service_workorder';
```
- 统计行数
```
select count(*) from test_text;
```
问题:Sqoop采集完成后导致HDFS数据与Oracle数据量不符
原因
sqoop以文本格式导入数据时,默认的换行符是特殊字符
Oracle中的数据列中如果出现了\n、\r、\t等特殊字符,就会被划分为多行
Oracle数据
id name age
001 zhang\nsan 18
Sqoop遇到特殊字段就作为一行
001 zhang
san 18
Hive
id name age
001 zhang
san 18
解决
小结
目标:掌握使用Avro格式解决采集换行问题
路径
实施
常见格式介绍
类型 | 介绍 |
---|---|
TextFile | Hive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,I/O性能较低 |
SequenceFile | 含有键值对的二进制文件,优化磁盘利用率和I/O,并行操作数据,查询效率高,但存储空间消耗最大 |
AvroFile | 特殊的二进制文件,设计的主要目标是为了满足schema evolution,Schema和数据保存在一起 |
OrcFile | 列式存储,Schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快 |
ParquetFile | 列式存储,与Orc类似,压缩比不如Orc,但是查询性能接近,支持的工具更多,通用性更强 |
Avro格式特点
Sqoop使用Avro格式
选项
--as-avrodatafile Imports data to Avro datafiles
注意:如果使用了MR的Uber模式,必须在程序中加上以下参数避免类冲突问题
-Dmapreduce.job.user.classpath.first=true
使用测试
sqoop import \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
--username ciss \
--password 123456 \
--table CISS4.CISS_SERVICE_WORKORDER \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_service_workorder \
--as-avrodatafile \
--fields-terminated-by "\001" \
-m 1
Hive中建表
进入Hive容器
docker exec -it hive bash
连接HiveServer
beeline -u jdbc:hive2://hive.bigdata.cn:10000 -n root -p 123456
创建测试表
create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';
统计行数
select count(*) from test_avro;
小结
目标:回顾Sqoop增量采集方案
路径
实施
Append
要求:必须有一列自增的值,按照自增的int值进行判断
特点:只能导入增加的数据,无法导入更新的数据
场景:数据只会发生新增,不会发生更新的场景
代码
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 0 \
-m 1
Lastmodified
要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
特点:既导入新增的数据也导入更新的数据
场景:一般无法满足要求,所以不用
代码
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-06-06 16:09:32' \
-m 1
特殊方式
要求:每次运行的输出目录不能相同
特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集
场景:一般用于自定义增量采集每天的分区数据到Hive
代码
sqoop import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1
小结