前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MongoDB 新功能介绍-Change Streams

MongoDB 新功能介绍-Change Streams

作者头像
MongoDB中文社区
发布2019-04-22 16:05:43
2.7K0
发布2019-04-22 16:05:43
举报
文章被收录于专栏:MongoDB中文社区MongoDB中文社区

MongoDB 3.6已经GA有一段时间,网络上对于该版本新特性的详细介绍文章比较少为此借机会对部分新特性做一个相对详细的介绍。基于早期MongoDB版本实现如跨平台数据同步、消息通知、ETL及oplog备份等服务时大多依赖于 Tailable Cursors 的方式。当然这样的实现一来相对复杂同时也存在着一些风险(如不同版本oplog兼容性及过滤特定操作类型等)。

Change streams(暂且叫变更流)的出现不仅为业务提供了实时获取数据库数据变化的简易接口,同时又避免了原来使用tail oplog 的复杂和风险性。下面我们来看看如何来正确使用 Change stream 。

使用条件限制

只用于 replica setssharded clusters ,单节点因为没有oplog故不支持。

复制协议必须是pv1 存储引擎必须是 WiredTiger

驱动实现接口

MongoDB Shell 接口说明

MongoDB 3.6 版本只实现了集合粒度的 change stream 具体方法如下:

db.collection.watch(pipeline, options)

该方法实际上是在集合collection上开启一个change stream的游标。

测试用例(mongo shell环境+副本集primary节点):

1.创建一个简单 Change Stream 游标并进行循环迭代

代码语言:javascript
复制
// 在test库的test集合上创建一个名为watchCursor 的change stream 游标
   watchCursor = db.getSiblingDB("test").test.watch();
// 对游标watchCursor进行循环迭代(其中当游标关闭或游标迭代没有文档时isExhausted()返回true)
 while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      a=watchCursor.next();
      printjson(a);
   }
}
// 开启另一个会话在test库下的test集合执行update操作
db.test.update({x:100},{$set:{age:80}},{upsert:true});
输出结果及详细说明如下:
{
  "_id" : {  // 表示更新操作的token 值(映射至对应操作的oplog)
 "_data":BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA

7Q4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")
        },
  "operationType" : "update", // 捕获的具体操作类型
      // 输出更新后整个文档的详细信息
     // 前提条件是在创建ChangeStream游标是指定了fullDocument : "updateLookup" 
"fullDocument" : { 
    "_id" : ObjectId("5b27e2453b438ca343304236"),
                "x" : 100,
                "age" : 80,
                "name" : "li"
        },
    "ns" : {
            "db" : "test",// 对应的库名
            "coll" : "test"// 对应的集合
      },
     "documentKey" : {
         // 操作对应记录的_id,如果是分片集合此处还会输出对应的分片key
         "_id" : ObjectId("5b27def03b438ca343303ed7") 
        },
        "updateDescription" : { // 描述了操作后记录影响的具体增量信息
         "updatedFields" : { // 增量操作(这里是update)所影响的字段
             "age" : 80 // 增量操作(这里是更新后)具体字段的值
         },
        "removedFields" : [ ] //该字段描述了update操作后被删除的字段信息
        }
}

2.创建一个只匹配 insert 操作类型的 Change Stream 游标

代码语言:javascript
复制
 watchCursor=db.getSiblingDB("test").test.watch(

   [
      { $match : {"operationType" : "insert" } }// 只匹配insert 操作的变更
   ]
);

游标创建后通过对游标进行迭代,只能获取test集合上insert操作类型的信息。其他支持的操作类型update、delete、replaceOne 及输出信息详细说明可参见:Change Events

3、ChangeStream 的”断线恢复”功能

ChangeStream还支持”断线恢复”功能即当游标因为意外情况关闭后可以通过之前的token信息进行恢复(前提条件是token对应的oplog没有被覆盖),具体使用如下:

代码语言:javascript
复制
var resumeToken={
// 该token 信息可是是之前任意有效操作的输出
"_data" :   BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA7Q
4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")};
var resumedWatchCursor=db.getSiblingDB("test").test.watch( 
          [],    
          { resumeAfter : resumeToken } // 指定对应的token之后开始恢复游标 
          );

其他的使用场景,读者自行测试即可。

注意事项

1.尝试在单节点(非副本集节点)上创建ChangeStream游标会报如下错误:

代码语言:javascript
复制
command failed: {
        "ok" : 0,
      "errmsg" : "The $changeStream stage is only supported on replica sets",
        "code" : 40573,
        "codeName" : "Location40573"
}

2. ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知

3.要想在集合上创建ChangeStream游标用户必须对集合具有读权限

4.对于分片集合带有multi:true 的更新操作可能会导致发布孤立文档的变更消息

5.对于如创建索引的操作游标迭代时直接忽略该操作但是如果 dropDatabase 或对集合进行 rename、drop 操作则会触发游标退出并输出如下信息:

代码语言:javascript
复制
 {
      "_id" : {                         
          "_data":BinData(0,"glsn6TQAAAABFFoQBI7fTw7hk0LHgHqi0QTIvq0E")
        },
     "operationType" : "invalidate" // 表示无效或叫非法操作
}

6. 当 ChangeStream 游标因特定操作导致退出后,Mongo Shell 下不会自动恢复,而对于3.6版本系列的各语言驱动则会尝试一次自动恢复。

7. 当对应的 token 信息对应的 oplog 不存在然后尝试恢复ChangeStream 游标时不会报错但尝试对集合进行数据操作后会报如下错:

代码语言:javascript
复制
getMore command failed:{
    "operationTime" : Timestamp(1528994552, 1),
    "ok" : 0,
    "errmsg" : "resume of change stream was not possible, as the resume token was not found
     ….
}

MongoDB 4.0 的变化

因为4.0版本需要支持集群及库级别的ChangeStream 故会增加如下的pipeline 命令行语法:

代码语言:javascript
复制
// 集群粒度 对应MongoDB Shell  Mongo.watch()
 { 
  aggregate: 1
  pipeline: [{$changeStream: {allChangesForCluster: true, ...}}, ...],
  ...
}
// 库粒度 对应MongoDB Shell db.watch()
{
  aggregate: 1
  pipeline: [{$changeStream: {...}}, ...],
  ...
}

另外,4.0版本在游标恢复时增加了一个 startAtOperationTime(表示操作时间)参数该参数指定从哪个操作的时间点开始恢复游标,可以通过事件的输出clusterTime 字段获得(其实对应了oplog里的操作时间),值得注意的是该参数不能和resumeAfter同时使用。

再则,4.0版本为了支持多文档事务在事件输出文档中增加了另外两个参数txnNumberlsid 分别表示事务号及会话ID ,需要注意的是同一个会话内事务ID从0开始自增。

ChangeStream 的介绍都到此为止,因为时间和精力有限难免有些错误还请及时反馈,祝各位玩得开心。

参考链接:

https://docs.mongodb.com/manual/changeStreams/

https://docs.mongodb.com/manual/reference/method/db.collection.watch/#db.collection.watch

https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/

https://docs.mongodb.com/manual/reference/change-events/

https://docs.mongodb.com/manual/reference/method/cursor.isExhausted/#cursor.isExhausted

https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst

作者简介

李丹

MongoDB中文社区北京分会主席。

原奇虎360数据库技术专家, 现任罗辑思维首席DBA。近10年专职数据库从业经验,主要从事mysql、mongodb 自动化运维及私有云平台建设,专注于开源数据库mysql、mongodb等相关技术领域的学习与研究。

2017年奇虎360大学最佳讲师;

2018 GIAC全球互联网架构大会演讲嘉宾。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Mongoing中文社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档