MongoDB 新功能介绍-Change Streams

MongoDB 3.6已经GA有一段时间,网络上对于该版本新特性的详细介绍文章比较少为此借机会对部分新特性做一个相对详细的介绍。基于早期MongoDB版本实现如跨平台数据同步、消息通知、ETL及oplog备份等服务时大多依赖于 Tailable Cursors 的方式。当然这样的实现一来相对复杂同时也存在着一些风险(如不同版本oplog兼容性及过滤特定操作类型等)。

Change streams(暂且叫变更流)的出现不仅为业务提供了实时获取数据库数据变化的简易接口,同时又避免了原来使用tail oplog 的复杂和风险性。下面我们来看看如何来正确使用 Change stream 。

使用条件限制

只用于 replica setssharded clusters ,单节点因为没有oplog故不支持。

复制协议必须是pv1 存储引擎必须是 WiredTiger

驱动实现接口

MongoDB Shell 接口说明

MongoDB 3.6 版本只实现了集合粒度的 change stream 具体方法如下:

db.collection.watch(pipeline, options)

该方法实际上是在集合collection上开启一个change stream的游标。

测试用例(mongo shell环境+副本集primary节点):

1.创建一个简单 Change Stream 游标并进行循环迭代

// 在test库的test集合上创建一个名为watchCursor 的change stream 游标
   watchCursor = db.getSiblingDB("test").test.watch();
// 对游标watchCursor进行循环迭代(其中当游标关闭或游标迭代没有文档时isExhausted()返回true)
 while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      a=watchCursor.next();
      printjson(a);
   }
}

// 开启另一个会话在test库下的test集合执行update操作

db.test.update({x:100},{$set:{age:80}},{upsert:true});

输出结果及详细说明如下:

{
  "_id" : {  // 表示更新操作的token 值(映射至对应操作的oplog)
 "_data":BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA

7Q4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")
        },
  "operationType" : "update", // 捕获的具体操作类型
      // 输出更新后整个文档的详细信息
     // 前提条件是在创建ChangeStream游标是指定了fullDocument : "updateLookup" 
"fullDocument" : { 
    "_id" : ObjectId("5b27e2453b438ca343304236"),
                "x" : 100,
                "age" : 80,
                "name" : "li"
        },
    "ns" : {
            "db" : "test",// 对应的库名
            "coll" : "test"// 对应的集合
      },
     "documentKey" : {
         // 操作对应记录的_id,如果是分片集合此处还会输出对应的分片key
         "_id" : ObjectId("5b27def03b438ca343303ed7") 
        },
        "updateDescription" : { // 描述了操作后记录影响的具体增量信息
         "updatedFields" : { // 增量操作(这里是update)所影响的字段
             "age" : 80 // 增量操作(这里是更新后)具体字段的值
         },
        "removedFields" : [ ] //该字段描述了update操作后被删除的字段信息
        }
}

2.创建一个只匹配 insert 操作类型的 Change Stream 游标

 watchCursor=db.getSiblingDB("test").test.watch(

   [
      { $match : {"operationType" : "insert" } }// 只匹配insert 操作的变更
   ]
);

游标创建后通过对游标进行迭代,只能获取test集合上insert操作类型的信息。其他支持的操作类型update、delete、replaceOne 及输出信息详细说明可参见:Change Events

3、ChangeStream 的”断线恢复”功能

ChangeStream还支持”断线恢复”功能即当游标因为意外情况关闭后可以通过之前的token信息进行恢复(前提条件是token对应的oplog没有被覆盖),具体使用如下:

var resumeToken={
// 该token 信息可是是之前任意有效操作的输出
"_data" :   BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA7Q
4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")};
var resumedWatchCursor=db.getSiblingDB("test").test.watch( 
          [],    
          { resumeAfter : resumeToken } // 指定对应的token之后开始恢复游标 
          );

其他的使用场景,读者自行测试即可。

注意事项

1.尝试在单节点(非副本集节点)上创建ChangeStream游标会报如下错误:

command failed: {

"ok" : 0,

"errmsg" : "The $changeStream stage is only supported on replica sets",

"code" : 40573,

"codeName" : "Location40573"

}

2. ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知

3.要想在集合上创建ChangeStream游标用户必须对集合具有读权限

4.对于分片集合带有multi:true 的更新操作可能会导致发布孤立文档的变更消息

5.对于如创建索引的操作游标迭代时直接忽略该操作但是如果 dropDatabase 或对集合进行 rename、drop 操作则会触发游标退出并输出如下信息:

{

"_id" : {

"_data":BinData(0,"glsn6TQAAAABFFoQBI7fTw7hk0LHgHqi0QTIvq0E")

},

"operationType" : "invalidate" // 表示无效或叫非法操作

}

6. 当 ChangeStream 游标因特定操作导致退出后,Mongo Shell 下不会自动恢复,而对于3.6版本系列的各语言驱动则会尝试一次自动恢复。

7. 当对应的 token 信息对应的 oplog 不存在然后尝试恢复ChangeStream 游标时不会报错但尝试对集合进行数据操作后会报如下错:

getMore command failed:{

"operationTime" : Timestamp(1528994552, 1),

"ok" : 0,

"errmsg" : "resume of change stream was not possible, as the resume token was not found

….

}

MongoDB 4.0 的变化

因为4.0版本需要支持集群及库级别的ChangeStream 故会增加如下的pipeline 命令行语法:

// 集群粒度 对应MongoDB Shell Mongo.watch()

{

aggregate: 1

pipeline: [{$changeStream: {allChangesForCluster: true, ...}}, ...],

...

}

// 库粒度 对应MongoDB Shell db.watch()

{

aggregate: 1

pipeline: [{$changeStream: {...}}, ...],

...

}

另外,4.0版本在游标恢复时增加了一个 startAtOperationTime(表示操作时间)参数该参数指定从哪个操作的时间点开始恢复游标,可以通过事件的输出clusterTime 字段获得(其实对应了oplog里的操作时间),值得注意的是该参数不能和resumeAfter同时使用。

再则,4.0版本为了支持多文档事务在事件输出文档中增加了另外两个参数txnNumberlsid 分别表示事务号及会话ID ,需要注意的是同一个会话内事务ID从0开始自增。

ChangeStream 的介绍都到此为止,因为时间和精力有限难免有些错误还请及时反馈,祝各位玩得开心。

参考链接:

https://docs.mongodb.com/manual/changeStreams/

https://docs.mongodb.com/manual/reference/method/db.collection.watch/#db.collection.watch

https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/

https://docs.mongodb.com/manual/reference/change-events/

https://docs.mongodb.com/manual/reference/method/cursor.isExhausted/#cursor.isExhausted

https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst

作者简介

李丹

MongoDB中文社区北京分会主席。

原奇虎360数据库技术专家, 现任罗辑思维首席DBA。近10年专职数据库从业经验,主要从事mysql、mongodb 自动化运维及私有云平台建设,专注于开源数据库mysql、mongodb等相关技术领域的学习与研究。

2017年奇虎360大学最佳讲师;

2018 GIAC全球互联网架构大会演讲嘉宾。

转载时,须注明作者、译者、出处和微信号。

原文发布于微信公众号 - Mongoing中文社区(mongoing-mongoing)

原文发表时间:2018-07-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏后端之路

Sql注入

关于防范Sql注入也是安全测试老生常谈的问题。 首先说一下,攻击者之所以可以利用自己输入的数据来达到攻击网站的目的,原因就在于SQL语言作为一种解释型语言,它的...

2268
来自专栏Linyb极客之路

面对海量请求,缓存设计还应该考虑哪些问题?

从第一个缓存框架 Memcached 诞生以来,缓存就广泛地存在于互联网应用中。如果你的应用流量很小,那么使用缓存可能并不需要做多余的考虑。但如果你的应用流量达...

934
来自专栏Python中文社区

Python爬虫:模拟登录知乎完全详解

專 欄 ❈ sunhaiyu,Python中文社区专栏作者 专栏地址: http://www.jianshu.com/u/4943cb2c6ea4 ❈ 这几天...

7249
来自专栏达摩兵的技术空间

vue项目实践003

通过本问将看到我在vue的项目中,进行的一系列的项目优化,然后看到不同的维度将这些点进行分类。

1212
来自专栏JAVA高级架构

深入理解Java虚拟机到底是什么

什么是Java虚拟机 作为一个Java程序员,我们每天都在写Java代码,我们写的代码都是在一个叫做Java虚拟机的东西上执行的。但是如果要问什么是虚拟机,恐怕...

3067
来自专栏Java架构

深入理解Java虚拟机到底是什么什么是Java虚拟机从进程的角度解释JVM

1785
来自专栏微服务

C#并发编程实例讲解-概述(01)

在工作中经常遇到需要并发编程的实例,一直没有时间来整理,现在空了下来,个人整理对并发一下理解。 关于并发编程的几个误解 误解一:并发编程就是多线程 实际上多线...

28610
来自专栏枕边书

解决问题,别扩展问题

最近有个需求需要统计一个方法的耗时,这个方法前后各打出一条日志,类似于 [INFO] 20180913 19:24:01.442 method start/en...

741
来自专栏更流畅、简洁的软件开发方式

使用IE6看老赵的博客——比较完美版(可以在线查看、回复)

  上一个版本主要是测试一下我的想法,也是熟悉一下jQuery,代码这个东东不动手写一下是很难弄明白的。   有想法,写代码,出现错误,修改错误 = 不断进步。...

2058
来自专栏坚毅的PHP

进程、线程、轻量级进程、协程和go中的Goroutine 那些事儿

电话面试被问到go的协程,曾经的军伟也问到过我协程。虽然用python时候在Eurasia和eventlet里了解过协程,但自己对协程的概念也就是轻量级线程,还...

4523

扫码关注云+社区

领取腾讯云代金券