有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

介绍

MongoDB 的 CDC 源表(即 MongoDB 的流式源表),Connector 会自动跟踪 MongoDB 副本集或分片集群,以获取数据库和集合中的文档更改。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
支持
1.16
支持

使用范围

MongoDB CDC 只支持作为源表,MongoDB CDC 支持4.0、4.2、5.0版本,MongoDB 集群必须是副本集或者分片集群。

DDL 定义

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE mongo_cdc_source_table (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);

WITH 参数

参数
说明
是否必填
备注
connector
源表类型
固定值为 mongodb-cdc
hosts
MongoDB 数据库的 IP 端口对
-
username
MongoDB 数据库服务的用户名
-
password
MongoDB 数据库服务的密码
-
database
MongoDB 数据库名称
-
collection
MongoDB Collection 名称
-
connection.options
MongoDB 的 连接选项。有多个时,使用&连接,例如 relicaSet=test&connectTimeoutMS=300000
-
errors.tolerance
是否忽略错误记录,接受 none 或者 all。如果设置为 all, 忽略错误记录
none
errors.log.enable
是否需要把错误操作打印到日志文件
默认值为 true
copy.existing
是否复制库中原有的数据,如果在复制期间对数据有更改,会在数据复制完成后应用更改
默认值为 true
copy.existing.pipeline
当复制原有数据的时候,可以通过这个参数设置筛选条件。例如[{"$match": {"closed": "false"}}],只会复制 closed 为 false 的 记录。用法参考 $match (aggregation)
-
copy.existing.max.threads
执行数据复制时要使用的线程数
默认值为 Processors Count
copy.existing.queue.size
复制数据时要使用的队列的最大大小
默认值为16000
poll.max.batch.size
每次拉取数据的最大数量。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
默认值为1000
poll.await.time.ms
拉取数据的时间间隔。默认情况下,1.5秒的拉取间隔下,最多拉取1000条变更数据
默认值为1500
heartbeat.interval.ms
发送心跳消息时间间隔,以毫秒为单位。使用0禁用
默认值为0
说明
Note:当数据流变化慢的时候,建议把 heartbeat.interval.ms 设置为一个合适的值,心跳会推送 resumeToken,防止当 Flink job 从 checkpoint 或者 savepoint 恢复的时候,resumeToken 已经过期。

类型映射

MongoDB 字段类型
Flink 字段类型
-
TINYINT
-
SMALLINT
Int
INT
Long
BIGINT
-
FLOAT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
DateTimestamp
DATE
DateTimestamp
TIME
Date
TIMESTAMP(3)TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
BinData
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
Point : ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

代码示例

CREATE TABLE mongo_cdc_source_table (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
CREATE TABLE `print_table` (
`id` STRING,
`name` STRING,
`currency` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select _id, name, price.currency from mongo_cdc_source_table;

注意事项

用户权限

MongoDB 的 User 必须有 changeStream 和 read 权限。
use admin;
db.createUser(
{
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
}
);

并行度

任务的并行度只支持为1。