首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java和Node.js实战 MongoDB 4.x 新特性:Change Streams 变化流

Java和Node.js实战 MongoDB 4.x 新特性:Change Streams 变化流

作者头像
MongoDB中文社区
发布2019-04-22 15:33:33
1.4K0
发布2019-04-22 15:33:33
举报
文章被收录于专栏:MongoDB中文社区MongoDB中文社区

监控数据库发生的变化是MongoDB同步数据服务的关键。我们不需要去定期轮训查询集合中的更改文档,我们就可以可以更轻松地过滤Change Streams 变化流,并立即采取处理错误。这是一种Reactive反应式编程风格,可以非常强大。如今,获取这些变更信息流非常简单。

*译者注:关于Reactive编程,Java提供了Reactive模型支持,阿里Java专家杜万老师,在阿里Java钉钉群中提供了讲座和资料。

先介绍点历史知识。在MongoDB 3.6之前,如果我们要监听MongoDB中正在发生的变化,必须“tail the oplog”,跟踪操作日志,这是一个用于复制记录变更的集合。 “tail the oplog”的过程往往最终会出现复杂的问题,不受支持的,脆弱的代码,而这些代码在生产中存在风险,难以控制,并不是我们想要的。这意味着人们会避免使用Reactive反应式编程风格。

变更流和集合

Change Streams

and Collections

这种问题情况在MongoDB 3.6 Change Streams新功能出现后开始发生变化。变更流使其变得简单并且支持监听集合中的数据变化,而不在需要跟踪Oplog。是不是非常简单方便?让我们看一下Java和Node.js示例中movieDetails集合中发生的一些变化。

Java实现 Change Streams的代码如下:

MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method

MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

Node实现 Change Streams的代码如下:

const MongoClient = require("mongodb").MongoClient;

const uri = "MONGODBURL";

const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.db("video").collection("movieDetails").watch();
 changeStream.on("change", next => {
   console.log(next);
 });
});

此Node代码连接到Mongodb数据库。然后,它选择数据库video和movieDetails集合,并使用watch()函数创建变化流。我们使用.on添加一个事件触发器(“change”,...然后代码将在变化流changeStream中获取changeStream事件,随后它将调用一个函数,执行处理代码。在这种情况下,它只是在文档更改时打印出Change Streams 变化流事件如果我运行此代码,然后使用MongoDB Compass查看movieDetail对象细节,下面是详细例子信息:

{ _id:
   { _data:
      '825C51D03F0000000129295A1004E515B4338C574BA2B9603CB1C7FB3B0446645F696400645C0EC4B74B052F9E2EF0C3810004' },
  operationType: 'replace',
  clusterTime:
   Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1548865599 },
  fullDocument:
   { _id: 5c0ec4b74b052f9e2ef0c381,
     title: 'PS I Love You',
     year: 2007,
     ...
     awards: { wins: 2, nominations: 4, text: '2 wins & 4 nominations.' },
     type: 'movie' },
  ns: { db: 'video', coll: 'movieDetails' },
  documentKey: { _id: 5c0ec4b74b052f9e2ef0c381 } 
}

我们可以在Change Events变化事件文档中阅读更多Change Events内容https://docs.mongodb.com/manual/reference/change-events/,但快速方法是可以在operationType字段中找到Change Events重要信息,即更改类型。当我们观察集合时,它可以具有插入,更新,替换,删除或无效(insert, update, replace, delete or invalidate)的值。前四种类型代表了他们的名字。我们在上面的文档中看到的是Compass通过支付替换集合中的文档进行编辑的结果。

无效的operationType在变化流中出现,其中正在监控的集合被删除或重命名,或者集合所在的数据库被删除。这是关闭change Stream变更流的信号。本文档的其余部分是有关变更内容的信息;哪个命名空间、文档结构、以及变化发生的时间。

顺便说一句,上面的示例中更改文档是在MongoDB 4.x数据库上测试的,在以前的版本_data上添加了一个字段。这是一个恢复标志字段,允许对其进行记录的应用程序使用它们在流中的该点重新开始执行未完成的任务。

深入集合Collection

MongoDB 3.6版本Change Streams变化流已经做的很好,可以跟踪集合中的数据变化。但是之前很多人被迫使用oplog来跟踪全局变化,想要对整个数据库中所有变化跟踪并处理,这种情况就比较痛苦。监控整库变化这个功能在MongoDB 4.0添加进来了。它可以在数据库或整个部署上创建Change Streams变化流的功能 - 高可用副本集或分片集群。 4.0不仅允许对集合执行watch()监控,还可以允许对数据库或整个部署集群执行watch()。例子代码如下:

const MongoClient = require("mongodb").MongoClient;

const uri ="MONGODBURL";

const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.watch();
 changeStream.on("change", next => {
   console.log(next);
});

现在,只要任何数据库中的任何集合任何数据被修改,就会打印日志到控制台。 这些不是我们要获得的唯一变化事件,还可以监控更多事件。 由于Change Streams监控已经可以监控到最广泛的范围,现在我们将看到删除集合时的drop事件,删除数据库时的dropDatabase事件以及重命名集合时rename重命名事件,都会被监控到。

如果我们只对特定数据库中发生的事件感兴趣,可以打开数据库并对其执行watch()。 我们可以获得该数据库中collection集合的所有更新,以及删除和重命名事件。 但是不会得到dropDatabase事件; 如果我们的数据库被删除,那么当数据库已经删除时,返回的结果是invalidate ,表示无效操作。

扩展学习

有了MongoDB 4.0 Change Streams增强新特性,我们可以跟踪单个集合Colletion、数据库或部署集群的数据库和集合中的所有变化。有些变化我们不会明确看到信息;必须通过在集合中创建文档来推断新集合和数据库的创建过程。

当复制到另一个MongoDB时,这些都不是大问题,因为数据库和集合创建是在新文档生成时创建的,可以推测出来。复制集合的困难点在于,检查新集合是否影响以前的集合,还有就是我们监控不到创建索引和其他操作,这些操作不会反映在为更改文档的日志中,不能通过变更流监控。

MongoDB4.0 Change Streams增强新特性 意味着我们现在更容易监控MongoDB数据库和集群活动,该功能提供了一种全新的方式将MongoDB呈现给另一个系统 - 实时监控MongoDB数据的变化。建议大家自己动手实战一下MongoDB 4.0 Change Streams。可以参考官方文档:https://docs.mongodb.com/manual/changeStreams

译者简介

徐雷

  • MongoDB中文社区联席主席;
  • 《Mongodb实战》第2版 译者;
  • 阿里巴巴云大学讲师, 微软msdn特邀讲师、资深架构师、技术顾问。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-02-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Mongoing中文社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档