前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dinky从checkpoint与savepoint自动恢复整库同步作业

Dinky从checkpoint与savepoint自动恢复整库同步作业

作者头像
文末丶
发布2023-02-26 14:26:35
8100
发布2023-02-26 14:26:35
举报
文章被收录于专栏:DataLink数据中台DataLink数据中台

摘要:本文由韩公子老师带了 Dinky 实时计算平台从 checkpoint 与 savepoint 自动恢复整库同步作业的实操过程分享。内容包括:

  1. 场景
  2. Dinky 提交作业
  3. 自动 savepoint 恢复
  4. 自动 checkpoint 恢复
  5. 手动指定 checkpoint 恢复
  6. 总结

Tips:历史传送门~

Dinky on k8s 整库同步实践

Dinky 实践系列之 Flink Catalog 元数据管理

Dinky实践系列之FlinkCDC整库实时入仓入湖

Dinky FlinkCDC 整库入仓 StarRocks

GitHub 地址

https://github.com/DataLinkDC/dlink

https://gitee.com/DataLinkDC/Dinky

欢迎大家关注 Dinky 的发展~

一、场景

使用 Dinky 自动 savepoint、checkpoint 恢复整库同步作业。

组件

版本

Flink

1.14.4

Flink-mysql-cdc

2.2.1

Mysql

5.7+

Dinky

0.6.6

温馨提示: 由于 Fink 自身 bug,Dinky 暂时不支持 Flink1.15.x 版本做 savepoint 处理, 请等待后续更新支持,或者使用小于 Flink1.15 的版本。

二、Dinky 提交作业

依赖准备

将 flink-sql-connector-mysql-cdc-2.2.1.jar 添加到 dinky 根目录 plugins 和 hdfs 集群配置路径上。

依赖图:

Mysql 数据源准备

代码语言:javascript
复制
create database emp_1;

use emp_1;

CREATE TABLE IF NOT EXISTS `employees_1` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(50) NOT NULL,
  `last_name` varchar(50) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");

FlinkSQL 准备

代码语言:javascript
复制
-- 测试参数, 生成环境不需设置
SET pipeline.operator-chaining = false;

SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10 ;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
--SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots=1;
SET yarn.application.queue= default;

EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop102',
  'port' = '3306',
  'username' = 'root',
  'password' = '000000',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'emp_1\.employees_[0-9]+',
  'sink.connector' = 'print',
)

补充说明:

Flink 需要开启 checkpoint,并配置好状态后端参数。

配置 SavePoint 策略

SavePoint 策略选择最近一次。

任务提交

因为作业是第一次运行,之前没有做过savepoint,所以作业是一个新的程序,消费两条数据。

Flink WebUI

TaskManager 输出

三、自动 savepoint 恢复

查看作业详情栏, 如下图右上角所示, 他们的含义分别为:

名称

含义

智能停止

触发一次 SavePoint,并停止作业

SavePoint 触发

只触发一次 SavePoint

SavePoint 暂停

触发一次 SavePoint,并暂停作业

SavePoint 停止

触发一次 SavePoint,并停止作业

SavePoint 停止作业

点击 '智能停止' 或者 'Savepoint停止',触发一次Savepoint,并停止作业。

运维中心查看作业 SavePoint 记录

等作业停止后,在作业快照 Savepoint 栏中,查看到刚刚成功保存的Savepoint 记录。

数据库中查看 SavePoint 信息

在dlink数据库中,也可以查看到保存的Savepoint元数据。

数据开发查看作业 SavePoint 信息

同时,在'数据开发' 面板对应的作业中,右边栏也可以查看到savepoint记录。

插入一条数据

接下来,往表中插入一条新的数据。

代码语言:javascript
复制
insert into employees_1 VALUES ("55", "2020-09-15", "huang", "meiji", "F", "2022-04-12");

重启作业

作业会自动从之前保存的savepoint处启动。

断点续传

观察到作业,成功做到断点续传,只消费到一条记录。

FlinkWeb UI

TaskManager Stdout

Taskmanager 成功输出一条记录。

四、自动 CheckPoint 恢复

Dinky 的 checkpoint 恢复功能使用非常方便,只需要点击一个按钮即可恢复,整体过程如下所示:

准备数据源

代码语言:javascript
复制
create database emp_2;

use emp_2;

CREATE TABLE IF NOT EXISTS `employees_2` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(50) NOT NULL,
  `last_name` varchar(50) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- flink sql
-- 测试参数, 生成环境不需设置
SET pipeline.operator-chaining = false;

SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10 ;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
--SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots=1;
SET yarn.application.queue= default;

EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop102',
  'port' = '3306',
  'username' = 'root',
  'password' = '000000',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'emp_2\.employees_[0-9]+',
  'sink.connector' = 'print',
)

提交作业

插入数据

代码语言:javascript
复制
insert into employees_2 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_2 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");

消费两条数据

普通停止

点击 '普通停止',不做savepoint,从checkpoint 处启动。

运维中心查看 checkpoint 信息

停止之后,我们可以从 '作业快照'中,查看到作业保存的checkpoint记录。

这跟hdfs 上保存的checkpoint记录 是一致的。

hdfs 的 checkpoint

恢复最新的 checkpoint

重启后插入一条数据

代码语言:javascript
复制
insert into employees_2 VALUES ("13", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");

断点续传

温馨提示

运行 perjob、 app 模式的作业,如果作业被强行kill掉、内部错误等原因导致集群实例销毁, 会导致 Dinky 无法访问 JobManager 来获取 checkpoint 信息,可能存在 dinky 保存的 checkpoint 记录,跟 hdfs 上保存的记录不一致,有可能缺失最新的 checkpoint,所以线上作业恢复 checkpoint 时,需要查看 hdfs 上保存的最新 checkpoint 记录与 dinky 作比较。

五、手动指定 checkpoint 恢复

作业中指定 checkpoint

在上一个步骤中,点击 '此处恢复' 之后,作业能 '断点续传',实际原理是dinky 将 checkpoint 的记录填充到了作业的右边栏,选项为 '指定一次' 然后运行的

从指定 checkpoint 中恢复

所以,dinky也是支持手动指定某处checkpoint 恢复,只需 'SavePoin策略' 选择 '指定一次',将ck路径粘贴到 'SavePointPath',运行即可恢复checkpoint。

温馨提示

运行完毕,如查看到成功恢复ck之后,还请将 'SavePoin策略' 还原回 '最近一次',避免后续从这个检查点再次恢复。

六、总结

优点: 使用dinky,简化了线上作业的部署、运维、作业恢复等操作,增强了flink作业的健壮性。

不足: 如果线上作业过多,'运维中心' 找到指定的作业会比较费力,所以期待 '运维中心',增加能按照 '数据开发' 面板的分目录、分层级查看作业的功能,这样就能快速找到对应的作业。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-08-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Dinky开源 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 依赖准备
  • SavePoint 停止作业
  • 数据库中查看 SavePoint 信息
  • 数据开发查看作业 SavePoint 信息
  • 插入一条数据
  • 重启作业
  • 断点续传
  • FlinkWeb UI
  • TaskManager Stdout
  • 准备数据源
  • 提交作业
  • 消费两条数据
  • 普通停止
  • 作业中指定 checkpoint
  • 从指定 checkpoint 中恢复
  • 温馨提示
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档