前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

作者头像
火之高兴
发布2024-07-25 15:41:54
310
发布2024-07-25 15:41:54
举报
文章被收录于专栏:大数据应用技术

【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

一、mysql全量导入hive[分区表]

需求介绍:

本需求将模拟从MySQL中向Hive数仓中导入数据,数据以时间分区。测试两种导入场景,一种是将数据全量导入,即包含所有时间分区;另一种是每天运行调度,仅导入当天时间分区中的用户数据。


  • mysql表建表语句:
代码语言:javascript
复制
create table t_order(
	id   	 	int   primary key auto_increment,
	amt  	 	decimal(10,2),
	`status` 	int  default 0,
	user_id  	int,
	create_time timestamp DEFAULT CURRENT_TIMESTAMP,
	modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
  • hive
代码语言:javascript
复制
create table t_order(
	id   	 	int,
	amt  	 	decimal(10,2),
	`status` 	int,
	user_id  	int,
	create_time date,
	modify_time date
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

注意字段时间戳,我们将从以上MySQL向Hive导入数据。

  • 编写datax的json脚本
代码语言:javascript
复制
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 执行导入操作

在mysql中添加测试数据 导入mysql中7-11的数据到hive下7-11分区

代码语言:javascript
复制
insert into t_order(amt,user_id) values(100,1001)
insert into t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
insert into t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')

在hive下创建分区

代码语言:javascript
复制
alter table t_order add partition(dt='2023-07-11')

运行dataX脚本

代码语言:javascript
复制
python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json

此部分的操作是将先插入mysql的三条数据导入到hive。


在mysql中添加测试数据 导入mysql中7-12的数据到hive下7-12分区

代码语言:javascript
复制
insert into t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
insert into t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');

在hive下创建分区

代码语言:javascript
复制
alter table t_order add partition(dt='2023-07-12')

运行datax脚本

代码语言:javascript
复制
python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json

此部分的操作是将先插入mysql的三条数据和本次插入mysql的数据都导入到hive。 根据查询结果可以看到,此时我们重复导入了第一部分的数据,这就是全量导入。

二、mysql增量导入hive

大方向:事实表用增量[订单表] 维度表用全量[商品表]

绝大部分公司采用的方案:全量为主、增量为辅

要想采用增量导入还有一个问题是你的业务库表能够支持增量导入

1. 增量导入的第一种实现方法

根据 id主键,查询hive表中最大的id值,然后去mysql中查询大于上述id值的数据。 如果有些使用uuid的,则不能用id,这种方案不适用于对修改的数据进行同步。

2. 另一种方法是 时间字段

在表中增加一个modify_time字段,如果数据新增或者修改,可以根据这个字段查询数据抽取到hive

3. dataX脚本

代码语言:javascript
复制
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

运行该增量脚本,即可按照分区的日期,每次导入需要的mysql数据到hive。

三、利用Python自动生成Datax的json脚本

1. 创建mysql和hive数据库

代码语言:javascript
复制
create table t_student(
	id   	 int PRIMARY key,
	name  	 	varchar(50),
	`age` 		int
);

create table t_person(
	id   	 		int PRIMARY key,
	name  	 	varchar(50),
	parentid 		int
);

INSERT into t_student values
(1,'zhanmusi',15),
(2,'lisi',55),
(3,'lisi',66);

INSERT into t_person values
(1,'miky',06),
(2,'tom',16),
(3,'jakcon',26);
代码语言:javascript
复制
create table ods_t_student(
	id   	 	int,
	name  	 	string,
	`age` 		int
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

create table ods_t_person(
	id   	 		int,
	name  	 		string,
	parentid 		int
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

2. 修改python脚本里面的密码(2处)和hdfs端口

代码语言:javascript
复制
import json
import sys
import pymysql


def gen_json(dbname, tablename):
 s1 = {
     "job": {
         "content": [
             {
                 "reader": {
                     "name": "mysqlreader",
                     "parameter": {
                         "connection": [
                             {
                                 "jdbcUrl": ["jdbc:mysql://hadoop10:3306/" + dbname + "?useSSL=false"],
                                 "table": [tablename]
                             }
                         ],
                         "password": "0000",  # 密码
                         "username": "root",
                         "column": getColumn(dbname, tablename)
                     }
                 },
                 "writer": {
                     "name": "hdfswriter",
                     "parameter": {
                         "column": getColumnAndType(dbname, tablename),
                         "defaultFS": "hdfs://hadoop10:8020",  # hdfs端口
                         "fileType": "text",
                         "path": "/user/hive/warehouse/ods_" + tablename + "/dt=$dt",
                         "fieldDelimiter": "\t",
                         "fileName": tablename,
                         "writeMode": "append"
                     }
                 }
             }
         ],
         "setting": {
             "speed": {
                 "channel": "1"
             }
         }
     }
 }

 with open('d:/test/' + tablename + '.json', 'w') as f:
     json.dump(s1, f)


def queryDataBase(dbname, tablename):
 conn = pymysql.connect(user='root', password='0000', host='hadoop10')  # 密码
 cursor = conn.cursor()
 cursor.execute(
     "select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",
     [dbname, tablename])
 fetchall = cursor.fetchall()
 cursor.close()
 conn.close()
 return fetchall


def getColumn(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 k2 = list(map(lambda x: x[0], k1))
 return k2


def getColumnAndType(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 mappings = {
     'bigint': 'bigint',
     'varchar': 'string',
     'int': 'int',
     'datetime': 'string',
     'text': 'string'
 }
 k2 = list(map(lambda x: {"name": x[0], "type": mappings[x[1].lower()]}, k1))
 return k2


if __name__ == '__main__':
 l = sys.argv[1:]
 dbname = l[0]  # mysql数据库名
 tablename = l[1]  # 表名
 gen_json(dbname, tablename)

3. 运行python脚本

代码语言:javascript
复制
(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person

4. 将生成的json文件上传到linux

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oB30wKR6-1689299346463)(上课笔记-day13.assets\1689068747698.png)]

5. 编写shell脚本 b.sh

代码语言:javascript
复制
#! /bin/bash

dt=$1

if [ ''$1 == '' ]
then
  dt=$(date -d yesterday +%Y-%m-%d)
fi

echo $dt

s=$(hive -e "show partitions ods_t_student partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_student add partition(dt='$dt')"
else
 echo "$dt分区已经存在"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_student.json



s=$(hive -e "show partitions ods_t_person partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_person add partition(dt='$dt')"
else
 echo "$dt分区已经存在"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_person.json

6. 运行shell

root@hadoop10 app]# sh b.sh 2023-07-13

代码语言:javascript
复制
任务启动时刻                    : 2023-07-13 02:31:38
任务结束时刻                    : 2023-07-13 02:31:50
任务总计耗时                    :                 12s
任务平均流量                    :                2B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0
  • hive
代码语言:javascript
复制
id|name    |age|dt        |
--|--------|---|----------|
 1|zhanmusi| 15|2023-07-13|
 2|lisi    | 55|2023-07-13|
 3|lisi    | 66|2023-07-13|
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、mysql全量导入hive[分区表]
    • 需求介绍:
    • 二、mysql增量导入hive
      • 1. 增量导入的第一种实现方法
        • 2. 另一种方法是 时间字段
          • 3. dataX脚本
          • 三、利用Python自动生成Datax的json脚本
            • 1. 创建mysql和hive数据库
              • 2. 修改python脚本里面的密码(2处)和hdfs端口
                • 3. 运行python脚本
                  • 4. 将生成的json文件上传到linux
                    • 5. 编写shell脚本 b.sh
                      • 6. 运行shell
                      相关产品与服务
                      云数据库 MySQL
                      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档