前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dlink 在 Flink-mysql-cdc 到 Doris 的实践

Dlink 在 Flink-mysql-cdc 到 Doris 的实践

作者头像
文末丶
发布2021-12-27 13:04:14
2.3K0
发布2021-12-27 13:04:14
举报
文章被收录于专栏:DataLink数据中台DataLink数据中台

一、背景

Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。

实时即未来,本文将分享如何基于 Dlink 实现 Mysql 变动数据通过 Flink 实时入库 Doris。

二、准备

老规矩,列清各组件版本:

组件

版本

Flink

1.13.3

Flink-mysql-cdc

2.1.0

Doris

0.15.1-rc09

doris-flink

1.0-SNAPSHOT

Mysql

8.0.13

Dlink

0.4.0

需要注意的是,本文的 Doris 是基于 OracleJDK1.8 和 Scala 2.11 通过源码进行编译打包的,所以所有组件的 scala 均为 2.11,此处和 Doris 社区略有不同。

三、部署

本文的 Flink 和 Doris 的部署不做描述,详情请查阅官网。

https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95

本文在 Dlink 部署成功的基础上进行,如需查看具体部署步骤,请阅读《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》。

Dlink 的 plugins 下添加 doris-flink-1.0-SNAPSHOT.jarflink-sql-connector-mysql-cdc-2.1.0.jar 。重启 Dlink。

代码语言:javascript
复制
plugins/ -- Flink 相关扩展
|- doris-flink-1.0-SNAPSHOT.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-format-changelog-json-2.1.0.jar
|- flink-json-1.13.3.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-sql-connector-mysql-cdc-2.1.0.jar
|- flink-table_2.11-1.13.3.jar
|- flink-table-blink_2.11-1.13.3.jar

当然,如果您想直接使用 FLINK_HOME 的话,可以在 auto.sh 文件中 SETTING 变量添加$FLINK_HOME/lib

四、数据表

学生表 (student)

代码语言:javascript
复制
-- Mysql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student`  (
  `sid` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');

成绩表(score)

代码语言:javascript
复制
-- Mysql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score`  (
  `cid` int(11) NOT NULL,
  `sid` int(11) NULL DEFAULT NULL,
  `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);

学生成绩宽表(scoreinfo)

代码语言:javascript
复制
-- Doris
CREATE TABLE scoreinfo
(
    cid INT,
    sid INT,
    name VARCHAR(32),
    cls VARCHAR(32),
    score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");

五、FlinkSQL

代码语言:javascript
复制
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'student');
CREATE TABLE score (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'score');
CREATE TABLE scoreinfo (
    cid INT,
    sid INT,
    name STRING,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (       
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030' ,
'table.identifier' = 'test.scoreinfo',
'username' = 'root',
'password'=''
);
insert into scoreinfo
select 
a.cid,a.sid,b.name,a.cls,a.score
from score a
left join student b on a.sid = b.sid

六、调试

在 Dlink 中提交

本示例采用了 yarn-session 的方式进行提交。

FlinkWebUI

上图可见,流任务已经成功被 Dlink 提交的远程集群。

Doris 查询

上图可见,Doris 已经被写入了历史全量数据。

增量测试

在 Mysql 中执行新增语句:

代码语言:javascript
复制
INSERT INTO `score` VALUES (9, 3, 'english', 100);

Doris 成功被追加:

变动测试

在 Mysql 中执行新增语句:

代码语言:javascript
复制
update score set score = 100 where cid = 1

Doris 成功被修改:

七、总结

Apache Flink 强大的流计算能力与 Apache Doris 的查询能力为企业内实时数仓或实时指标实现提供了另一种高效的选择方式,与此同时 Flink CDC 也更进一步降低整个实时链路的成本。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Dinky开源 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 学生表 (student)
  • 成绩表(score)
  • 学生成绩宽表(scoreinfo)
  • 在 Dlink 中提交
  • FlinkWebUI
  • Doris 查询
  • 增量测试
  • 变动测试
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档