简介
MongoDB Kafka Connector 允许监控一个 Mongo 实例内的所有数据库(database)或单个数据库,也允许监控某个数据库内的所有集合(collection)或单个集合。将 Mongo 的修改信息生成修改事件消息,以消息流的方式提交给 kafka 的 topic。客户端应用可以通过消费对应 topic 中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
事件格式
以下 JSON 框架展示了所有修改事件消息中可能出现的字段:
{_id : { <BSON Object> },"operationType" : "<operation>","fullDocument" : { <document> },"ns" : {"db" : "<database>","coll" : "<collection>"},"to" : {"db" : "<database>","coll" : "<collection>"},"documentKey" : { "_id" : <value> },"updateDescription" : {"updatedFields" : { <document> },"removedFields" : [ "<field>", ... ],"truncatedArrays" : [{ "field" : <field>, "newSize" : <integer> },...]},"clusterTime" : <Timestamp>,"txnNumber" : <NumberLong>,"lsid" : {"id" : <UUID>,"uid" : <BinData>}}
其中部分字段可能只在特定的事件类型中才会出现,下表对相应字段及其含义进行了描述。
Field | Type | Description |
_id | document | 一个用来唯一标识事件的 BSON 对象。 _id 对象的格式如下:{ "_data" : <BinData|hex string>} 。_data 的类型取决于 MongoDB 的版本 ,可通过 Resume Tokens 查看完整的_data类型介绍。 |
operationType | string | 触发修改事件的操作类型,具体包括以下 8 种:insertdeletereplaceupdatedroprenamedropDatabaseinvalidate |
fullDocument | document | 表示被新增( insert), 替换(replace), 删除(delete), 更新(update )操作所影响的文档。对于 insert 和 replace 操作,该字段表示新增的文档。对于 delete 操作,该字段缺省表示文档已经不存在。对于 update 操作,只有配置了 fullDocument 为 updateLookup 时才会显示。 |
ns | document | 命名空间(namespace),由 database 和 collection 构成。 |
ns.db | string | 数据库名称。 |
ns.coll | string | 集合名称。对于 dropDatabase 操作,该字段缺省。 |
to | document | 当操作类型为 rename 时,表示新的集合名称。该字段对其他操作是缺省的。 |
to.db | string | 新的数据库的名称。 |
to.coll | string | 新的集合名称。 |
documentKey | document | 操作修改的文档的 ID。 |
updateDescription | document | 一个用来描述被更新操作(update operation)修改的字段的文档。该字段仅当事件对应的操作为 update 时才有。 |
updateDescription.updatedFields | document | 包含被更新操作修改的字段,字段的 value 值为更新后的值。 |
updateDescription.removedFields | array | 包含被更新操作删除的字段。 |
updateDescription.truncatedArrays | array | |
updateDescription.truncatedArrays.field | string | 被删除的字段。 |
updateDescription.truncatedArrays.newSize | integer | truncated array 中的元素个数。 |
clusterTime | Timestamp | |
txnNumber | NumberLong | |
lsid | Document |
事件列表
新增事件(insert event)
{_id: { < Resume Token > },operationType: 'insert',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {userName: 'alice123',_id: ObjectId("599af247bb69cd8996xxxxxx")},fullDocument: {_id: ObjectId("599af247bb69cd8996xxxxxx"),userName: 'alice123',name: 'Alice'}}
其中 documentKey 字段同时包含了 _id 和 username 字段。表示 engineering.users 集合是分片的,shard key 为 username 和 _id。
更新事件(update event)
{_id: { < Resume Token > },operationType: 'update',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("58a4eb4a30c75625e0xxxxxx")},updateDescription: {updatedFields: {email: 'alice@10gen.com'},removedFields: ['phoneNumber'],truncatedArrays: [ {"field" : "vacation_time","newSize" : 36} ]}}
以下例子展示了
update
event 配置了 fullDocument : updateLookup
选项的消息内容:{_id: { < Resume Token > },operationType: 'update',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("58a4eb4a30c75625e0xxxxxx")},updateDescription: {updatedFields: {email: 'alice@10gen.com'},removedFields: ['phoneNumber'],truncatedArrays: [ {"field" : "vacation_time","newSize" : 36} ]},fullDocument: {_id: ObjectId("58a4eb4a30c75625e0xxxxxx"),name: 'Alice',userName: 'alice123',email: 'alice@10gen.com',team: 'replication'}}
替换事件(replace event)
{_id: { < Resume Token > },operationType: 'replace',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("599af247bb69cd8996xxxxxx")},fullDocument: {_id: ObjectId("599af247bb69cd8996xxxxxx"),userName: 'alice123',name: 'Alice'}}
replace
操作是通过两步操作实现的:删除原
documentKey
对应的文档根据一样的
documentkey
插入新的文档基于
replace
事件的 fullDocument
字段表示的是插入后的新文档。删文档事件(delete event)
{_id: { < Resume Token > },operationType: 'delete',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},documentKey: {_id: ObjectId("599af247bb69cd8996xxxxxx")}}
对于删除文档事件的消息,
fullDocument
字段缺省。删集合事件(drop event)
{_id: { < Resume Token > },operationType: 'drop',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'}}
当一个集合被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
改名事件(rename event)
{_id: { < Resume Token > },operationType: 'rename',clusterTime: <Timestamp>,ns: {db: 'engineering',coll: 'users'},to: {db: 'engineering',coll: 'people'}}
当一个集合名称被更改时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
删库事件(drop database event)
{_id: { < Resume Token > },operationType: 'dropDatabase',clusterTime: <Timestamp>,ns: {db: 'engineering'}}
当一个数据库被删除时会触发该事件,同时会导致订阅了该集合的 connector 产生一个无效事件(invalidate event)。
在生成数据库删除事件(dropDatabase)之前,会为数据库中的每一个集合生成一个集合删除事件(drop event)。
无效事件(invalidate event)
{_id: { < Resume Token > },operationType: 'invalidate',clusterTime: <Timestamp>}
对于订阅了一个集合(collection)的 connector,drop event,rename event 或 dropDatabase event 这类会对该集合产生影响的事件都会产生一个无效事件。
对于订阅了一个数据库(database)的 connector,dropDatabase event 会产生一个无效事件。