前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >技术干货|如何利用 ChunJun 实现数据离线同步?

技术干货|如何利用 ChunJun 实现数据离线同步?

原创
作者头像
袋鼠云数栈
发布2023-05-19 14:36:29
5590
发布2023-05-19 14:36:29
举报
文章被收录于专栏:数栈技术分享数栈技术分享

ChunJun 是⼀款稳定、易⽤、⾼效、批流⼀体的数据集成框架,基于计算引擎 Flink 实现多种异构数据源之间的数据同步与计算。ChunJun 可以把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从⽽为企业提供全⾯的数据共享,目前已在上千家公司部署且稳定运⾏。

在之前,我们曾经为大家介绍过如何利用 ChunJun 实现数据实时同步(点击看正文),本篇将为大家介绍姊妹篇,如何利⽤ ChunJun 实现数据的离线同步。

ChunJun 离线同步案例

离线同步是 ChunJun 的⼀个重要特性,下⾯以最通⽤的 mysql -> hive 的同步任务来介绍离线同步。

配置环境

找⼀个空⽬录,接下来要配置 Flink 和 ChunJun 的环境,下⾯以 /root/chunjun_demo/ 为例⼦。

● 配置 Flink

#下载 Flink wget "http://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz" tar -zxvf chunjun-dist.tar.gz

● 配置 ChunJun

#下载 chunjun, 内部依赖 flink 1.12.7 wget https://github.com/DTStack/chunjun/releases/download/v1.12.8/chunjun-dist-1.12-SNAPSHOT.tar.gz #新创建⼀个⽬录 mkdir chunjun && cd chunjun #解压到指定⽬录 tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz

解压好的 ChunJun 有如下⽬录: bin chunjun-dist chunjun-examples lib

● 配置环境变量

#配置 Flink 环境变量 echo "FLINK_HOME=/root/chunjun_demo/flink-1.12.7" >> /etc/profile.d/sh.local #配置 Chunjun 的环境变量 echo "CHUNJUN_DIST=/root/chunjun_demo/chunjun/chunjun-dist" >> /etc/profile.d/sh.local #刷新换新变量 . /etc/profile.d/sh.local

● 在 Yarn 上⾯启动 Flink Session

#启动 Flink Session bash $FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_DIST -d

输出如下:

echo "stop" | $FLINK_HOME/bin/yarn-session.sh -id application_1683599622970_0270 If this should not be possible, then you can also kill Flink via YARN's web interface or via: yarn application -kill application_1683599622970_0270

下⾯提交任务会⽤到 Flink Session 这个 Yarn Application Id (application_1683599622970_0270)。

● 其他配置

如果⽤ parquet 格式,需要把 flink-parquet_2.12-1.12.7.jar 放⼊到 flink/lib 下⾯, 在上⾯的例⼦中,需要放到 $FLINK_HOME/lib ⾥⾯。

提交任务

● 在 MySQL 准备数据

-- 创建⼀个名为ecommerce_db的数据库,⽤于存储电商⽹站的数据 CREATE DATABASE IF NOT EXISTS chunjun; USE chunjun; -- 创建⼀个名为orders的表,⽤于存储订单信息 CREATE TABLE IF NOT EXISTS orders ( id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主键 order_id VARCHAR(50) NOT NULL, -- 订单编号,不能为空 user_id INT NOT NULL, -- ⽤户ID,不能为空 product_id INT NOT NULL, -- 产品ID,不能为空 quantity INT NOT NULL, -- 订购数量,不能为空 order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- 订单⽇期,默认值为当前时间戳,不能为空 ); -- 插⼊⼀些测试数据到orders表 INSERT INTO orders (order_id, user_id, product_id, quantity) VALUES ('ORD123', 1, 101, 2), ('ORD124', 2, 102, 1), ('ORD125', 3, 103, 3), ('ORD126', 1, 104, 1), ('ORD127', 2, 105, 5); select * from chunjun.orders;

如果没有 MySQL 的话,可以⽤ docker 快速创建⼀个。

docker pull mysql:8.0.12 docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.12

● 创建 Hive 表

CREATE DATABASE IF NOT EXISTS chunjun; USE chunjun; -- 创建⼀个名为orders的表,⽤于存储订单信息 CREATE TABLE IF NOT EXISTS chunjun.orders ( id INT, order_id VARCHAR(50), user_id INT, product_id INT, quantity INT, order_date TIMESTAMP ) STORED AS PARQUET;-- 查看 hive 表,底层的 HDFS ⽂件位置,下⾯的 SQL 结果⾥⾯ Location 字段,就是 HDFS ⽂件的位置。 desc formatted chunjun.orders; -- Location: hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders -- ⼀会配置同步任务的时候会⽤到 hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders

● 在当前⽬录( /root/chunjun_demo/ ) 配置⼀个任务 mysql_hdfs.json

vim mysql_hdfs.json 输⼊如下内容:

{ "job": { "content": [ { "reader": { "parameter": { "connection": [ { "schema": "chunjun", "jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ], "table": [ "orders" ] } ], "username": "root", "password": "123456", "column": [ { "name": "id", "type": "INT" }, { "name": "order_id", "type": "VARCHAR" }, { "name": "user_id", "type": "INT" }, { "name": "product_id", "type": "INT" }, { "name": "quantity", "type": "INT" }, { "name": "order_date", "type": "TIMESTAMP" } ] }, "name": "mysqlreader" }, "writer": { "parameter": { "path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders", "defaultFS": "hdfs://ns1", "hadoopConfig": { "dfs.nameservices": "ns1", "dfs.ha.namenodes.ns1": "nn1,nn2", "dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000", "dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000", "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }, "column": [ { "name": "id", "type": "INT" }, { "name": "order_id", "type": "VARCHAR" }, { "name": "user_id", "type": "INT" }, { "name": "product_id", "type": "INT" }, { "name": "quantity", "type": "INT" }, { "name": "order_date", "type": "TIMESTAMP" } ], "writeMode": "overwrite", "encoding": "utf-8", "fileType": "parquet", "fullColumnName": [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"], "fullColumnType": [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ] }, "name": "hdfswriter" } } ], "setting": { "errorLimit": { "record": 0 }, "speed": { "bytes": 0, "channel": 1 } } } }

因为我们要将 MySQL 同步到 Hive ⾥⾯,但是如果直接同步 Hive 的话,内部会⽤ jdbc,⽽ jdbc 的效率不⾼,因此我们可以直接把数据同步到 Hive 底层的 HDFS 上⾯,所以 writer ⽤到了 hdfswriter。脚本解析如下:

{ "job": { "content": [ { "reader": { "parameter": { "connectionComment": "数据库链接, 数据库, 表, 账号, 密码", "connection": [ { "schema": "chunjun", "jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ], "table": [ "orders" ] } ], "username": "root", "password": "123456", "columnComment": "要同步的列选择, 可以选择部分列", "column": [ { "name": "id", "type": "INT" }, { "name": "order_id", "type": "VARCHAR" }, { "name": "user_id", "type": "INT" }, { "name": "product_id", "type": "INT" }, { "name": "quantity", "type": "INT" }, { "name": "order_date", "type": "TIMESTAMP" } ] }, "nameComment" : "source 是 mysql", "name": "mysqlreader" }, "writer": { "parameter": { "pathComment": "HDFS 上⾯的路径, 通过 hive 语句的 desc formatted 查看", "path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders", "defaultFS": "hdfs://ns1", "hadoopConfigComment": "是 hdfs ⾼可⽤最基本的配置, 在 Hadoop 配置⽂件 hdfs-site.xml 可以找到", "hadoopConfig": { "dfs.nameservices": "ns1", "dfs.ha.namenodes.ns1": "nn1,nn2", "dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000", "dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000", "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }, "columnComment": "要同步的列选择, 可以选择部分列", "column": [ { "name": "id", "type": "INT" }, { "name": "order_id", "type": "VARCHAR" }, { "name": "user_id", "type": "INT" }, { "name": "product_id", "type": "INT" }, { "name": "quantity", "type": "INT" }, { "name": "order_date", "type": "TIMESTAMP" } ], "writeModeComment": "覆盖写⼊到 hdfs 上⾯的⽂件, 可选 overwrite, append(默认模式)", "writeMode": "overwrite", "encoding": "utf-8", "fileTypeComment": "可选 orc, parquet, text", "fileType": "parquet", "fullColumnNameComment": "全部字段,有时候 column ⾥⾯同步部分字段,但是⼜需要有全部字段的格式,例如 fileType : text ", "fullColumnName": [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"], "fullColumnTypeComment": "全部字段的类型", "fullColumnType": [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ] }, "nameComment" : "sink 是 hdfs", "name": "hdfswriter" } } ], "setting": { "errorLimit": { "record": 0 }, "speed": { "bytes": 0, "channel": 1 } } } }

● 提交任务

bash chunjun/bin/chunjun-yarn-session.sh -job mysql_hdfs.json -confProp {\"yarn.application.id\":\"application_1683599622970_0270\"}

● 查看任务

任务同步完成, 可以看⼀下 HDFS 上⾯的数据。

查看⼀下 Hive 表的数据。

注意, 如果是分区的 Hive 表,需要⼿动刷新⼀下 Hive 的元数据, 使⽤ MSCK 命令。(MSCK 是 Hive 中的⼀个命令,⽤于检查表中的分区,并将其添加到 Hive 元数据中)

MSCK REPAIR TABLE my_table;

ChunJun 离线同步原理解析

HDFS 文件同步原理

· 对于⽂件系统,同步的时候会先把⽂件写⼊到 path + [filename] ⽬录⾥⾯的 .data 的⽂件⾥⾯,如果任务失败,那么 .data ⾥⾯的⽂件不会⽣效。

· 在 TaskManager 上⾯所有 task 任务结束的时候,会在 JobManager 执⾏ FinalizeOnMaster 的 finalizeGlobal ⽅法, 最终会调⽤到 moveAllTmpDataFileToDir , 把 .data ⾥⾯的⽂件移除到 .data 的上⼀层。

public interface FinalizeOnMaster {

/** The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished. Params:parallelism – The parallelism with which the format or functions was run. Throws:IOException – The finalization may throw exceptions, which may cause the job to abort. */ void finalizeGlobal(int parallelism) throws IOException; }// 在 JobManager 执⾏ @Override protected void moveAllTmpDataFileToDir() { if (fs == null) { openSource(); } String currentFilePath = ""; try { Path dir = new Path(outputFilePath); Path tmpDir = new Path(tmpPath); FileStatus[] dataFiles = fs.listStatus(tmpDir); for (FileStatus dataFile : dataFiles) { currentFilePath = dataFile.getPath().getName(); fs.rename(dataFile.getPath(), dir); LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir); } fs.delete(tmpDir, true); } catch (IOException e) { throw new ChunJunRuntimeException( String.format( "can't move file:[%s] to dir:[%s]", currentFilePath, outputFilePath), e); } }

增量同步

增量同步主要针对某些只有 Insert 操作的表,随着业务增⻓,表内数据越来越多。如果每次都同步整表的话,消耗的时间和资源会⽐较多。因此需要⼀个增量同步的功能,每次只读取增加部分的数据。

● 实现原理

其实现原理实际上就是配合增量键在查询的 sql 语句中拼接过滤条件,⽐如 where id > ? ,将之前已经读取过的数据过滤出去。

增量同步是针对于两个及以上的同步作业来说的。对于初次执⾏增量同步的作业⽽⾔,实际上是整表同步,不同于其他作业的在于增量同步作业会在作业执⾏完成后记录⼀个 endLocation 指标,并将这个指标上传到 prometheus 以供后续使⽤。

除第⼀次作业外,后续的所有增量同步作业都会取上⼀次作业的 endLocation 做为本次作业的过滤依据(startLocation)。⽐如第⼀次作业执⾏完后,endLocation 为10,那么下⼀个作业就会构建出例如 SELECT id,name,age from table where id > 10 的 SQL 语句,达到增量读取的⽬的。

● 使用限制

· 只有 RDB 的 Reader 插件可以使⽤

· 通过构建SQL过滤语句实现,因此只能⽤于RDB插件

· 增量同步只关⼼读,不关⼼写,因此只与Reader插件有关

· 增量字段只能为数值类型和时间类型

· 指标需要上传到 prometheus,⽽ prometheus 不⽀持字符串类型,因此只⽀持数据类型和时间类型,时间类型会转换成时间戳后上传

· 增量键的值可以重复,但必须递增

· 由于使⽤ '>' 的缘故,要求字段必须递增

断点续传

断点续传是为了在离线同步的时候,针对⻓时间同步任务如超过1天,如果在同步过程中由于某些原因导致任务失败,从头再来的话成本⾮常⼤,因此需要⼀个断点续传的功能从任务失败的地⽅继续。

● 实现原理

· 基于 Flink 的 checkpoint,在 checkpoint 的时候 会存储 source 端最后⼀条数据的某个字段值,sink 端插件执⾏事务提交。

· 在任务失败,后续通过 checkpoint 重新运⾏时,source 端在⽣成 select 语句的时候将 state ⾥的值作为条件拼接进⾏数据的过滤,达到从上次失败位点进⾏恢复。

● 适用场景

通过上述原理我们可以知道 source 端必须是 RDB 类型插件,因为是通过 select 语句拼接 where 条件进⾏数据过滤达到断点续传的,同时断点续传需要指定⼀个字段作为过滤条件,且此字段要求是递增的。

· 任务需要开启 checkpoint

· reader 为 RDB 的插件均⽀持且 writer ⽀持事务的插件(如 rdb filesystem 等),如果下游是幂等性则 writer 插件也不需要⽀持事务

· 作为断点续传的字段在源表⾥的数据是递增的,因为过滤条件是 >

项目地址:https://github.com/DTStack

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ChunJun 离线同步案例
    • 配置环境
      • 提交任务
      • ChunJun 离线同步原理解析
        • HDFS 文件同步原理
          • 增量同步
            • 断点续传
            相关产品与服务
            云数据库 MySQL
            腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档