前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实践教程-入门(4):读取 MySQL 数据写入到 ES

Flink 实践教程-入门(4):读取 MySQL 数据写入到 ES

作者头像
腾讯云大数据
发布2021-11-09 10:40:16
1.2K0
发布2021-11-09 10:40:16
举报
文章被收录于专栏:腾讯云大数据

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 MySQL 接入数据,经过流计算 Oceanus 对数据进行处理分析(示例中采用小写转换函数对name字段进行了小写转换),最终将处理好的数据存入 Elasticsearch 中 。

前置准备

1.  MySQL 集群准备

1.1 新建 MySQL 集群

进入 MySQL 控制台[1],点击左上方【新建】创建集群。具体可参考官方文档 创建 mysql 实例[2]。在【数据库管理】> 【参数设置】中设置参数 binlog_row_image=FULL,便于使用 CDC(Capture Data Change)特性,实现数据的变更实时捕获。

1.2 准备数据

首先创建 testdb 库,并在 testdb 库中创建用户 user 表,并插入数据。

user 表结构:

字段名

类型

含义

user_id

int

用户ID

user_name

varchar(50)

用户名

create_time

timestamp

创建时间

在表中插入2条数据。

代码语言:javascript
复制
INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1001, '小明', '2021-10-01 00:00:00');INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1002, 'TONY', '2021-10-02 00:00:00');
1.3 设置参数

点击实例 ID,在实例详情页面点击【数据库管理】进入【参数设置】面板,设置binlog_row_image=FULL来开启数据库变化的同步。

通过 MySQL 集成数据到流计算 Oceanus (Flink) 集群,可以使用 flink-connector-jdbc 或者 flink-connector-mysq-cdc。使用 MySQL-cdc 特性时,flink-connector-mysq-cdc 连接器需要设置 MySQL 数据库的参数 binlog_row_image=FULL。

2. 创建流计算 Oceanus 集群

进入流计算 Oceanus 控制台[3],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档创建独享集群[4]。

创建流计算 Oceanus 集群和 MySQL 集群时所选 VPC 必须是同一 VPC。

3. 创建 Elasticsearch 集群

进入 Elasticsearch 控制台[5],点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问创建 Elasticsearch 集群[6]。

创建 ES 集群和流计算 Oceanus 集群时所选私有网络 VPC 必须是同一 VPC。

流计算 Oceanus 作业

1. 创建 Source

代码语言:javascript
复制
CREATE TABLE `user_source` (    `user_id` int,    `user_name` varchar(50),    PRIMARY KEY (`user_id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH (    'connector' = 'mysql-cdc',      -- 必须为 'mysql-cdc'    'hostname' = '10.0.0.158',      -- 数据库的 IP    'port' = '3306',                -- 数据库的访问端口    'username' = 'root',            -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)    'password' = 'yourpassword',    -- 数据库访问的密码    'database-name' = 'testdb',     -- 需要同步的数据库    'table-name' = 'user'           -- 需要同步的数据表名);

2. 创建 Sink

代码语言:javascript
复制
-- Elasticsearch 只能作为数据目的表(Sink)写入-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
CREATE TABLE es_sink (    `user_id` INT,    `user_name` VARCHAR) WITH (    'connector.type' = 'elasticsearch',    -- 输出到 Elasticsearch    'connector.version' = '6',             -- 指定 Elasticsearch 的版本, 例如 '6', '7'.     'connector.hosts' = 'http://10.0.0.175:9200',     'connector.index' = 'User',         'connector.document-type' = 'user',      'connector.username' = 'elastic',      'connector.password' = 'yourpassword', 
    'update-mode' = 'upsert',              -- 捕捉数据库变化时,需使用 'upsert' 模式     'connector.key-delimiter' = '$',       -- 可选参数, 复合主键的连接字符 (默认是 _ 符号)    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'    'connector.connection-max-retry-timeout' = '300', -- 每次请求的最大超时时间 (ms)    'format.type' = 'json'                 -- 输出数据格式, 目前只支持 'json');

3. 编写业务 SQL

代码语言:javascript
复制
insert into es_sink(    select user_id,    LOWER(user_name) -- LOWER()函数会将用户名转换为小写    from user_source);

4. 选择 Connector

点击【保存】>【发布草稿】运行作业。

请根据实际购买的 Elasticsearch 版本选择对应的 Connector ,1.13 版本之后无需选择可自动匹配 Connector。

5. 数据查询

进入 Elasticsearch 控制台[5],点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考通过 Kibana 访问集群[7]。

总结

本示例用 MySQL 连接器持续集成数据库数据变化记录,经过流计算 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。另外,ES 作为Source/Sink , 使用时间戳 timestamp 类型字段时长度需指定,如:timestamp(3)

参考阅读

[1]: MySQL 控制台:https://console.cloud.tencent.com/cdb

[2]: 创建 mysql 实例:https://cloud.tencent.com/document/product/236/46433

[3]: 流计算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[4]: 创建 Oceanus 独享集群:https://cloud.tencent.com/document/product/849/48298

[5]: Elasticsearch 控制台:https://console.cloud.tencent.com/es

[6]: 创建 Elasticsearch 集群:https://cloud.tencent.com/document/product/845/19536

[7]: 通过 Kibana 访问集群:https://cloud.tencent.com/document/product/845/19541

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 作者:腾讯云流计算 Oceanus 团队
  • 流计算 Oceanus 简介
  • 1.  MySQL 集群准备
    • 1.2 准备数据
      • 1.3 设置参数
        • 2. 创建流计算 Oceanus 集群
          • 3. 创建 Elasticsearch 集群
          • 流计算 Oceanus 作业
            • 1. 创建 Source
              • 2. 创建 Sink
                • 3. 编写业务 SQL
                  • 4. 选择 Connector
                    • 5. 数据查询
                    • 总结
                    • 参考阅读
                    相关产品与服务
                    流计算 Oceanus
                    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档