首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Flink 实践教程:入门3-读取 MySQL 数据

Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何取 MySQL 数据,经过流计算 Oceanus 实时计算引擎分析,输出数据到日志(Logger Sink)当中。

前置准备

创建 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群

创建 Mysql 实例

进入 MySQL 控制台,点击【新建】。具体可参考官方文档 创建 MySQL 实例。然后在【数据库管理】> 【参数设置】中设置参数 binlog_row_image=FULL,便于使用 CDC(Capture Data Change)特性,实现数据的变更实时捕获。

!创建 Oceanus 集群和 MySQL 实例时所选 VPC 必须是同一 VPC。

Oceanus 作业

1. 创建 Source

代码语言:sql
复制
CREATE TABLE `MySQLSourceTable` (
    `id` INT,
    `name` VARCHAR,
    PRIMARY KEY (`id`) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
    'connector' = 'mysql-cdc',       -- 必须为 'mysql-cdc'
    'hostname' = '10.0.0.158',       -- 数据库的 IP
    'port' = '3306',                 -- 数据库的访问端口
    'username' = 'root',             -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
    'password' = 'xxxxxxxxxx',       -- 数据库访问的密码
    'database-name' = 'testdb',      -- 需要同步的数据库
    'table-name' = 'student'         -- 需要同步的数据表名
);

2. 创建 Sink

代码语言:sql
复制
CREATE TABLE CustomSink ( 
  id INT,
  name VARCHAR
) WITH ( 
    'connector' = 'logger',
    'print-identifier' = 'DebugData'
);

3. 编写业务 SQL

代码语言:sql
复制
INSERT INTO CustomSink
SELECT * FROM MySQLSourceTable;

4. 运行作业

点击【保存】>【发布草稿】运行作业。查看Flink UI Taskmanger 日志,观察全量数据是否正常打印到日志。

5. 验证 MySQL-CDC 特性

在 MySQL 中新增一条数据,然后在 Flink UI Taskmanger 日志中观察结果,观察新增的数据是否正常打印到日志。

在 MySQL 中修改和删除记录同样会更新到 Logger Sink中,并打印输出。

总结

1、Mysql CDC 支持对 MySQL 数据库的全量和增量读取,并保证 Exactly Once 语义。MySQL CDC 底层使用了 Debezium 来做 CDC(Change Data Capture),其工作特性可参考 数据库 MySQL CDC

2、输入到 Logger Sink 的数据, 会通过日志打印出来,便于调试。Logger Jar 包下载地址:https://cloud.tencent.com/document/product/849/58713

下一篇
举报
领券