腾讯云 Mongo 数据源与 Mongo 数据源配置方式相同,此处以 Mongo 数据源为例进行讲解。
支持版本
支持 MongoDB 3.6、4.x、5.x、6.x 版本。
使用限制
1. 增量同步时,where 条件语法:
create_time>= '${yyyy-MM-dd-1d HH:mm:ss}' and create_time < '${yyyy-MM-dd HH:mm:ss}'
Bound 目前支持整型和日期函数配置。日期函数配置使用方法:
// 转化为13位时间戳(毫秒)TimestampMillis('yyyy-MM-ddTHH:mm:00+0800')TimestampMillis('2023-07-10T00:00:00+0800')TimestampMillis('2023-07-10 00:00:00')TimestampMillis('2023-07-10')// 转化为10位时间戳(秒)TimestampSeconds('yyyy-MM-ddTHH:mm:00+0800')TimestampSeconds('2023-07-10T00:00:00+0800')TimestampSeconds('2023-07-10 00:00:00')TimestampSeconds('2023-07-10')
2. 支持使用 MongoDB 数据库对应账号进行连接,如果您使用的是云数据库 MongoDB 版,默认会有一个 admin 账号。出于安全策略的考虑,在添加使用MongoDB 数据源时,请避免使用 admin 作为访问账号。
3. 如果 MongoDB 为分片集群,则在配置数据源时,需要配置 mongos 地址,避免配置 mongod/shard 节点地址。否则同步任务在抽取 MongoDB 中数据时,可能会导致只查询到指定 shard 的数据,而非预期的全集。关于 mongos、mongod,详情请参见 mongos shards。
4. 在并发大于1的情况下,支持自定义切割键,选择作为自定义切割键的字段必须:1)已创建索引,否则并发设置不会生效 2)字段的所有值必须都是同一种数据类型,否则会出现数据同步条数不准确的情况。
说明:
并发大于1时,任务拆分会使用切割键字段进行划分,因而在此场景切割键字段不支持混合类型。如果被设置为切割键的字段有多种字段类型,会出现数据同步条数不准确的情况。这种情况下建议使用单并发的形式进行数据同步,且不配置 splitFactor 或 splitFactor 配置为1。
Mongo 离线单表读取节点配置


参数 | 说明 |
数据来源 | 可用的 Mongo 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
集合 | 支持选择、或者手动输入需读取的集合。 |
分割符 | 当字段为数组类型时,使用分割符拼接数组内容 |
切割键(选填) | 指定用于数据分片的字段,指定后将启动并发任务进行数据同步。 注意: 若任务并发数设置大于1,则该项必填否则并发设置无效。 选择作为自定义切割键的字段必须:1.已创建索引,否则并发设置不会生效;2.字段的所有值必须都是同一种数据类型,否则会出现数据同步条数不准确的情况。 |
筛选条件(选填) | 根据数据类型填写对应筛选语句,该语句会作为将要同步数据的筛选条件。 |
高级设置(选填) | 可根据业务需求配置参数。 |
Mongo 离线单表写入节点配置


参数 | 说明 |
数据去向 | 选择当前项目中可用的 Mongodb 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
集合 | 支持选择、或者手动输入需读取的集合。 |
写入模式 | 表示写入数据时,是否针对相同的主键覆盖写入。默认使用 _id 每行记录的业务主键 |
前置语句 | 选填,表示数据同步写入 MongoDB 前的前置操作,请确保前置语句符合 JSON 语句要求,如{"type": "remove"} |
高级设置(选填) | 可根据业务需求配置参数。 |
数据类型转换支持
读取
Mongo 读取支持的数据类型及转换对应关系如下(在处理 Mongo 时,会先将 Mongo 数据源的数据类型和数据处理引擎的数据类型做映射):
Mongo 数据类型 | 内部类型 |
INT32,INT64,BIGINT | LONG |
DOUBLE,DECIMAL | DOUBLE |
STRING, ARRAY,MAP,LIST | STRING |
DATE,DATETIME | DATE |
BOOLEAN | BOOLEAN |
BYTES | BYTES |
写入
Mongo 写入支持的数据类型及转换对应关系如下:
内部类型 | Mongo 数据类型 |
LONG | int, Long |
DOUBLE | DOUBLE,DECIMAL |
STRING | string, array |
DATE | DATE,DATETIME |
BOOLEAN | BOOLEAN |
BYTES | BYTES |
Mongo 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"userPassword": "******","address": [ //源数据源地址"ip:27017"],"query": "{\\"create_date\\":{\\"$gt\\":ISODate(\\"${yyyy-MM-dd-1d}T00:00:00+0800\\"),\\"$lt\\":ISODate(\\"${yyyy-MM-dd}T00:00:00+0800\\")}}", //筛选条件"dbName": "database", //源库"column": [{"name": "id","type": "string"},{"name": "name","type": "string"}],"authDb": "admin","userName": "mongouser","splitter": "-", //分割符,当字段为数组类型时,使用分割符拼接数组内容"collectionName": "source_collection" //源集合},"name": "mongodbreader"},"transformer": [],"writer": {"parameter": {"userPassword": "******","address": [ //支持多ip"ip:45400","ip:41748","ip:41748"],"dbName": "database", //目标库"column": [{"name": "id","type": "string"},{"name": "name","type": "string"}],"authDb": "admin","userName": "mongouser","collectionName": "sink_collections", //目标集合"preSql": { //前置sql"type": "remove"}},"name": "mongodbwriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}