实时同步MongoDB Oplog开发指南

Capped Collections

MongoDB有一种特殊的Collection叫Capped collections,它的插入速度非常快,基本和磁盘的写入速度差不多,并且支持按照插入顺序高效的查询操作。Capped collections的大小是固定的,它的工作方式很像环形缓冲器(circular buffers), 当剩余空间不足时,会覆盖最先插入的数据。

Capped collections的特点是高效插入和检索,所以最好不要在Capped collections上添加额外的索引,否则会影响插入速度。Capped collections可以用于以下场景:

  • 存储日志: Capped collections的first-in-first-out特性刚好满足日志事件的存储顺序;
  • 缓存小量数据:因为缓存的特点是读多写少,所以可以适当使用索引提高读取速度。

Capped collections的使用限制:

  • 如果更新数据,你需要为之创建索引以防止collection scan;
  • 更新数据时,文档的大小不能改变。比如说name属性为'abc',则只能修改成3个字符的字符串,否则操作将会失败;
  • 数据不允许删除,如果非删除不可,只能drop collection
  • 不支持sharding
  • 默认只支持按自然顺序(即插入顺序)返回结果

Capped collections可以使用$natural操作符按插入顺序的正序或反序返回结果:

db['oplog.rs'].find({}).sort({$natural: -1})

Oplog

Oplog是一种特殊的Capped collections,特殊之处在于它是系统级Collection,记录了数据库的所有操作,集群之间依靠Oplog进行数据同步。Oplog的全名是local.oplog.rs,位于local数据下。由于local数据不允许创建用户,如果要访问Oplog需要借助其它数据库的用户,并且赋予该用户访问local数据库的权限,例如:

db.createUser({
   user: "play-community",
   pwd: "******",
   "roles" : [
    {
      "role" : "readWrite", 
      "db" : "play-community"
    }, 
    {
      "role" : "read", 
      "db" : "local"
    }
  ]
})

Oplog记录的操作记录是幂等的(idempotent),这意味着你可以多次执行这些操作而不会导致数据丢失或不一致。例如对于$inc操作,Oplog会自动将其转换为$set操作,例如原始数据如下:

{ 
  "_id" : "0", 
  "count" : 1.0
}

执行如下$inc操作:

db.test.update({_id: "0"}, {$inc: {count: 1}})

Oplog记录的日志为:

{ 
  "ts" : Timestamp(1503110518, 1), 
  "t" : NumberLong(8), 
  "h" : NumberLong(-3967772133090765679), 
  "v" : NumberInt(2), 
  "op" : "u", 
  "ns" : "play-community.test", 
  "o2" : {
    "_id" : "0"
  }, 
  "o" : {
    "$set" : {
      "count" : 2.0
    }
  }
}

这种转换可以保证Oplog的幂等性。另外Oplog为了保证插入性能,不允许额外创建索引。

Timestamps格式

MongoDB有一种特殊的时间格式Timestamps,仅用于内部使用,例如上面Oplog记录:

Timestamp(1503110518, 1)

Timestamps长度为64位:

  • 前32位是time_t值,表示从epoch时间至今的秒数
  • 后32位是ordinal值,该值是一个顺序增长的序数,表示某一秒内的第几次操作

开始同步Oplog

在开始同步Oplog之前,我们需要注意以下几点:

  • 由于Oplog不使用索引,所以初始查询代价可能很大
  • 当Oplog数据量很大时,可以保存ts,系统重启时利用该ts可以减少首次查询开销
  • oplogReplay标志可以显著加快包含ts条件过滤的查询,但是只对oplog查询有效
val tailingCursor =
 oplogCol
  .find(Json.obj("ns" -> Json.obj("$in" -> Set(s"${db}.common-doc", s"${db}.common-article")), "ts" -> Json.obj("$gte" -> lastTS)))
  .options(QueryOpts().tailable.oplogReplay.awaitData.noCursorTimeout)
  .cursor[BSONDocument]()

tailingCursor.fold(()){ (_, doc) =>
 try {
  val jsObj = doc.as[JsObject]
  jsObj("op").as[String] match {
   case "i" => // 插入
   case "u" => // 更新
   case "d" => // 删除
  }

  // 保存ts值,以备后用
  if (tailCount.get() % 10 == 0) { }
 } catch {
  case t: Throwable =>
   Logger.error("Tail oplog Error: " + t.getMessage, t)
 }
}

另外提醒大家注意,ReactiveMongo-Streaming的Akka Stream实现有bug,如果首次查询没有数据返回,则会持续发送查询请求,大约每秒中发送几十次至几百次请求,因为Oplog的查询开销很大,最终会导致MongoDB内存溢出。详情参考Keep sending queries while the initial query result of a tailable cursor is empty.

参考

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第三十天 Activiti工作流【悟空教程】

工作流(Workflow),就是“业务过程的部分或整体在计算机应用环境下的自动化”,它主要解决的是“使在多个参与者之间按照某种预定义的规则传递文档、信息或任务的...

3903
来自专栏乐沙弥的世界

mongo shell连接到mongoDB及shell提示符下执行js脚本

1062
来自专栏陈树义

MongoDb 快速入门教程

MongoDb 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。 它是可扩展的高性能数据存储解决方案,经常被用于...

5924
来自专栏Golang语言社区

深入学习Golang—channel

Channel 1. 概述 “网络,并发”是Go语言的两大feature。Go语言号称“互联网的C语言”,与使用传统的C语言相比,写一个Server所使用的代码...

4819
来自专栏Java帮帮-微信公众号-技术文章全总结

Activiti学习详解【面试+工作】

一:Activiti第一天 1:工作流的概念 ? 说明: 1) 假设:这两张图就是XX兄弟的请假流程图 2) 图的组成部分: A. 人物:范XX 冯X刚 王X军...

5315
来自专栏小狼的世界

Crontab中的除号(slash)到底怎么用?

crontab 是Linux中配置定时任务的工具,在各种配置中,我们经常会看到除号(Slash)的使用,那么这个除号到底标示什么意思,使用中有哪些需要注意的地方...

802
来自专栏WindCoder

坐标判断

/* 功能:坐标判断 日期:2013-05-08 */ #include <stdio.h> #include <stdlib.h> #incl...

621
来自专栏Danny的专栏

机房收费系统(VB.NET)——存储过程实战

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

1255
来自专栏hotqin888的专栏

DOC文件中法规对标系统完成

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/hotqin888/article/det...

641
来自专栏乐沙弥的世界

mongoDB 文档插入

db.collection.insertOne(obj, ) 插入单个文档到一个集合(3.2版本有效),可选参数为w, wtimeou...

763

扫码关注云+社区