首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Flink 实践教程:入门8-简单 ETL 作业

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本示例使用 Oceanus 平台的 ETL 功能,将 PostgreSQL 数据取出,经过时间转换函数处理后存入 PostgreSQL 中。用户无需编写 SQL 代码,只用在界面上进行简单的点击操作即可创建 Oceanus ETL 作业。

前置准备

创建流计算 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [3],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [4]。进入实例数据库,创建表。

代码语言:sql
复制
-- 用于 Source
CREATE TABLE public.oceanus8_output (
id         INT,
time_one   TIMESTAMP,
PRIMARY    KEY(id)   );
​
-- 手动插入数据
INSERT INTO public.oceanus8_output VALUES (1,'2020-10-01 18:00:00');
INSERT INTO public.oceanus8_output VALUES (2,'2021-10-01 18:30:24');
​
-- 用于 Sink
CREATE TABLE public.oceanus8_input (
id           INT,
transf_one   VARCHAR(50),
transf_two   TIMESTAMP,
const_four   INT,
PRIMARY      KEY(id)  );

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [5]

流计算 Oceanus 作业

进入 Oceanus 控制台 [1],点击左上角【新建】创建 ETL 作业,点击【开发调试】进入作业编辑页面。ETL 作业源端可以是 MySQL、PostgreSQL,目的端可以是 MySQL、PostgreSQL、ClickHouse 和 Elasticsearch。

1. 创建 Source

单击【数据源表】右侧【添加】按钮,选择 PostgreSQL ,选择并填写数据库表相关的信息。

2. 创建 Sink

单击【数据目的表】右侧【添加】按钮,选择 PostgreSQL ,选择并填写数据库表相关的信息。

3. 映射字段

编写需要创建映射字段的业务逻辑。这里使用 DATA_FORMAT 函数将 time_one 字段类型由 TIMESTAMP 映射为 STRING,使用 TIMESTAMPADD 函数将 time_one 字段增加一周,并将常量 1000 存入 const_str 字段。

ETL 作业字段映射

ETL 作业开发详见 Oceanus 官方文档 ETL 开发指南 [6]。

添加数据源表和目的表后,可配置字段映射。字段映射分为原字段映射和新增字段映射两个部分。

原字段映射

在左侧的数据源表中可以勾选本次 ETL 作业需要从数据源表抽取的数据字段,并在右侧选择要加载进目的表的对应的映射字段名称。这样在数据源表中的数据就会复制加载到目的表中。

新增字段映射

字段生成方式有计算字段和常量字段两种。

  • 计算字段可以对从数据源表抽取出来的字段数据进行 内置函数 数值转换或者计算。
  • 常量字段可以输入一个自定义常量字段到目的源表相应的字段中。
字段取值
  • 计算字段:字段取值可以输入字段值或者表达式,对每个满足的输入源数据进行表达式计算。将计算结果返回到数据目的表所选的映射字段中。
  • 常量字段:字段取值可以输入字符串或者数字(输入类型与目的表类型要一致),这个常量字段取值将会加载到每一条数据目的表所选的映射字段中。

总结

流计算 Oceanus ETL 作业最简化了用户操作,开发人员甚至无需了解编程语言,只需要选择数据源表和目的表,并根据业务逻辑完成字段映射的配置,花费几分钟即可轻松启动 ETL 作业。

计算字段:可以对从数据源表抽取出来的字段数据进行 内置函数 [7] 数值转换或者计算。 常量字段:可以输入一个自定义常量字段到目的源表相应的字段中。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index

[4] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961

[5] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429

[6] ETL 开发指南:https://cloud.tencent.com/document/product/849/59839

[7] 内置函数:https://cloud.tencent.com/document/product/849/18083

下一篇
举报
领券