迁移工具特性
基本特性
多线程导出:以 HBase 分区表的 Region 为单位进行数据分片,也可以自行切分数据。
全局限速:支持按字节数或行数进行限速控制。
KV 结构导出:简化配置,无需为每个表自定义 Schema 结构。
导出工具默认行为说明
缺省值处理
date 和 varchar 类型:直接使用空字符。
其他数据类型:使用 "NULL" 填充。
范围导出:start_key 和 end_key 采用左闭右开区间。
环境准备
1. 联系 腾讯云技术支持,获取迁移工具。
2. 将工具上传至自定义目录(本文以
/data/tdsql-project/为例说明),并解压。工具目录结构:
/data/tdsql-project/apache-seatunnel-2.3.8/├── bin/ # 存放运行脚本的目录├── config/ # 配置文件目录,包括日志组件,jvm 配置等├── connectors/ # 存放依赖的jar包├── lib/ # 存放依赖的jar包├── licenses/ # 存放依赖的jar包├── plugins/ # 存放依赖的jar包└── starter/ # 存放依赖的jar包├── logging/
迁移步骤
通过
seatunnel.sh脚本运行导出工具。可以通过
./bin/seatunnel.sh -h查看运行脚本的帮助信息。步骤一:创建配置文件
创建配置文件(如
file_config)并保存到 config/ 目录下,配置文件包含三个主要部分:env:环境配置项。
source:HBase 数据源配置。
sink:目标端配置。
env {# 环境配置项 - 控制任务执行环境和资源限制,详见境配置 (env)parallelism = 1job.mode = "BATCH"}source {Hbase {# 定义 HBase 数据源的连接信息、认证方式和数据导出规则,详见 HBase 数据源配置 (source)}}sink {# 配置数据写入目标,支持两种模式 JDBC(推荐)和 localfile,详见目标端配置 (sink)}
1. 环境配置 (env)
env 为导出导入任务的整体配置,主要配置并发度和限速情况(默认不限速)。
env {parallelism = 1 # 指定 Source Sink 时的并发线程数,实际的并发度可能由 Source Sink 配置决定job.mode = "BATCH"read_limit.bytes_per_second = 7000000 # 按照读取 bytes 限速,不填则不限速read_limit.rows_per_second = 200000 # 按照读取行数限速,不填则不限速}
2. HBase 数据源配置 (source)
HBase 中的一行数据最终会被转化为多个 KV 对,取决于该行有多少列。转换后的每个 KV,最终都会作为 TDSQL Boundless 中的一行数据。
TDSQL Boundless KV 状态下的默认表结构:
CREATE TABLE IF NOT EXISTS `{TableName}`(`K` varbinary(1024) not null,`Q` varbinary(256) not null,`T` bigint not null,`V` mediumblob,PRIMARY KEY (`K`, `Q`, `T`))partition by key(`k`) partitions 6;
KV 格式配置模板:
source {Hbase {# HBase连接选项:# 必填zookeeper_quorum = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181" // HBase 集群的 ZooKeeper 地址及端口,用以连接 HBasehbase_extra_config={ # 额外适配的 HBase 连接设置,如无特殊需求可以不设置zookeeper.znode.parent=/bdphbs07/hbaseidhbase.client.ipc.pool.size=***hbase.client.scanner.timeout.period=6000000hbase.rpc.timeout=6000000hbase.cells.scanned.per.heartbeat.check=10000}SourceModeKV = {specified_namespace = "***" # 指定需要导出的 namespacetable_match_rules = ["***"] # 指定需要导出的表的匹配规则# 建立默认表结构的模板语句,以(K,Q,T,V)为格式,可自行调整每一行的大小和分区规则分区数# 注意配置文件中该项必须写成一行,因为 json 不支持多行的字符串# template的最后要加一个\\ncreate_table_template = "CREATE TABLE IF NOT EXISTS `%s`\\n(\\n`K` varbinary(1024) not null,\\n`Q` varbinary(256) not null,\\n`T` bigint not null,\\n`V` mediumblob,\\nPRIMARY KEY (`K`, `Q`, `T`))\\npartition by key(`k`) partitions 6;\\n"}}}
说明:
多线程说明:
多线程需要先在 env 配置下指定需要的线程数量。
导出任务按照默认规则进行切分:对于每张表,按照表在 regionServer 上的分布来进行分片,每个分片作为一个导出任务。
由于同时存在多张表,这里导出任务分片在多线程之间的调度策略优先按照表进行,会按照扫描表时的顺序(字典序),优先调度同一张表的导出任务
3. 目标端配置 (sink)
配置数据写入目标,支持两种模式 JDBC(直接写入 TDSQL Boundless 数据库)和 LocalFile(导出到本地文件系统)。
方式一(推荐):通过 JDBC 导入。
sink {jdbc {# url 里可以开启 bulkload,必须打开 rewriteBatchedStatements=true,否则都是单条 insert,导入效率极差url = "jdbc:mysql://{ip}:{port}/?rewriteBatchedStatements=true&sessionVariables=tdsql_bulk_load='ON'"driver = "com.mysql.cj.jdbc.Driver"user = "test"password = "test123"schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"enable_upsert = false # 不开启 upsertmax_retries = 3 # 重试次数generate_sink_sql = "true" # 根据 DB 生成导入SQLdatabase = "hbase" # 待导入的 DBbatch_size = 1000 # 可以按需调整batch_interval_ms = 500transaction_timeout = 60000 # 事务超时设为60秒(避免大批次超时)}}
方式二:LocalFile
sink {LocalFile {# 不可修改:file_format_type = "text" # 导出文件格式,目前仅支持 textfield_delimiter = "," # 数据分隔符,仅支持逗号row_delimiter = "\\n" # 每行分隔符,仅支持换行符is_enable_transaction = false # 不支持事务# 必填:path = "/root/dumper_test/apache-seatunnel-2.3.4" # 导出路径tmp_path = "/root/dumper_test/apache-seatunnel-2.3.4" # 导出时会先导出到 tmp_path,再 mv 到 path,因此必须设置 tmp_path,否则会占用根目录空间result_table_name = "test_mytable" # 导出表名result_database_name = "test_mydatabase" # 导出数据库名,用于生成文件名result_column_names = ["id", "name"] # 导出列的列名,必须和 source 中s chema 里的 columns 数量相同# 选填:parent_directory = "test_directory" # 实际导出文件的父目录,即如果填写,则文件真实目录为:path/parent_directory/file,如果不填,则真实目录为:path/filefile_start_index = 1000 # 文件名的起始序列,文件名中的序列会从此数字开始递增,默认为0}}
4. 参数调整
修改 SeaTunnel 工具分配给 JVM 的内存,编辑
apache-seatunnel-2.3.8/config/jvm_client_options。# 配置文件里的初始值给的很小,在高并发下可能会因为资源调用不足导致无法进行工作,可根据机器资源大小适当给就行-Xms16g #指定 JVM 初始堆内存大小为 16GB
在执行数据迁移前,请先登录 TDSQL Boundless 实例,执行以下命令,调整参数配置。
----走 bulkload 需要调整的参数----set persist tdsql_bulk_load_allow_auto_organize_txn = on; // 允许 bulkload自组织事务导入set persist tdsql_bulk_load_allow_unsorted = on; // 允许 bulkload 事务乱序set persist tdsql_bulk_load_commit_threshold = 4294967296; // commit 阈值调整set persist tdsql_bulk_load_rpc_timeout = 7200000; // rpc 超时时间set persist tdstore_data_db_max_compaction_threads_in_no_speedup=8; // 非缓写状态下,允许的 compaction job并发数set persist tdstore_data_db_max_subcompactions= 12;set persist tdstore_user_cf_level0_file_num_compaction_trigger = 128; // 调整level 0 compaction的阈值set persist tdstore_user_cf_disable_intra_l0_compaction = true;set persist tdstore_user_cf_level0_stop_writes_trigger=2048;set persist tdstore_user_cf_level0_slowdown_writes_trigger=1024;set persist tdstore_data_db_max_background_jobs = 16;set persist tdstore_db_compaction_job_threads = 12;
步骤二:执行数据迁移
在填写好配置文件后,可以准备开始导出,其中每一个配置文件都会作为一个 SeaTunnel 任务被执行。本文介绍如何以本地集群的方式启动 SeaTunnel 导出任务。
1. 在项目根目录
apache-seatunnel-2.3.8/下,把填好的配置文件存入 ./config 目录下。2. 在根目录下执行以下命令,即可根据该配置文件启动一个 SeaTunnel 本地集群,使用该配置文件进行导出。
nohup ./bin/seatunnel.sh --config ./config/hbase_config -m local > seatunnel_hbase_local.log 2>&1 &
参数说明:
--config:指定配置文件路径。-m local:本地模式运行。如需以集群方式使用 SeaTunnel 导出 HBase,请参照 SeaTunnel 官方文档,部署 SeaTunnel 集群,并提交任务。步骤三:数据校验
当前数据校验功能只支持校验以 KV 格式导出导入的结果,校验规则如下:分别从 HBase 和 TDSQL Boundless 按照数据分片读取数据,并且计算每行数据的CRC32 校验值并累加,比较最终的累加结果。
1. 创建校验配置文件
config/validation_config。HBaseDataValidation {parallelism = 10specified_namespace = "default"table_match_rules = []zookeeper_quorum = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"mysql_jdbc_url = "jdbc..." // 在 JDBC 中指定 TDSQL 的库名,表名默认与 HBase 表名相同(暂不支持指定)}
其中校验的并发控制和导出时保持一致。
2. 执行数据校验。
校验程序被打包在
hbase-connector.jar中,可以在包部署目录下运行脚本:./bin/validateKV.sh ./config/validation_config
工具目录结构:
## dir tree./apache-seatunnel-2.3.8/bin/├── install-plugin.cmd├── install-plugin.sh├── seatunnel-cluster.cmd├── seatunnel-cluster.sh├── seatunnel.cmd├── seatunnel-connector.cmd├── seatunnel-connector.sh├── seatunnel.sh└── validateKV.sh