增量 DB 数据到 HDFS

最近更新时间:2019-08-06 15:45:15

Sqoop 是一款开源的工具,主要用于在 Hadoop 和传统数据库(MySQL、PostgreSQL 等)之间进行数据传递,可以将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导入到 Hadoop 的 HDFS 中,也可以将 HDFS 的数据导入到关系型数据库中。 Sqoop 中一大亮点就是可以通过 Hadoop 的 MapReduce 把数据从关系型数据库中导入数据到 HDFS。

本文介绍了 Sqoop 的增量导入操作,即在数据库中的数据增加或更新后,把数据库的改动同步到导入 HDFS 的数据中。其中分为 append 模式和 lastmodified 模式,append 模式只能用在数据库的数据增加但不更新的场景,lastmodified 模式用在数据增加并且更新的场景。

1. 开发准备

  • 确认您已经开通了腾讯云,并且创建了一个 EMR 集群。在创建 EMR 集群的时候需要在软件配置界面选择 Sqoop 组件。
  • Sqoop 等相关软件安装在路径 EMR 云服务器的/usr/local/service/路径下。

2. 使用 append 模式

本节将继续使用上一节的用例。
进入 EMR 控制台,复制目标集群的实例 ID,即集群的名字。再进入关系型数据库控制台,使用 Ctrl+F 进行搜索,找到集群对应的 MySQL 数据库,查看该数据库的内网地址 $mysqlIP。

登录 EMR 集群中的任意机器,最好是登录到 Master 节点。登录 EMR 的方式请参考 登录 Linux 实例。这里我们可以选择使用 WebShell 登录。单击对应云服务器右侧的登录,进入登录界面,用户名默认为 root,密码为创建 EMR 时用户自己输入的密码。输入正确后,即可进入命令行界面。

在 EMR 命令行先使用以下指令切换到 Hadoop 用户,并进入 Sqoop 文件夹:

[root@172 ~]# su hadoop
[hadoop@172 ~]# cd /usr/local/service/sqoop

连接 MySQL 数据库:

[hadoop@172 sqoop]$ mysql -h $mysqlIP –p
Enter password:

密码为您创建 EMR 集群的时候设置的密码。
连接了 MySQL 数据库之后,在表 sqoop_test 中新增一条数据,如下:

mysql> use test;
Database changed

mysql> insert into sqoop_test values(null, 'forth', now(), 'hbase');
Query ok, 1 row affected(0.00 sec)

查看表中的数据:

Mysql> select * from sqoop_test;
+----+--------+---------------------+---------+
| id | title  | time                | content |
+----+--------+---------------------+---------+
|  1 | first  | 2018-07-03 15:29:37 | hdfs    |
|  2 | second | 2018-07-03 15:30:57 | mr      |
|  3 | third  | 2018-07-03 15:31:07 | yarn    |
|  4 | forth  | 2018-07-03 15:39:38  | hbase    |
+----+--------+---------------------+---------+
4 rows in set (0.00 sec)

使用 append 模式将新增的数据同步到上一节中储存数据的 HDFS 路径中:

[hadoop@172 sqoop]$ bin/sqoop-import --connect jdbc:mysql://$mysqlIP/test --username 
root -P --table sqoop_test --check-column id  --incremental append --last-value 3 --target-dir 
/sqoop

其中 $mysqlIP 为您的 MySQL 数据库的内网地址。

执行命令会需要您输入数据库的密码,默认为您创建 EMR 集群时设置的密码。比普通的 sqoop-import 命令多出一些参数,其中 --check-column 为导入时参照的数据,--incremental 为导入的模式,在此例中为 append,--last-value 为参考数据的参考值,比该值更新的数据都会导入到 HDFS 中。
执行成功之后,可以查看 HDFS 相应目录下更新后的数据:

[hadoop@172 sqoop]$ hadoop fs -cat /sqoop/*
1, first, 2018-07-03 15:29:37.0,hdfs
2, second, 2018-07-03 15:30:57.0,mr
3, third, 2018-07-03 15:31:07.0,yarn
4,forth,2018-07-03 15:39:38.0,hbase

使用 Sqoop job

使用 append 同步 HDFS 中的数据每次需要手动输入 --last-value,也可以使用 sqoop job 的方式,Sqoop 会自动保存上次导入成功的 last-value 值。如果要使用 sqoop job。需要启动 sqoop-metastore 进程,操作步骤如下:

首先在 conf/sqoop-site.xml 中启动 sqoop-metastore 进程:

<property>
  <name>sqoop.metastore.client.enable.autoconnect</name>
  <value>ture</value>
</property>

然后在bin目录下启动 sqoop-metastore 服务:

./sqoop-metastore &

使用如下指令创建 Sqoop job:

说明:

此命令适用于 Sqoop 1.4.6 版本。

[hadoop@172 sqoop]$ bin/sqoop job --create job1 -- import --connect
jdbc:mysql://$mysqlIP/test --username root -P --table sqoop_test --check-column id 
--incremental append --last-value 4 --target-dir /sqoop

其中 $mysqlIP 为您的 MySQL 的内网地址。使用该命令就成功创建了一个 Sqoop job,每一次执行,会自动从上次更新的 last-value 值自动更新。

为 MySQL 中的 sqoop_test 表格新增一条记录:

mysql> insert into sqoop_test values(null, 'fifth', now(), 'hive');
Query ok, 1 row affected(0.00 sec)

Mysql> select * from sqoop_test;
+----+--------+---------------------+---------+
| id | title  | time                | content |
+----+--------+---------------------+---------+
|  1 | first  | 2018-07-03 15:29:37 | hdfs    |
|  2 | second | 2018-07-03 15:30:57 | mr      |
|  3 | third  | 2018-07-03 15:31:07 | yarn    |
|  4 | forth  | 2018-07-03 15:39:38  | hbase    |
|  5 | fifth  | 2018-07-03 16:02:29   | hive    |
+----+--------+---------------------+---------+
5 rows in set (0.00 sec)

然后执行 Sqoop job:

[hadoop@172 sqoop]$ bin/sqoop job --exec job1

执行该命令会让您输入 MySQL 的密码。执行成功后,可以查看 HDFS 相应目录下更新后的数据:

[hadoop@172 sqoop]$ hadoop fs -cat /sqoop/*
1, first, 2018-07-03 15:29:37.0,hdfs
2, second, 2018-07-03 15:30:57.0,mr
3, third, 2018-07-03 15:31:07.0,yarn
4,forth,2018-07-03 15:39:38.0,hbase
5,fifth,2018-07-03 16:02:29.0,hive

3. 使用 lastmodified 模式

直接创建一个 sqoop-import 的 lastmodified 模式的 Sqoop job,首先查询 sqoop_test 中最后更新的时间:

mysql> select max(time) from sqoop_test;

创建一个 Sqoop job:

[hadoop@172 sqoop]$ bin/sqoop job --create job2 -- import --connect jdbc:mysql://$mysqlIP/test --username root -P --table sqoop_test --check-column time --incremental lastmodified --merge-key id --last-value '2018-07-03 16:02:29' --target-dir /sqoop

其中 $mysqlIP 为您的 MySQL 的内网地址。新增了几个参数,--check-column 必须使用 timestamp,--incremental 模式选择 lastmodified,--merge-key 选择 ID,--last-value 为我们查询到的表中的最后更新时间。在这个时间之后做出的更新都会被同步到 HDFS 中去,而 Sqoop job 每次会自动保存和更新该值。

对 MySQL 中的 sqoop_test 表添加数据并做出更改:

mysql> insert into sqoop_test values(null, 'sixth', now(), 'sqoop');
Query ok, 1 row affected(0.00 sec)

mysql> update sqoop_test set time=now(), content='spark' where id = 1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 changed: 1 warnings: 0

Mysql> select * from sqoop_test;
+----+--------+---------------------+---------+
| id | title  | time                | content |
+----+--------+---------------------+---------+
|  1 | first  | 2018-07-03 16:07:46 | spark    |
|  2 | second | 2018-07-03 15:30:57 | mr      |
|  3 | third  | 2018-07-03 15:31:07 | yarn    |
|  4 | forth  | 2018-07-03 15:39:38  | hbase    |
|  5 | fifth  | 2018-07-03 16:02:29   | hive    |
|  6 | fifth  | 2018-07-03 16:09:58   | sqoop    |
+----+--------+---------------------+---------+
6 rows in set (0.00 sec)

执行 Sqoop job:

[hadoop@172 sqoop]$ bin/sqoop job --exec job2

执行该命令会让您输入 MySQL 的密码。执行成功后,可以查看 HDFS 相应目录下更新后的数据:

[hadoop@172 sqoop]$ hdfs dfs -cat /sqoop/*
1,first,2018-07-03 16:07:46.0,spark
2,second,2018-07-03 15:30:57.0,mr
3,third,2018-07-03 15:31:07.0,yarn
4,forth,2018-07-03 15:39:38.0,hbase
5,fifth,2018-07-03 16:02:29.0,hive
6,sixth,2018-07-03 16:09:58.0,sqoop

更多的 Sqoop 操作可以查看 官方文档