首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Oceanus 实践-从0到1接入 CKafka SQL 作业

Oceanus 简介

流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。

流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。

目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。

操作步骤

步骤1:获取Ckafka实例接入地址

Ckafka实例与Oceanus集群在同一子网时:

Ckafka接入地址为:

Ckafka内网IP与端口.png

Ckafka实例与Oceanus集群不在同一子网时:

1、登陆 Ckafka 控制台

2、在左侧导航栏选择【实例列表】,单击实例的“ID”,进入实例基本信息页面

3、在基本信息页面的【接入方式】模块里面,点击【添加路由策略】

Ckafka接入方式.png

4、【路由类型】选择VPC网络,【网络】注意选择Oceanus对应集群的网络

添加路由策略.png

步骤2: 创建topic

1、在实例基本信息页面,选择顶部【Topic管理】页签。

2、在 Topic 管理页面,单击【新建】,创建名为 oceanus_test1、oceanus_test2 的两个 Topic,接下来将讲解Oceanus如何接入Ckafka。

创建topic.png

步骤3: 接入Ckafka

1、访问 流计算Oceanus产品,点击【立即使用】或购买产品。

2、在【作业管理】页面点击左上角【新建】,创建作业。(演示使用,这里选用SQL作业,客户可自行选择作业类型)

3、选择已经创建好的“运行集群”。

新建作业.png

4、SQL作业开发调试。(这里实现Oceanus从Ckafka消费数据,并将数据写入Ckafka中)

(1) 创建source

代码语言:txt
复制
 CREATE TABLE `DataInput` (
       `request_time` VARCHAR,
       `client_ip` VARCHAR,
       `request_method` VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'oceanus_test1',  -- 替换为您要消费的 Topic
     'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
     'properties.bootstrap.servers' = 'IP地址:端口',  -- 替换为您的 Kafka 连接地址
     'properties.group.id' = 'testGroup',  -- 必选参数, 一定要指定 Group ID
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

(2) 创建sink

代码语言:txt
复制
 CREATE TABLE `DataOutput` (
       `request_time` VARCHAR,
       `client_ip` VARCHAR,
       `request_method` VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'oceanus_test2',  -- 替换为您要消费的 Topic
     'properties.bootstrap.servers' = 'IP地址:端口',  -- 替换为您的 Kafka 连接地址
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

(3) 业务逻辑

代码语言:txt
复制
 INSERT INTO DataOutput
 SELECT * FROM DataInput;

(4) 点击【作业参数】,点击【内置Connector】,选择“flink-connector-kafka”,然后点击【确认】保存。

内置Connector.png

注:具体实现请参考流计算Oceanus帮助文档

(https://cloud.tencent.com/document/product/849/48310)

下一篇
举报
领券