介绍
Flink connector mongodb 目前支持通过 Flink 将数据批量写入到 mongodb 中,目前支持 append 流和 upsert 流。
版本说明
Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 支持 |
DDL定义
CREATE TABLE mongodb (user_id STRING,item_id INT,category_id INT,behavior VARCHAR) WITH ('connector' = 'mongodb', -- 固定值 'mongodb''database' = 'test', --数据库名'collection' = 'table1',--数据集合'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串'sink.buffer-flush.max-rows' = '1024', -- 每次批量写入的条数'sink.buffer-flush.interval' = '1s' -- 每次批量写入的间隔);
使用范围
Flink connector mongodb 目前仅支持 mongodb sink。支持将腾讯云数据库 MongoDB 作为结果表使用。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
connector | 结果表类型 | 是 | 固定值 mongodb |
database | 数据库名称 | 是 | - |
collection | 数据集合 | 是 | - |
uri | MongoDB 连接串 | 是 | - |
sink.buffer-flush.max-rows | 每次批量写入的条数 | 否 | 默认1000 |
maxConnectionIdleTime | 连接超时时长 | 否 | 默认值为60000,单位为毫秒 |
sink.buffer-flush.interval | 每次批量写入的间隔 | 否 | 默认值为1,单位为秒 |
sink.max-retries | 写入失败的最大重试次数 | 否 | 默认为3 |
sink.retry.interval | 写入失败的重试间隔 | 否 | 默认值为1000,单位为ms |
代码示例
CREATE TABLE random_source (user_id STRING,item_id INT,category_id INT,behavior VARCHAR) WITH ('connector' = 'datagen','rows-per-second' = '100', -- 每秒产生的数据条数'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)'fields.user_id.start' = '1', -- 序列的起始值'fields.user_id.end' = '10000', -- 序列的终止值'fields.item_id.kind' = 'random', -- 无界的随机数'fields.item_id.min' = '1', -- 随机数的最小值'fields.item_id.max' = '1000', -- 随机数的最大值'fields.category_id.kind' = 'random', -- 无界的随机数'fields.category_id.min' = '1', -- 随机数的最小值'fields.category_id.max' = '1000', -- 随机数的最大值'fields.behavior.length' = '5' -- 随机字符串的长度);CREATE TABLE mongodb (user_id STRING,item_id INT,category_id INT,behavior VARCHAR) WITH ('connector' = 'mongodb', -- 固定值 'mongodb''database' = 'test', --数据库名'collection' = 'table1',--数据集合'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串'sink.buffer-flush.max-rows' = '1024', -- 每次批量写入的条数'sink.buffer-flush.interval' = '1s' -- 每次批量写入的间隔);insert into mongodb select * from random_source;
Upsert
MongoDB sink 支持 upsert,但是需要在创建 table 时指定 primary key。分为以下两种情况:
1. 只指定 _id 作为 key。
CREATE TABLE mongodb (_id STRING,item_id INT,category_id INT,behavior VARCHAR,PRIMARY KEY (`_id`) NOT ENFORCED) WITH (...);
2. 指定除 _id 以外的其他自定义字段作为 key。注意:此时会将自定义的字段转成 _id。
举例:字段 a 和字段 b 共同作为主键,那么 {a = 1, b = '2',c=3} 同步到 mongodb 端会变成{_id : {a:1, b:'2'}, a: 1, b: '2', c: 3}。
CREATE TABLE mongodb (user_id STRING,item_id INT,category_id INT,behavior VARCHAR,PRIMARY KEY (`user_id`, `item_id`) NOT ENFORCED) WITH (...);
注意事项
用户权限
MongoDB 的 User 必须拥有 database 的写权限。