首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实践教程:入门(11):MongoDB Sink

Flink 实践教程:入门(11):MongoDB Sink

原创
作者头像
于乐
修改2022-05-20 16:08:02
1.2K0
修改2022-05-20 16:08:02
举报

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将您详细介绍如何将数据写入 MongoDB。

视频内容

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群

创建 MongoDB 实例

进入 MongoDB 控制台,点击左上角【新建实例】创建实例,具体参考 创建 MongoDB 实例。作者这里使用 shell 的方式,下载 MongoDB 客户端的方式连接数据库,更多连接信息请参考 连接 MongoDB 实例

## 安装 MongoDB 客户端
wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-XX.XX.XX.tgz
## 解压
tar zxvf mongodb-linux-x86_64-rhel70-XX.XX.XX.tgz
## 进入目录
cd mongodb-linux-x86_64-rhel70-XX.XX.XX
## 连接客户端
./bin/mongo -umongouser -p***** 172.xx.xx.xx:27017/admin

请注意选择与云数据库 MongoDB 服务并与 CVM 操作系统相匹配的版本

流计算 Oceanus 作业

1. 创建 Source
CREATE TABLE datagen_source_table ( 
    id INT, 
    name STRING 
) WITH ( 
    'connector' = 'datagen',
    'rows-per-second'='1'  -- 每秒产生的数据条数
);
2. 创建 Sink
CREATE TABLE mongodb (
    id INT,
    name STRING
) WITH (
  'connector' = 'mongodb',   -- 固定值 'mongodb'
  'database' = 'testdb',     --数据库名
  'collection' = 'test1',    --数据集合
  'uri' = 'mongodb://mongouser:******@xx.xx.xx.xx:27017/admin', -- MongoDB连接串
  'batchSize' = '5'          -- 每次批量写入的条数
);
3. 编写业务 SQL
INSERT INTO mongodb
SELECT * from datagen_source_table;

总结

本实例演示如何使用 Datagen 生成随机数据,然后使用 MongoDB Sink 连接器将数据写入 MongoDB。

目前仅 Flink 1.13 支持 Sink 端写入,其他版本暂不支持。undefinedMongoDB Sink 暂不支持 Upsert。undefinedMongoDB 的 User 必须拥有 database 的写权限。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 流计算 Oceanus 简介
  • 前置准备
    • 创建流计算 Oceanus 集群
      • 创建 MongoDB 实例
        • 1. 创建 Source
        • 2. 创建 Sink
        • 3. 编写业务 SQL
    • 流计算 Oceanus 作业
    • 总结
    相关产品与服务
    流计算 Oceanus
    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档