前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Oceanus 实践-从0到1接入 CKafka SQL 作业

Oceanus 实践-从0到1接入 CKafka SQL 作业

原创
作者头像
于乐
修改2021-09-30 16:38:53
7510
修改2021-09-30 16:38:53
举报

Oceanus 简介

流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。

流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。

目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。

操作步骤

步骤1:获取Ckafka实例接入地址

Ckafka实例与Oceanus集群在同一子网时:

Ckafka接入地址为:

Ckafka内网IP与端口.png
Ckafka内网IP与端口.png

Ckafka实例与Oceanus集群不在同一子网时:

1、登陆 Ckafka 控制台

2、在左侧导航栏选择【实例列表】,单击实例的“ID”,进入实例基本信息页面

3、在基本信息页面的【接入方式】模块里面,点击【添加路由策略】

Ckafka接入方式.png
Ckafka接入方式.png

4、【路由类型】选择VPC网络,【网络】注意选择Oceanus对应集群的网络

添加路由策略.png
添加路由策略.png

步骤2: 创建topic

1、在实例基本信息页面,选择顶部【Topic管理】页签。

2、在 Topic 管理页面,单击【新建】,创建名为 oceanus_test1、oceanus_test2 的两个 Topic,接下来将讲解Oceanus如何接入Ckafka。

创建topic.png
创建topic.png

步骤3: 接入Ckafka

1、访问 流计算Oceanus产品,点击【立即使用】或购买产品。

2、在【作业管理】页面点击左上角【新建】,创建作业。(演示使用,这里选用SQL作业,客户可自行选择作业类型)

3、选择已经创建好的“运行集群”。

新建作业.png
新建作业.png

4、SQL作业开发调试。(这里实现Oceanus从Ckafka消费数据,并将数据写入Ckafka中)

(1) 创建source

 CREATE TABLE `DataInput` (
       `request_time` VARCHAR,
       `client_ip` VARCHAR,
       `request_method` VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'oceanus_test1',  -- 替换为您要消费的 Topic
     'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
     'properties.bootstrap.servers' = 'IP地址:端口',  -- 替换为您的 Kafka 连接地址
     'properties.group.id' = 'testGroup',  -- 必选参数, 一定要指定 Group ID
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

(2) 创建sink

 CREATE TABLE `DataOutput` (
       `request_time` VARCHAR,
       `client_ip` VARCHAR,
       `request_method` VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'oceanus_test2',  -- 替换为您要消费的 Topic
     'properties.bootstrap.servers' = 'IP地址:端口',  -- 替换为您的 Kafka 连接地址
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

(3) 业务逻辑

 INSERT INTO DataOutput
 SELECT * FROM DataInput;

(4) 点击【作业参数】,点击【内置Connector】,选择“flink-connector-kafka”,然后点击【确认】保存。

内置Connector.png
内置Connector.png

注:具体实现请参考流计算Oceanus帮助文档

(https://cloud.tencent.com/document/product/849/48310)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Oceanus 简介
  • 操作步骤
    • 步骤1:获取Ckafka实例接入地址
      • 步骤2: 创建topic
        • 步骤3: 接入Ckafka
        相关产品与服务
        流计算 Oceanus
        流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档