实时同步MongoDB Oplog开发指南

Capped Collections

MongoDB有一种特殊的Collection叫Capped collections,它的插入速度非常快,基本和磁盘的写入速度差不多,并且支持按照插入顺序高效的查询操作。Capped collections的大小是固定的,它的工作方式很像环形缓冲器(circular buffers), 当剩余空间不足时,会覆盖最先插入的数据。

Capped collections的特点是高效插入和检索,所以最好不要在Capped collections上添加额外的索引,否则会影响插入速度。Capped collections可以用于以下场景:

  • 存储日志: Capped collections的first-in-first-out特性刚好满足日志事件的存储顺序;
  • 缓存小量数据:因为缓存的特点是读多写少,所以可以适当使用索引提高读取速度。

Capped collections的使用限制:

  • 如果更新数据,你需要为之创建索引以防止collection scan;
  • 更新数据时,文档的大小不能改变。比如说name属性为'abc',则只能修改成3个字符的字符串,否则操作将会失败;
  • 数据不允许删除,如果非删除不可,只能drop collection
  • 不支持sharding
  • 默认只支持按自然顺序(即插入顺序)返回结果

Capped collections可以使用$natural操作符按插入顺序的正序或反序返回结果:

db['oplog.rs'].find({}).sort({$natural: -1})

Oplog

Oplog是一种特殊的Capped collections,特殊之处在于它是系统级Collection,记录了数据库的所有操作,集群之间依靠Oplog进行数据同步。Oplog的全名是local.oplog.rs,位于local数据下。由于local数据不允许创建用户,如果要访问Oplog需要借助其它数据库的用户,并且赋予该用户访问local数据库的权限,例如:

db.createUser({
   user: "play-community",
   pwd: "******",
   "roles" : [
    {
      "role" : "readWrite", 
      "db" : "play-community"
    }, 
    {
      "role" : "read", 
      "db" : "local"
    }
  ]
})

Oplog记录的操作记录是幂等的(idempotent),这意味着你可以多次执行这些操作而不会导致数据丢失或不一致。例如对于$inc操作,Oplog会自动将其转换为$set操作,例如原始数据如下:

{ 
  "_id" : "0", 
  "count" : 1.0
}

执行如下$inc操作:

db.test.update({_id: "0"}, {$inc: {count: 1}})

Oplog记录的日志为:

{ 
  "ts" : Timestamp(1503110518, 1), 
  "t" : NumberLong(8), 
  "h" : NumberLong(-3967772133090765679), 
  "v" : NumberInt(2), 
  "op" : "u", 
  "ns" : "play-community.test", 
  "o2" : {
    "_id" : "0"
  }, 
  "o" : {
    "$set" : {
      "count" : 2.0
    }
  }
}

这种转换可以保证Oplog的幂等性。另外Oplog为了保证插入性能,不允许额外创建索引。

Timestamps格式

MongoDB有一种特殊的时间格式Timestamps,仅用于内部使用,例如上面Oplog记录:

Timestamp(1503110518, 1)

Timestamps长度为64位:

  • 前32位是time_t值,表示从epoch时间至今的秒数
  • 后32位是ordinal值,该值是一个顺序增长的序数,表示某一秒内的第几次操作

开始同步Oplog

在开始同步Oplog之前,我们需要注意以下几点:

  • 由于Oplog不使用索引,所以初始查询代价可能很大
  • 当Oplog数据量很大时,可以保存ts,系统重启时利用该ts可以减少首次查询开销
  • oplogReplay标志可以显著加快包含ts条件过滤的查询,但是只对oplog查询有效
val tailingCursor =
 oplogCol
  .find(Json.obj("ns" -> Json.obj("$in" -> Set(s"${db}.common-doc", s"${db}.common-article")), "ts" -> Json.obj("$gte" -> lastTS)))
  .options(QueryOpts().tailable.oplogReplay.awaitData.noCursorTimeout)
  .cursor[BSONDocument]()

tailingCursor.fold(()){ (_, doc) =>
 try {
  val jsObj = doc.as[JsObject]
  jsObj("op").as[String] match {
   case "i" => // 插入
   case "u" => // 更新
   case "d" => // 删除
  }

  // 保存ts值,以备后用
  if (tailCount.get() % 10 == 0) { }
 } catch {
  case t: Throwable =>
   Logger.error("Tail oplog Error: " + t.getMessage, t)
 }
}

另外提醒大家注意,ReactiveMongo-Streaming的Akka Stream实现有bug,如果首次查询没有数据返回,则会持续发送查询请求,大约每秒中发送几十次至几百次请求,因为Oplog的查询开销很大,最终会导致MongoDB内存溢出。详情参考Keep sending queries while the initial query result of a tailable cursor is empty.

参考

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏一个会写诗的程序员的博客

Spring Reactor 项目核心库Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactiv...

2152
来自专栏Ceph对象存储方案

Luminous版本PG 分布调优

Luminous版本开始新增的balancer模块在PG分布优化方面效果非常明显,操作也非常简便,强烈推荐各位在集群上线之前进行这一操作,能够极大的提升整个集群...

3125
来自专栏张善友的专栏

Miguel de Icaza 细说 Mix 07大会上的Silverlight和DLR

Mono之父Miguel de Icaza 详细报道微软Mix 07大会上的Silverlight和DLR ,上面还谈到了Mono and Silverligh...

2707
来自专栏java 成神之路

使用 NIO 实现 echo 服务器

4617
来自专栏张善友的专栏

Mix 10 上的asp.net mvc 2的相关Session

Beyond File | New Company: From Cheesy Sample to Social Platform Scott Hansel...

2567
来自专栏杨龙飞前端

scrollto 到指定位置

2494
来自专栏跟着阿笨一起玩NET

c#实现打印功能

2752
来自专栏pangguoming

Spring Boot集成JasperReports生成PDF文档

由于工作需要,要实现后端根据模板动态填充数据生成PDF文档,通过技术选型,使用Ireport5.6来设计模板,结合JasperReports5.6工具库来调用渲...

1.2K7
来自专栏Golang语言社区

【Golang语言社区】GO1.9 map并发安全测试

var m sync.Map //全局 func maintest() { // 第一个 YongHuomap := make(map[st...

4708
来自专栏张善友的专栏

LINQ via C# 系列文章

LINQ via C# Recently I am giving a series of talk on LINQ. the name “LINQ via C...

2645

扫码关注云+社区