首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务

Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句、ETL 作业或者上传运行自定义 JAR 包,支持作业运维管理。

本文将为您详细介绍如何使用 datagen 和 blackhole 连接器随机产生数据和存储数据,来实现一个最简单的 Flink 任务。

一、前置准备

创建 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群

二、创建 Oceanus 作业

1. 创建 Source

代码语言:txt
复制
- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/dev/table/connectors/datagen.html
CREATE TABLE random_source ( 
  user_id INT,
  item_id INT,
  behavior VARCHAR
  ) WITH ( 
  'connector' = 'datagen',
  'rows-per-second' = '1',              -- 每秒产生的数据条数
  'fields.user_id.kind' = 'sequence',   -- 有界序列(结束后自动停止输出)
  'fields.user_id.start' = '1',         -- 序列的起始值
  'fields.user_id.end' = '10000',       -- 序列的终止值
  'fields.item_id.kind' = 'random',     -- 无界的随机数
  'fields.item_id.min' = '1',           -- 随机数的最小值
  'fields.item_id.max' = '1000',        -- 随机数的最大值
  'fields.behavior.length' = '5'        -- 随机字符串的长度
);

2. 创建 Sink

代码语言:txt
复制
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/blackhole/

CREATE TABLE blackhole_sink (
  user_id INT,
  item_id INT,
  behavior VARCHAR
) WITH ('connector' = 'blackhole');

3. 编写业务 SQL

代码语言:txt
复制
INSERT INTO blackhole_sink
(
    SELECT user_id,
    item_id,
    behavior
    FROM random_source
);

4. 发布运行

点击工具栏【语法检查】进行 SQL 语法检查,检查无误后点击【保存】>【发布草稿】运行作业。

三、总结

  • Datagen Connector 连接器是一款用于生成随机数据的 Connector,一般作为测试使用。
  • Sink 到 Blackhole 的数据会被丢弃,用户无法查询到其中的数据,此连接器一般用于于性能测试。

参考阅读

[1] data gen connector 参考链接:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/datagen/

[2] blackhole connecotr 参考链接:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/blackhole/

下一篇
举报
领券