首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Flink 实践教程:进阶2-复杂格式数据抽取

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何实时获取 CKafka 中的 JSON 格式数据,经过数据抽取、平铺转换后存入 MySQL 中。

前置准备

创建流计算 Oceanus 集群

在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。

创建 Topic

进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备

进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。

代码语言:json
复制
// 数据格式
{
  "id": 1,
  "message": "流计算 Oceanus 1元限量秒杀活动",
  "userInfo": {
      "name": "张三",
      "phone": ["12345678910", "8547942"]
      },
  "companyInfo": {
      "name": "Tencent",
      "address": "深圳市腾讯大厦"
      }
}

创建 MySQL 实例

进入 MySQL 控制台 [7],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [8]。

代码语言:sql
复制
-- 建表语句
CREATE TABLE `oceanus_advanced2` (
  `id`              int (100) NOT NULL,
  `message`         varchar (100) NULL DEFAULT '',
  `name`            varchar (50)  NULL DEFAULT '',
  `phone`           varchar (11)  NULL DEFAULT '',
  `company_name`    varchar (100) NULL DEFAULT '',
  `company_address` varchar (100) NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE = innodb

流计算 Oceanus 作业

1. 创建 Source

代码语言:sql
复制
CREATE TABLE `kafka_json_source_table` (
    `id`             INT,
    `message`        STRING,
    `userInfo`       ROW<`name` STRING,`phone` ARRAY<STRING>>,  -- 采用 ROW 嵌套 ARRAY 格式接收 JSON 字段
    `companyInfo`    MAP<STRING,STRING>    -- 采用 MAP 格式接收 JSON 字段
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced2',                      -- 替换为您要消费的 Topic
  'scan.startup.mode' = 'earliest-offset',            -- 可以是 latest-offset/earliest-offset/specific-offsets/group-offsets/timestamp 的任何一种
  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址
  'properties.group.id' = 'testGroup',                -- 必选参数, 一定要指定 Group ID
  'format' = 'json',                                  -- 定义 JSON 格式,部分其他格式可能不支持抽取平铺
  'json.fail-on-missing-field' = 'false',             -- 如果设置为 false, 则遇到缺失字段不会报错。
  'json.ignore-parse-errors' = 'true'                 -- 如果设置为 true,则忽略任何解析报错。
);

2. 创建 Sink

代码语言:sql
复制
CREATE TABLE `jdbc_upsert_sink_table` (
    `id`                INT,
    `message`           STRING,
    `name`              STRING,
    `phone`             STRING,
    `company_name`      STRING,
    `company_address`   STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',         -- 请替换为您的实际 MySQL 连接参数
    'table-name' = 'oceanus_advanced2',    -- 需要写入的数据表
    'username' = 'root',                   -- 数据库访问的用户名(需要提供 INSERT 权限)
    'password' = 'Tencent123$',            -- 数据库访问的密码
    'sink.buffer-flush.max-rows' = '200',  -- 批量输出的条数
    'sink.buffer-flush.interval' = '2s'    -- 批量输出的间隔
);

3. 编写业务 SQL

代码语言:sql
复制
INSERT INTO `jdbc_upsert_sink_table`
SELECT
id                        AS  id,
message                   AS  message,
userInfo.name             AS  name,              -- 获取 Row 中成员采用.成员的方式
userInfo.phone[1]         AS  phone,             -- 获取 Array 中成员采用 [数组下标] 的方式
companyInfo['name']       AS  company_name,      -- 获取 Map 中成员采用 ['属性名'] 的方式
companyInfo['address']    AS  company_address
FROM `kafka_json_source_table`;

新版 Flink 1.13 集群无需用户选择内置 Connector,平台自动匹配获取

总结

本文详细介绍了如何通过 SQL 作业定义和获取 MAP、ARRAY、ROW 类型数据。更多内置运算符和函数请参考 Oceanus 官方文档 [9]。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1

[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839

[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854

[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840

[7] MySQL 控制台:https://console.cloud.tencent.com/cdb

[8] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433

[9] 内置运算符和函数:https://cloud.tencent.com/document/product/849/18083

下一篇
举报
领券