前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Oceanus 实践-从0到1开发PG SQL作业

Oceanus 实践-从0到1开发PG SQL作业

作者头像
吴云涛
修改2021-11-29 14:50:20
9740
修改2021-11-29 14:50:20
举报

实时即未来,最近在腾讯云流计算 Oceanus 进行 Flink 实时计算服务,分享给大家~

本次实践为随机生成的数据写入到 Postgres(PG) 目的端。

1. 环境搭建

1.1. 创建流计算 Oceanus 集群

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。

若之前未使用过 VPC,日志(CLS),存储(COS)这些组件,需要先进行创建。

VPC及子网需要和下面的PG集群使用同一个,否则需要手动打通(如对等连接)。

创建完后的集群如下:

Oceanus集群
Oceanus集群
1.2 创建 CDW PostgreSQL 集群

在云数据仓库控制台创建 PostgreSQL 集群,这里为了简单,选择了与 Oceanus 同一个地域,同可用区。网络选择也选择与上面同样的 VPC。

创建完后的集群如下:

CDW Postgres集群
CDW Postgres集群

要登录集群目前需要在同网段的云服务器才能登录,集群节点不能直接登录,通过同网段的云服务器(CVM)安装pg客户端进行登录,创建 PG 表。

这里需要先登录postgres database,然后创建自己的database。创建后进行才能登录自己的数据库,登录命令:

psql -h 172.28.28.91 -p 5436 -U test_root -d postgres

create database testdb;

\q

psql -h 172.28.28.91 -p 5436 -U test_root -d postgres

创建schema,创建表语句如下:

代码语言:sql
复制
create schema testschema;
create table testschema.pg_sink (id int not null, name text, primary key(id)) distributed by (id);

至此,环境准备完毕。

2. 作业创建

2.1 创建 SQL 作业

在 Oceanus 控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。1.11 以下版本需在作业的【开发调试】->【作业参数】里面添加必要的 connector,如 jdbc connector。当前版本兼容了 1.13 Flink 无需手动添加 connector。

作业参数
作业参数
2.2 创建Source端

此处选择 Datagen 来随机生成一些数据。

代码语言:sql
复制
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 [https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html](https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html)

 CREATE TABLE random_source ( 
 f_sequence INT, 
 f_random INT, 
 f_random_str VARCHAR 
 ) WITH ( 
 'connector' = 'datagen', 
 'rows-per-second'='1',  -- 每秒产生的数据条数
 'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
 'fields.f_sequence.start'='1',         -- 序列的起始值
 'fields.f_sequence.end'='10000',       -- 序列的终止值 
 'fields.f_random.kind'='random',       -- 无界的随机数
 'fields.f_random.min'='1',             -- 随机数的最小值
 'fields.f_random.max'='1000',          -- 随机数的最大值
 'fields.f_random_str.length'='10'      -- 随机字符串的长度
   );
2.3 创建 Sink 端

此处 sink 的表信息需要和 CDW PG 中表信息完全对应。

代码语言:sql
复制
CREATE TABLE `pg_sink` (
    `id` INT,
    `name` VARCHAR
) WITH (
    -- 指定数据库连接参数
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://172.28.28.91:5436/testdb?currentSchema=testschema&reWriteBatchedInserts=true',   -- 请替换为您的实际 PostgreSQL 连接参数
    'table-name' = 'pg_sink',   -- 需要写入的数据表
    'username' = 'test_root',        -- 数据库访问的用户名(需要提供 INSERT 权限)
    'password' = 'your_password',  -- 数据库访问的密码

    -- 数据目的 Sink 性能调优参数
    'sink.buffer-flush.max-rows' = '5000', -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000
    'sink.buffer-flush.interval' = '2s', -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s
    'sink.max-retries' = '3' -- 可选参数, 表示数据库写入出错时, 最多尝试的次数
);
2.4 算子操作

这里大家可以自由发挥,进行运算数据处理。我在此处只做了简单的数据插入,没有进行复杂计算。

代码语言:sql
复制
INSERT INTO pg_sink SELECT f_sequence as id, f_random_str as name FROM random_source;

3. 验证总结

查询pg数据,数据是否插入成功。

代码语言:sql
复制
select * from testddb.pg_sink;

阅读参考:

[1] 流计算 Oceanus 官方文档:https://cloud.tencent.com/document/product/849

[2] PostgreSQL (CDW PG) 集群官方文档:https://cloud.tencent.com/document/product/878

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 环境搭建
  • 2. 作业创建
  • 3. 验证总结
  • 阅读参考:
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档