数据库 MongoDB

最近更新时间:2025-04-15 14:16:12

我的收藏

介绍

Flink connector mongodb 目前支持通过 Flink 将数据批量写入到 mongodb 中,目前支持 append 流和 upsert 流。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
不支持
1.16
支持

DDL定义

CREATE TABLE mongodb (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'sink.buffer-flush.max-rows' = '1024', -- 每次批量写入的条数
'sink.buffer-flush.interval' = '1s' -- 每次批量写入的间隔
);

使用范围

Flink connector mongodb 目前仅支持 mongodb sink。支持将腾讯云数据库 MongoDB 作为结果表使用。

WITH参数

参数
说明
是否必填
备注
connector
结果表类型
固定值 mongodb
database
数据库名称
-
collection
数据集合
-
uri
MongoDB 连接串
-
sink.buffer-flush.max-rows
每次批量写入的条数
默认1000
maxConnectionIdleTime
连接超时时长
默认值为60000,单位为毫秒
sink.buffer-flush.interval
每次批量写入的间隔
默认值为1,单位为秒
sink.max-retries
写入失败的最大重试次数
默认为3
sink.retry.interval
写入失败的重试间隔
默认值为1000,单位为ms

代码示例

CREATE TABLE random_source (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒产生的数据条数
'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)
'fields.user_id.start' = '1', -- 序列的起始值
'fields.user_id.end' = '10000', -- 序列的终止值
'fields.item_id.kind' = 'random', -- 无界的随机数
'fields.item_id.min' = '1', -- 随机数的最小值
'fields.item_id.max' = '1000', -- 随机数的最大值
'fields.category_id.kind' = 'random', -- 无界的随机数
'fields.category_id.min' = '1', -- 随机数的最小值
'fields.category_id.max' = '1000', -- 随机数的最大值
'fields.behavior.length' = '5' -- 随机字符串的长度
);

CREATE TABLE mongodb (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'sink.buffer-flush.max-rows' = '1024', -- 每次批量写入的条数
'sink.buffer-flush.interval' = '1s' -- 每次批量写入的间隔
);

insert into mongodb select * from random_source;

Upsert

MongoDB sink 支持 upsert,但是需要在创建 table 时指定 primary key。分为以下两种情况:
1. 只指定 _id 作为 key。
CREATE TABLE mongodb (
_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR,
PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
...
);
2. 指定除 _id 以外的其他自定义字段作为 key。注意:此时会将自定义的字段转成 _id。
举例:字段 a 和字段 b 共同作为主键,那么 {a = 1, b = '2',c=3} 同步到 mongodb 端会变成{_id : {a:1, b:'2'}, a: 1, b: '2', c: 3}。
CREATE TABLE mongodb (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR,
PRIMARY KEY (`user_id`, `item_id`) NOT ENFORCED
) WITH (
...
);

注意事项

用户权限

MongoDB 的 User 必须拥有 database 的写权限。