Oceanus 简介
流计算 Oceanus 是大数据生态体系的实时化分析利器。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。
流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。
目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。本文将为您详细介绍如何使用 Oceanus 对接对象存储(Cloud Object Storage,COS)。
准备工作
创建 Oceanus 集群
创建 COS 存储桶
1. 登录 COS 控制台。
2. 在左侧导航栏中,单击存储桶列表。
3. 单击创建存储桶,创建一个存储桶。具体可参见 创建存储桶 文档。
说明
当写入 COS 时,Oceanus 作业所运行的地域必须和 COS 在同一个地域。
实践步骤
1. 创建 Source
CREATE TABLE `random_source` (f_sequence INT,f_random INT,f_random_str VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='10', -- 每秒产生的数据条数'fields.f_sequence.kind'='random', -- 随机数'fields.f_sequence.min'='1', -- 随机数的最小值'fields.f_sequence.max'='10', -- 随机数的最大值'fields.f_random.kind'='random', -- 随机数'fields.f_random.min'='1', -- 随机数的最小值'fields.f_random.max'='100', -- 随机数的最大值'fields.f_random_str.length'='10' -- 随机字符串的长度);
说明
2. 创建 Sink
-- 请将<存储桶名称>和<文件夹名称>替换成您实际的存储桶名称和文件夹名称CREATE TABLE `cos_sink` (f_sequence INT,f_random INT,f_random_str VARCHAR) PARTITIONED BY (f_sequence) WITH ('connector' = 'filesystem','path'='cosn://<存储桶名称>/<文件夹名称>/', --- 数据写入的目录路径'format' = 'json', --- 数据写入的格式'sink.rolling-policy.file-size' = '128MB', --- 文件最大大小'sink.rolling-policy.rollover-interval' = '30 min', --- 文件最大写入时间'sink.partition-commit.delay' = '1 s', --- 分区提交延迟'sink.partition-commit.policy.kind' = 'success-file' --- 分区提交方式);
说明
3. 业务逻辑
INSERT INTO `cos_sink`SELECT * FROM `random_source`;
注意
此处只做展示,无实际业务目的。
4. 作业参数设置
在内置 Connector选择
flink-connector-cos
,在高级参数中对 COS 的地址进行如下配置:fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosNfs.cosn.impl: org.apache.hadoop.fs.CosFileSystemfs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProviderfs.cosn.bucket.region: <COS 所在地域>fs.cosn.userinfo.appid: <COS 所属用户的 appid>
作业配置说明如下:
请将
<COS 所在地域>
替换为您实际的 COS 地域,例如:ap-guangzhou。请将
<COS 所属用户的 appid>
替换为您实际的 APPID,具体请进入 账号中心 查看。说明
5. 启动作业
依次单击保存 > 语法检查 > 发布草稿,等待 SQL 作业启动后,即可前往相应 COS 目录中查看写入数据。