有奖捉虫:行业应用 & 管理与支持文档专题 HOT

简介

MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
本文档是对 Mongo 官方文档的归纳和整理,详情参见 MongoDB Change Events

事件格式

以下 JSON 框架展示了所有修改事件消息中可能出现的字段:
{
_id : { <BSON Object> },
"operationType" : "<operation>",
"fullDocument" : { <document> },
"ns" : {
"db" : "<database>",
"coll" : "<collection>"
},
"to" : {
"db" : "<database>",
"coll" : "<collection>"
},
"documentKey" : { "_id" : <value> },
"updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "<field>", ... ],
"truncatedArrays" : [
{ "field" : <field>, "newSize" : <integer> },
...
]
},
"clusterTime" : <Timestamp>,
"txnNumber" : <NumberLong>,
"lsid" : {
"id" : <UUID>,
"uid" : <BinData>
}
}

其中部分字段可能只在特定的事件类型中才会出现,下表对相应字段及其含义进行了描述。
Field
Type
Description
_id
document
一个用来唯一标识事件的 BSON 对象。 _id 对象的格式如下:{ "_data" : <BinData|hex string>} 。_data 的类型取决于 MongoDB 的版本 ,可通过 Resume Tokens 查看完整的_data类型介绍。
operationType
string
触发修改事件的操作类型,具体包括以下 8 种:insertdeletereplaceupdatedroprenamedropDatabaseinvalidate
fullDocument
document
表示被新增( insert), 替换(replace), 删除(delete), 更新(update )操作所影响的文档。对于 insert 和 replace 操作,该字段表示新增的文档。对于 delete 操作,该字段缺省表示文档已经不存在。对于 update 操作,只有配置了 fullDocument 为 updateLookup 时才会显示。
ns
document
命名空间(namespace),由 database 和 collection 构成。
ns.db
string
数据库名称。
ns.coll
string
集合名称。对于 dropDatabase 操作,该字段缺省。
to
document
当操作类型为 rename 时,表示新的集合名称。该字段对其他操作是缺省的。
to.db
string
新的数据库的名称。
to.coll
string
新的集合名称。
documentKey
document
操作修改的文档的 ID。
updateDescription
document
一个用来描述被更新操作(update operation)修改的字段的文档。该字段仅当事件对应的操作为 update 时才有。
updateDescription.updatedFields
document
包含被更新操作修改的字段,字段的 value 值为更新后的值。
updateDescription.removedFields
array
包含被更新操作删除的字段。
updateDescription.truncatedArrays
array
其中记录了使用以下一个或多个基于 pipeline 的更新执行的数组截断:$addFields$set$replaceRoot$replaceWith
updateDescription.truncatedArrays.field
string
被删除的字段。
updateDescription.truncatedArrays.newSize
integer
truncated array 中的元素个数。
clusterTime
Timestamp
oplog 与事件关联的时间戳。对于涉及 多文档事务, 关联的事件的 clusterTime 值是相同的。
txnNumber
NumberLong
事务 ID。仅当操作是 多文档事务 时出现。
lsid
Document
与事务关联的 session 的 ID,仅当操作是 多文档事务 时出现。

事件列表

新增事件(insert event)

{
_id: { < Resume Token > },
operationType: 'insert',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
userName: 'alice123',
_id: ObjectId("599af247bb69cd8996xxxxxx")
},
fullDocument: {
_id: ObjectId("599af247bb69cd8996xxxxxx"),
userName: 'alice123',
name: 'Alice'
}
}
其中 documentKey 字段同时包含了 _id 和 username 字段。表示 engineering.users 集合是分片的,shard key 为 username 和 _id。

更新事件(update event)

{
_id: { < Resume Token > },
operationType: 'update',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("58a4eb4a30c75625e0xxxxxx")
},
updateDescription: {
updatedFields: {
email: 'alice@10gen.com'
},
removedFields: ['phoneNumber'],
truncatedArrays: [ {
"field" : "vacation_time",
"newSize" : 36
} ]
}
}
以下例子展示了 update event 配置了 fullDocument : updateLookup 选项的消息内容:
{
_id: { < Resume Token > },
operationType: 'update',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("58a4eb4a30c75625e0xxxxxx")
},
updateDescription: {
updatedFields: {
email: 'alice@10gen.com'
},
removedFields: ['phoneNumber'],
truncatedArrays: [ {
"field" : "vacation_time",
"newSize" : 36
} ]
},
fullDocument: {
_id: ObjectId("58a4eb4a30c75625e0xxxxxx"),
name: 'Alice',
userName: 'alice123',
email: 'alice@10gen.com',
team: 'replication'
}
}

替换事件(replace event)

{
_id: { < Resume Token > },
operationType: 'replace',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("599af247bb69cd8996xxxxxx")
},
fullDocument: {
_id: ObjectId("599af247bb69cd8996xxxxxx"),
userName: 'alice123',
name: 'Alice'
}
}
replace 操作是通过两步操作实现的:
删除原 documentKey 对应的文档
根据一样的 documentkey插入新的文档
基于 replace 事件的 fullDocument 字段表示的是插入后的新文档。

删文档事件(delete event)

{
_id: { < Resume Token > },
operationType: 'delete',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
documentKey: {
_id: ObjectId("599af247bb69cd8996xxxxxx")
}
}
对于删除文档事件的消息,fullDocument 字段缺省。

删集合事件(drop event)

{
_id: { < Resume Token > },
operationType: 'drop',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
}
}
当一个集合被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。

改名事件(rename event)

{
_id: { < Resume Token > },
operationType: 'rename',
clusterTime: <Timestamp>,
ns: {
db: 'engineering',
coll: 'users'
},
to: {
db: 'engineering',
coll: 'people'
}
}
当一个集合名称被更改时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。

删库事件(drop database event)

{
_id: { < Resume Token > },
operationType: 'dropDatabase',
clusterTime: <Timestamp>,
ns: {
db: 'engineering'
}
}
当一个数据库被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
在生成数据库删除事件(dropDatabase)之前,会为数据库中的每一个集合生成一个集合删除事件(drop event)。

无效事件(invalidate event)

{
_id: { < Resume Token > },
operationType: 'invalidate',
clusterTime: <Timestamp>
}
对于订阅了一个集合(collection)的 connector,drop event,rename event 或 dropDatabase event 这类会对该集合产生影响的事件都会产生一个无效事件。
对于订阅了一个数据库(database)的 connector,dropDatabase event 会产生一个无效事件。