前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hadoop生态之sqoop

hadoop生态之sqoop

作者头像
SRE运维实践
发布2021-03-04 10:26:21
6290
发布2021-03-04 10:26:21
举报
文章被收录于专栏:SRE运维实践SRE运维实践

序言

在使用大数据的时候,各种不同的数据都要将数据采集同步到数据仓库中,一个是属于业务系统的RDBMS系统,也就是各种关系型数据库,一个是hadoop生态的存储,中间用于传输的数据的工具可以使用sqoop,也就是sql to hadoop。

在数据进入数仓的ODS层的时候,使用sqoop,在进入hadoop之后,就可以使用其他的计算框架进行分析,例如hive,MR,spark等。

sqoop

1 sqoop所处的位置

sqoop是一个用于数据传输的工具,是连接RDBMS和数仓ODS的桥梁:

sqoop是将结构化数据同步到hdfs中,也可以是hive和hbase等,支持不同的数据库,只要将相关的连接数据库驱动放到安装sqoop的lib库中即可,从而能连接,进行数据的导入导出操作。

在进行使用sqoop的时候,考虑到任务数量的众多,需要从不同的业务系统中同步数据,而业务系统使用的数据库又是多种多样的,从数仓的建立来说,需要确定相关的指标,从而需要首先规划好哪些数据库,哪些数据需要同步,任务数量多,从而需要考虑任务的优先级管理,任务的优先级决定了指标的产出时间,可以划分核心指标,从而得到核心任务,也就是需要优先产出的任务,从而定义好相应的任务。

血缘关系,也就是sqoop数据导入任务才是第一步,后面还有数仓中的各种数据清洗,数据统计分析的任务,从而需要划分好任务依赖。

为了方便问题的排查,也就是对于sqoop的导入数据任务来说,每个导入使用一个导入job来实现。

ODS作为第一层,保持业务数据的一致性,基本不会对数据进行任何处理,直接保存在数仓中。从而表命名可以使用ods_databasename_tablename_di,ods表示表所在的层级,数据库名称表示数据的来源,而表名和业务系统一致,di表示每日增量day increase,全量的就不需要标注了,对于任务名称来说,也可以使用表名一致的方式,从而便于问题的排查,见名知意。

2 数据分类

在业务系统的表里面的数据,可以分为三类数据,一种是全量数据,这种表示同步的时候,只要全部同步就好,数据量比较少,而且变化的频率不高,像用户,地区这种维度表。

对于增量数据,也就是这个表里面的数据只会存在insert操作,而不会存在update操作,像交易表,这种只会新增记录,基本不会涉及修改。

对于新增及变化的数据,这种一般就是订单表,不但每天都会有新增的数据,而且会存在修改的动作,也就是修改订单的状态,从而每天都会发生变化。

3 sqoop使用

sqoop的安装很简单,只要下载之后,修改一下简单的配置就可以使用,但是由于sqoop需要使用hadoop的map任务,从而需要提前安装好hadoop,在这个基础上安装sqoop(注意将连接数据库的驱动放到sqoop的lib目录中)。

代码语言:javascript
复制
[root@KEL1 conf]# cat sqoop-env.sh 
# Set Hadoop-specific environment variables here.
#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/opt/module/hadoop-2.7.2
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/opt/module/hadoop-2.7.2
#set the path to where bin/hbase is available
#export HBASE_HOME=
#Set the path to where bin/hive is available
export HIVE_HOME=/opt/module/apache-hive-1.2.2
#Set the path for where zookeper config dir is
#export ZOOCFGDIR=
[root@KEL1 conf]# which hadoop
/opt/module/hadoop-2.7.2/bin/hadoop

全量导入(使用--query的方式):

代码语言:javascript
复制
#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' +%F)

if [[ -n "$2" ]];then
        JOB_DATE=$2
fi

export_date() {
$SQOOP import \
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--target-dir /user/root/shop/$1/$JOB_DATE \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}
#passowrd表示连接数据库的密码,target-dir表示保存在hdfs的哪个路径
#delete-target-dir表示删除已经存在的目录,否则如果目录存在报错
#query表示查询导入的sql语句,num-mappers表示使用的map个数
#fields-terminated-by表示在hdfs上用什么分割字段
#null-string和null-non-stinrg表示在hdfs统一用\N存储
export_date  s_auction  "select * from s_auction where 1=1"

全量导出:

代码语言:javascript
复制
#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' +%F)

if [[ -n "$2" ]];then
  JOB_DATE=$2
fi

export_date() {
$SQOOP export \
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--table $1  \
--input-fields-terminated-by "," --fields-terminated-by ',' \
--map-column-java gmt_create=java.sql.Date \
--export-dir /user/hive/warehouse/ods_shop.db/$1/ds=$JOB_DATE
}
#注意数据类型的转换,从而使用了参数--map-column-java
export_date $1

执行之后,可以看到map程序的执行结果:

从文件名也可以看到只有map阶段,查看结果:

使用table的参数进行全量导入:

代码语言:javascript
复制
[root@KEL1 jobscript]# cat ods_shop_s_sale.sh 
#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' +%F)

if [[ -n "$2" ]];then
  JOB_DATE=$2
fi

export_date() {
$SQOOP import \
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--target-dir /user/root/shop/$1/$JOB_DATE \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--table s_sale
}

export_date  s_sale

在使用--query的时候,可以指定你想要的列进行导入数据,也就是select指定具体的字段,从而可以进行导入。

在导出的时候,注意字段的对应关系,如果字段不对应,可能导致数据错位从而导致数据错误。

增量导入:

代码语言:javascript
复制
#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' +%F)

if [[ -n "$2" ]];then
        JOB_DATE=$2
fi

export_date() {
$SQOOP import \
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--target-dir /user/root/shop/$1/$JOB_DATE \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--table s_sale \
--incremental append \
--check-column id \
--last-value 42
}
export_date  s_sale

确定增量的时候,主要是根据你给的字段来进行判断是否为insert,从而每次也需要一个last-value来确定一个比较的值,最后会把增量的数据放在单独的文件中。

其中的内容则是新增的数据:

注意在使用这种增量数据导入的时候,数据库中id的类型,否则会报错,无法执行:

代码语言:javascript
复制
#使用增量导入的时候,checkcolumn必须是自增的rowid,否则导入报错
21/02/28 10:40:04 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `s_sale`
21/02/28 10:40:04 ERROR tool.ImportTool: Import failed: Character column (id) can not be used to determine which rows to incrementally import.

后续的值可以用来重新生成任务:

变化及新增:

代码语言:javascript
复制
#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' +%F)

if [[ -n "$2" ]];then
        JOB_DATE=$2
fi

export_date() {
$SQOOP import \
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password root \
--target-dir /user/root/shop/$1/$JOB_DATE \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--table s_sale \
--incremental lastmodified \
--check-column starts \
--last-value "2018-01-01 00:00:00" \
--mergy-key id
}
export_date  s_sale

这个执行的时候,最好starts能根据数据的改变自动修改时间,这样就能进行一个时间的对比,并且按照mergykey的设定来进行数据的合并。

总体看来,如果表里面有两个时间字段,那么还是使用--query的形式来选择时间方便点。

3 导入到hive

如果是需要导入到hive里面,也有相关的参数可以用,做了两部分内容,一部分是创建hive的表结构,第二部分是导入数据;如果任务是导入到hdfs,那么还有一个手动load的过程。

在从hive里面导出的时候,需要手动建表,例如mysql里面的表需要提前创建好。

4 可能出现的问题

a 查找相关日志

导入数据的时候,使用sqoop的时候,不会显示详细的报错日志,如下所示,只能看到是export的任务失败,至于失败原因就不清楚了:

可以找到jobhistory的web页面,对比相关的任务号,在sqoop的日志中有相应的打印信息:

点击其中的job id,从而进入到job的详情页面,在application master处点击对应的log信息:

在日志的页面中,可以看到部分日志,需要再次找到full的日志,点击如下的here查看全部日志信息:

可以看到是因为日期的转换出现问题:

在对20190206转换成yyyy-mm-dd的时候,出现错误。

b 任务执行失败,jobhistory服务未启动

代码语言:javascript
复制
2021-02-25 23:30:50,336 Stage-1 map = 0%,  reduce = 0%
java.io.IOException: java.net.ConnectException: Call From KEL1/192.168.1.99 to KEL:10020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
  at org.apache.hadoop.mapred.ClientServiceDelegate.invoke(ClientServiceDelegate.java:343)
  at org.apache.hadoop.mapred.ClientServiceDelegate.getJobStatus(ClientServiceDelegate.java:428)
  at org.apache.hadoop.mapred.YARNRunner.getJobStatus(YARNRunner.java:572)
  at org.apache.hadoop.mapreduce.Cluster.getJob(Cluster.java:184)
  at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:593)
  at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:591)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
  at org.apache.hadoop.mapred.JobClient.getJobUsingCluster(JobClient.java:591)
  at org.apache.hadoop.mapred.JobClient.getJobInner(JobClient.java:601)
  at org.apache.hadoop.mapred.JobClient.getJob(JobClient.java:631)

sqoop连接10020端口,将相关日志发送到jobhistory服务器。

c 导入的目录已经存在

代码语言:javascript
复制
//导入的目录已经存在 --delete-target-dir
21/02/26 02:15:21 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
21/02/26 02:15:22 ERROR tool.ImportAllTablesTool: Encountered IOException running import job: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://ns/user/root/BUCKETING_COLS already exists

d 默认4个map任务,无法进行分割,split-by指定,或者使用1一个map任务

代码语言:javascript
复制
//到处的表没有主键,要么使用-m 1来指定一个map任务,要么使用split-by
21/02/26 02:12:14 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/e4ea9c1fb8c56a292e224d796c6812f8/COMPLETED_TXN_COMPONENTS.jar
21/02/26 02:12:14 ERROR tool.ImportAllTablesTool: Error during import: No primary key could be found for table COMPLETED_TXN_COMPONENTS. Please specify one with --split-by or perform a sequential import with '-m 1'.

e 字符串切割出错

代码语言:javascript
复制
// 在sqoop import 后面加参数 -Dorg.apache.sqoop.splitter.allow_text_splitter=true
21/02/26 02:19:42 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`TOKEN_IDENT`), MAX(`TOKEN_IDENT`) FROM `DELEGATION_TOKENS`
21/02/26 02:19:42 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1614267326631_0013
21/02/26 02:19:42 ERROR tool.ImportAllTablesTool: Encountered IOException running import job: java.io.IOException: Generating splits for a textual index column allowed only in case of "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" property passed as a parameter

f 转换时间格式

代码语言:javascript
复制
2021-02-26 10:53:49,020 INFO [IPC Server handler 3 on 41321] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Diagnostics report from attempt_1614267326631_0082_m_000003_0: Error: java.io.IOException: Can't export data, please check failed map task logs
  at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:122)
  at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)
  at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
  at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
  at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
  at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
  at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.RuntimeException: Can't parse input data: '20190308'
  at s_auction.__loadFromFields(s_auction.java:1176)
  at s_auction.parse(s_auction.java:958)
  at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:89)
  ... 10 more
Caused by: java.lang.IllegalArgumentException
  at java.sql.Date.valueOf(Date.java:143)
  at s_auction.__loadFromFields(s_auction.java:1029)
  ... 12 more

在hive中进行时间转换:

代码语言:javascript
复制
insert overwrite  table  ss_pay_order_delta PARTITION(ds='20210222') 
select pay_order_id,total_fee,seller_id,buyer_id, 
pay_status,from_unixtime(unix_timestamp(pay_time,'yyyymmdd'),
'yyyy-mm-dd') as pay_time,
from_unixtime(unix_timestamp(gmt_create,'yyyymmdd'),'yyyy-mm-dd') 
as gmt_create,refund_fee,confirm_paid_fee 
from s_pay_order_delta;
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-02-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 SRE运维实践 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档