数据库 MongoDB CDC

最近更新时间:2025-06-09 11:02:02

我的收藏

介绍

MongoDB 的 CDC 源表(即 MongoDB 的流式源表),Connector 会自动跟踪 MongoDB 副本集或分片集群,以获取数据库和集合中的文档更改。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
内置的 debezium 版本是1.5.4. Final
1.14
支持
内置的 debezium 版本是1.5.4. Final
1.16
支持
内置的 debezium 版本是1.5.4. Final

使用范围

MongoDB CDC 只支持作为源表,MongoDB CDC 支持4.0、4.2、5.0版本,MongoDB 集群必须是副本集或者分片集群。

DDL 定义

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE mongo_cdc_source_table (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);

WITH 参数

参数
说明
是否必填
备注
connector
源表类型
固定值为 mongodb-cdc
hosts
MongoDB 数据库的 IP 端口对
-
username
MongoDB 数据库服务的用户名
-
password
MongoDB 数据库服务的密码
-
database
MongoDB 数据库名称
正则表达式下要求MongoDB 4.0版本及以上
collection
MongoDB Collection 名称
正则表达式下要求 MongoDB 4.0版本及以上,且表达式中必须包含数据库名称,例如要包含default库中所有"test_"开头的表,需要配置为"default.test_.*"。
filter-duplicate-pair-records
过滤未在 Flink DDL 语句中定义的源表字段变更记录
例如 MongoDB 源表有 a, b, c, d 四个字段,而用户在 Flink SQL 建表时只定义了 a, b 两个字段;开启该参数后,仅涉及 c 或 d 字段的变更记录会被忽略,不会输出到下游,可减少计算量和处理压力
connection.options
MongoDB 的 连接选项。有多个时,使用&连接,例如 replicaSet=test&connectTimeoutMS=300000
-
errors.tolerance
是否忽略错误记录,接受 none 或者 all。如果设置为 all, 忽略错误记录
none
errors.log.enable
是否需要把错误操作打印到日志文件
默认值为 true
copy.existing
是否复制库中原有的数据,如果在复制期间对数据有更改,会在数据复制完成后应用更改
默认值为 true
copy.existing.pipeline
当复制原有数据的时候,可以通过这个参数设置筛选条件。例如[{"$match": {"closed": "false"}}],只会复制 closed 为 false 的 记录。用法参考 $match (aggregation)
-
copy.existing.max.threads
执行数据复制时要使用的线程数
默认值为 Processors Count
copy.existing.queue.size
复制数据时要使用的队列的最大大小
默认值为16000
poll.max.batch.size
每次拉取数据的最大数量。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
默认值为1000
poll.await.time.ms
拉取数据的时间间隔。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
默认值为1500
heartbeat.interval.ms
发送心跳消息时间间隔,以毫秒为单位。使用0禁用
默认值为0
说明
Note:当数据流变化慢的时候,建议把 heartbeat.interval.ms 设置为一个合适的值,心跳会推送 resumeToken,防止当 Flink job 从 checkpoint 或者 savepoint 恢复的时候,resumeToken 已经过期。

类型映射

MongoDB 字段类型
Flink 字段类型
-
TINYINT
-
SMALLINT
Int
INT
Long
BIGINT
-
FLOAT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
DateTimestamp
DATE
DateTimestamp
TIME
Date
TIMESTAMP(3)TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
BinData
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
Point : ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

代码示例

CREATE TABLE mongo_cdc_source_table (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
CREATE TABLE `print_table` (
`id` STRING,
`name` STRING,
`currency` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select _id, name, price.currency from mongo_cdc_source_table;

注意事项

用户权限

MongoDB 的 User 必须有 changeStream 和 read 权限。
use admin;
db.createUser(
{
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
}
);

并行度

任务的并行度只支持为1。