前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse

Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse

原创
作者头像
吴云涛
修改2021-12-08 16:02:06
1.4K0
修改2021-12-08 16:02:06
举报

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将向您详细介绍如何获取 PostgreSQL 表数据,并使用字符串函数进行转换,最后将数据输出到 ClickHouse 中。

Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse

前置准备

创建流计算 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [3],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [4]。

数据准备:

进入实例数据库,创建 test1 表,并手动插入数据。

-- 建表语句
create table public.test1 (
    id INT,
    str_one VARCHAR(50),
    str_two VARCHAR(50),
    str_thr VARCHAR(50),
    PRIMARY key(id)
);
​
-- 插入语句
INSERT INTO public.test1 VALUES (1, 'hello world', 'b', 'Oceanus-1');
INSERT INTO public.test1 VALUES (2, 'good job', 'c', 'Oceanus-2');
INSERT INTO public.test1 VALUES (3, 'hello oceanus', 'd', 'Oceanus-3');

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [5]

创建 ClickHouse 集群

进入 ClickHouse 控制台 [6],点击左上角【新建集群】,完成 ClickHouse 集群创建,具体可参考 ClickHouse 快速入门 [7]。

创建 ClickHouse 表: 登陆 ClickHouse 集群(登入方式参考 ClickHouse 快速入门 [7]),并建表。

CREATE TABLE default.pg_to_ck on cluster default_cluster (
    id  Int8,
    str_one String,
    str_two String,
    str_thr String,
    Sign Int8 )
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/pg_to_ck', '{replica}',Sign)
ORDER BY (id);

注:Oceanus 集群、PostgreSQL 实例、ClickHouse 集群需在同一 VPC 下。

流计算 Oceanus 作业

1. 创建 Source
-- PostgreSQL CDC Source。
CREATE TABLE PostgreSourceTable (
  id INT,
  str_one VARCHAR,
  str_two VARCHAR,
  str_thr VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
  'connector' = 'postgres-cdc',  -- 必须为 'postgres-cdc'
  'hostname' = '10.0.0.236',     -- 数据库的 IP
  'port' = '5432',               -- 数据库的访问端口
  'username' = 'root',           -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)
  'password' = 'xxxxxxxxxxx',    -- 数据库访问使用的密码
  'database-name' = 'postgres',  -- 需要同步的数据库名
  'schema-name' = 'public',      -- 需要同步的数据库模式 (Schema)
  'table-name' = 'test1'         -- 需要同步的数据表名
);
2. 创建 Sink
-- ClickHouse Sink (不完全支持upsert,详见说明文档)。配合 flink-connector-clickhouse 使用。
CREATE TABLE clickhouse_sink (
  id INT,
  str_one VARCHAR,
  str_two VARCHAR,
  str_thr VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED          -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
  'connector' = 'clickhouse',              -- connector 类型为 clickhouse
  'url' = 'clickhouse://10.0.0.178:8123',  -- 指定数据库链接 url
  'database-name' = 'default',             -- 需要写入的 clickhouse 库名
  'table-name' = 'pg_to_ck',               -- 需要写入的 clickhouse 表名
  'table.collapsing.field' = 'Sign'        -- 采用 CollapsingMergeTree 引擎的 clickhouse 表,Collapsing 列字段的名称
);
3. 编写业务 SQL
INSERT INTO clickhouse_sink
SELECT 
  id,
--INITCAP:将 str_one 中的单词转为大写开头,例如 INITCAP('i have a dream') 返回 'I Have A Dream'。
  INITCAP(str_one)    AS str_one,
--TO_BASE64:将 string 表示的字符串编码为 Base64 字符串。
  TO_BASE64(str_two)  AS str_two,
--REPLACE:将 string1 字符串中所有的 string2 替换为 string3。例如 REPLACE('banana', 'a', 'A') 返回 'bAnAnA'。
  REPLACE(str_thr,'Oceanus','Hello Oceanus') AS str_thr 
FROM PostgreSourceTable;

这里我们使用 Flink 1.13 集群,旧版 Flink 集群需选择相应的内置 Connector

总结

  1. 使用 Postgres-CDC 连接器:
  • 用于同步的 Postgres 用户至少需要开启 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。可以进入 PostgreSQL 数据库进行授权操作。
CREATE ROLE debezium_user REPLICATION LOGIN;
GRANT USAGE ON DATABASE database_name TO debezium_user;
GRANT USAGE ON SCHEMA schema_name TO debezium_user;
GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
  • 日志级别必须大于等于 logical, 且设置后需要重启实例。进入数据库实例,单击【参数设置】,单击【WAL】,修改【wal_level】的【参数运行值】为 "logical"。修改成功后点击右上角【重启】。
  1. 更多字符串操作函数请参考 Oceanus 官方文档 字符串函数[8]。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview [2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298 [3] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index [4] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961 [5] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429 [6] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou [7] ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824 [8] Oceanus 字符串函数:https://cloud.tencent.com/document/product/849/18073

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 流计算 Oceanus 简介
  • 前置准备
    • 创建流计算 Oceanus 集群
      • 创建 PostgreSQL 实例
        • 数据准备:
      • 创建 ClickHouse 集群
        • 1. 创建 Source
        • 2. 创建 Sink
        • 3. 编写业务 SQL
    • 流计算 Oceanus 作业
    • 总结
    • 参考链接
    相关产品与服务
    流计算 Oceanus
    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档