前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MongoDB Change Stream之二——自顶向下流程剖析

MongoDB Change Stream之二——自顶向下流程剖析

原创
作者头像
phoenix、
修改2021-07-26 12:23:03
2.9K2
修改2021-07-26 12:23:03
举报
文章被收录于专栏:MongoDB 学习MongoDB 学习

上一篇文章介绍了『change stream的上手及初体验』,相信大家都对之已经有了基本的了解。接下来,本篇文章尝试深入MongoDB的内核源码,来看看其内部原理以及实现细节。为了帮助大家更好地切入,采用了自顶向下(也就是从客户端-->驱动-->服务端)的方式来梳理整个流程。希望能对大家更深入了解MongoDB change stream功能有一定帮助。(未特殊说明,文中内容均基于MongoDB4.0版本代码)

一、原理&自顶向下流程

自顶向下流程的整体时序图如下:

change stream时序图.png
change stream时序图.png

事实上,所有的query基本也是这样一个流程,只是不同的命令会获得不同类型的cursor罢了。这里如果暂时不好理解的话,不妨把第一章内容浏览完再回过头来看看。

1.1 当我们使用change stream,得到了什么?

一个类似复制集协议中主从同步逻辑的,挂在节点**local.oplog.rs**表上的tailable cursor。

Change Stream本质上是聚合命令中的一个特殊管道阶段(pipeline stage),由于它需要常驻在集群的节点上,因此会以tailable cursor的形式出现。

什么是**tailable cursor**?

按照官方的定义,它在概念上类似于Unix操作系统中提供的tail -f命令,即当一个游标到达结果集的末尾之后,它也不会立即关闭,而是将继续等待新的数据产生,并在等到的时候将之返回。

注1:在change Stream功能出现以前,开发者想要实时感知MongoDB数据库的变化只能通过tailing oplog的方式,其实也是使用的tailable cursor注2:副本集内的主从同步也是用的这个,细节请参考之前的文章

tailable cursor不会使用到索引,因此建立tailable cursor时的初始化扫描比较耗时。事实上,对于oplog这个capped collection而言,如果某个时刻有较多的oplog产生,tailable cursor也会因为需要逐条扫描而导致性能不佳。

要想确认change Stream的本质也很简单,在已经进行过变更流监听的节点上执行db.currentOp(),然后就可以看到类似下面这样的一个command(仅截取部分字段,关键字段已注释):

代码语言:txt
复制
{
    "host" : "TENCENT64.site:7029",
    ...
    "desc" : "conn14235866",
    "connectionId" : 14235866,	//该操作所在的链接id
    "op" : "getmore",	// 操作类型
    "ns" : "admin.$cmd",	// 指定的namespace,这里是集群维度的watch,因此为'admin.$cmd';如果是db维度的watch,则为'db.$cmd';collection维度的watch则为'db.collection'
    "command" : {	// 描述命令的基本信息
	"getMore" : NumberLong("6717122891978962123"),
	"collection" : "$cmd.aggregate", // 只有collection维度的watch才会显示表名
	"$clusterTime" : {	//逻辑时间戳
	    "clusterTime" : Timestamp(1605777729, 2),
	    "signature" : {
		"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
		"keyId" : NumberLong(0)
	    }
	},
	"$db" : "admin",	// 操作的db信息,这里是集群维度的watch,因此为'admin'
	"$readPreference" : {"mode" : "primaryPreferred"}
    },
    "originatingCommand" : {	//描述原始命令的详细信息
	"aggregate" : 1, // 标识是一个聚合命令,collection维度的watch会展示所监听的表名
	"pipeline" : [	// 该聚合命令的详细pipeline组成
	    { //'$changeStream'一定在pipeline首位
		"$changeStream" : {// change stream相关的参数
		    "allChangesForCluster" : true, // 标识是对整个集群的变更进行watch
		    "fullDocument" : "default" // 未指定update返回全文档,使用缺省值;需要的话可以指定为'updateLookup'
		}
	    },
	    {	//其他自定义的aggregate操作符比如'$match'放在后面,需要注意的是change stream所支持的pipeline stages只是MongoDB所支持的子集,比如'$addFields/$replaceWith/$set'等
		"$match" : {"$or" : [{"operationType" : "drop"}]}
	    }
	],
	"lsid" : {"id" : BinData(4,"5zBaeTrSR0qmVnKbEh3JOA==")},
	"$db" : "admin", //同上
    },
    "planSummary" : "COLLSCAN",	// 查询计划,由于cursor本质上是挂在oplog表的,没有索引
    "numYields" : 125,
    "waitingForLock" : false,
}

感兴趣的朋友还可以将这里的内容与附录中主从同步的tailable cursor做比较,会惊喜地发现以下异同

  • 都是getMore类型的op,因为都是cursor;
  • 都是全表扫描COLLSCAN,因为都是挂在oplog表上,而oplog表没有索引;
  • 主从同步的namespace固定为local.oplog.rs,Change Stream则是特定的namespace,在上面的示例中由于是对整个集群的watch,因此是admin.$cmd
  • 主从同步的原始命令是一个基本的query操作且指定了包括tailable/oplogReplay/awaitData/maxTimeMS等在内的一系列cursor参数,Change Stream则是一个aggregate操作,并且其pipeline中的第一个stage一定是$changeStream,其他stage还包含用户自定义的条件;

1.2 当我们执行db.watch()时,发生了什么?

当我们在某个mongos/mongod(以mongo shell或者driver的形式)上执行db.watch()时,首先是驱动层会发送一个aggregate命令给相应的服务端(mongos/mongod)。这里为了让change Stream功能更加简单易用以及具备更好的灵活性,所有的驱动都要求做了一层封装并对外提供3个辅助方法(分别对应监听的三个维度)。在mongoDB官方的specifications我们可以看到其中的细节。以mongo-driver的go语言版本为例:

go driver.png
go driver.png

然后接下来就会真正执行这个命令(调用RoundTrip()),并将从服务端得到的batch cursor封装一层返回。一切顺利的话,就能在mongos/mongod上看到1.1中提到的tailable cursor了。

1.3 当我们执行cursor.next()时,又发生了什么?

在前面的步骤生成了相应的cursor之后,接下来的操作理所应当就是对cursor的不断迭代了。官方文档里也是这样示例的:

代码语言:txt
复制
watchCursor = db.getSiblingDB("hr").watch()

while (!watchCursor.isExhausted()){ //isExhausted
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

驱动层的封装做了什么呢?我们还是以mongo-driver的go语言版本为例:

go driver 1.png
go driver 1.png

cursor.Next()会首先尝试查看本地缓存的队列里有没有,如果有的话直接取一个文档返回,没有的话则需要通过BatchCusor.Next()去server端拉一批(大小由cursor的batchSize指定)再放到缓存队列里。BatchCursor.Next()最终会调用BatchCusor.getMore(),本质上是对getMore命令的简单封装。看到熟悉的RoundTrip()我们就不用往下看了,因为接下来的事情已经不是驱动的处理范围了。

代码语言:txt
复制
func (bc *BatchCursor) getMore(ctx context.Context) {
	bc.clearBatch()

	conn, err := bc.server.Connection(ctx)

	response, err := (&command.GetMore{
		Clock:   bc.clock,
		ID:      bc.id,
		NS:      bc.namespace,
		Opts:    bc.opts,
		Session: bc.clientSession,
	}).RoundTrip(ctx, bc.server.SelectedDescription(), conn)
	...
}

注意到,驱动里会尝试去恢复有问题的change stream cursor,这也是specifications中所要求的。实现上就是通过第7步里的runCommand方法,通过指定{replaceOption:true},将本地缓存的resumeToken放在ResumeAfter中进行options的替换,从而指定change stream的恢复而不是新建。

二、内核源码解读

既然是聚合命令,到达服务端之后,对于mongos,由s\commands\cluster_aggregate.cpp#runAggregate()作为入口进行处理;对于mongod则是由db\commands\run_aggregate.cpp#runAggregate()作为入口进行处理。类似的,后面的getMore命令,到达服务端之后,对于mongos,由s\commands\cluster_getmore_cmd.cpp#run()作为入口进行处理;对于mongod则是由db\commands\getmore_cmd.cpp#run()作为入口进行处理。

2.1 整体流程(副本集)

2.1.1 聚合命令处理入口——runAggregate()

大致流程:

runAggregate.png
runAggregate.png
  1. 预解析聚合命令的入参request(这里只是简化版解析,比如发现有$changeStream stage之后并不会深入其内部,更详细解析在下一步骤);
  2. 将request里的pipeline进行按stage的深度解析;
  3. 如果原始namespace为视图(view)的话,需要将建立view时指定的pipeline与本次聚合命令里的pipeline做合并;
  4. optimizePipeline()会对pipeline做优化,可能会对其中的阶段做一些顺序调整和整合,比如多个$skip合并等,具体规则请参考Aggregation Pipeline Optimization
  5. 如果原始namespace为view的话,还需要检查其collation(可译为字节序,用于字符串的比较)配置是否与pipeline中所涉及的view的配置一致,不一致会报错;
  6. 准备一个查询执行器planExecutor插入到pipeline中;会对pipeline首位为$sample、以及最早的$match/$sort进行优化。比如一个最早的$match可以被从pipeline中剔除并替换成索引扫描的查询执行器;
  7. 调用optimizaPipeline()再次对pipeline优化,这是因为在上一步中添加了cursor stage之后还有其他优化空间;
  8. 创建可用于返回给客户端的cursor,然后注册到全局的cursor管理器中。如果之前指定了TailableAwaitData参数,则设置相应的cursor参数;
  9. 如果聚合命令指定了explain参数,则返回整体的查询计划,否则将客户端cursor作为result的一部分返回,同时还会进行currentOp的相关修改;

鉴于本文主要讨论change stream相关流程,我们需要关注的是第二步中的pipeline解析。调用链:

runAggregate()-->Pipeline::parse()-->Pipeline::parseTopLevelOrFacetPipeline()-->DocumentSource::parse()-->DocumentSourceChangeStream::createFromBson()-->buildPipeline()

  • parse()之前,如果预解析结果里pipeline有$changeStream的话,会首先将ns改为oplog.rs代表其监听的集合就是oplog表;然后将readConcern升级为majority来确保返回的change event已经提交到副本集中大多数节点中。同时如果原始namespace为view的话会报错,因为change stream并不支持在view上创建;
  • parse()createFromBson()的调用是通过全局的parserMap来实现的,所有支持的聚合管道stage都会将自己的解析函数存在这个全局map里。解析完$changeStreamstage后还会解析pipeline中的其他stage。

首先通过宏将预解析和解析(createFromBson)两个函数注册到全局的parserMap中去 REGISTER_MULTI_STAGE_ALIAS(changeStream, DocumentSourceChangeStream::LiteParsed::parse, DocumentSourceChangeStream::createFromBson); parseTopLevelOrFacetPipeline中会对pipeline中的所有stage遍历调用各自的parse方法: SourceContainer stages;for (auto&& stageObj : rawPipeline) { auto parsedSources = DocumentSource::parse(expCtx, stageObj); stages.insert(stages.end(), parsedSources.begin(), parsedSources.end()); }然后在parse()中会进行map的查找并调用它,完成后转到待解析stage列表中的下一个,继续其他stage的解析过程:

map中查找
map中查找

举个例子,对于1.1章节中展示的cursor,就是先解析$changeStream再解析$match,分别调用DocumentSourceChangeStream::createFromBson()DocumentSourceMatch::createFromBson() 【关于代码结构的备注】:看到这里可能你已经发现了,MongoDB中聚合命令所有支持的stage,都是继承自DocumentSource抽象类,并且都实现了相应的createFromBson方法用于解析stage内的操作符(如$and/$or)。它们的类都定义在db/pipeline/路径下。有对其他stage感兴趣的朋友可以从这里入手。

2.1.2 构建$changeStream的pipeline

按照之前的描述来看下createFromBson函数:

createFromBson函数
createFromBson函数

构建出来的pipeline包含了多个阶段,从前往后分别为oplogMatchtransformationcheckInvalidateresume(可选)、closeCursorlookupChangePostImage(可选):

changeStream源码-buildPipeline.png
changeStream源码-buildPipeline.png

oplogMatch阶段会复用已有的Match阶段(也就是做基本查询find({$match:...})时会使用到的阶段)。对oplog.rs表的查询匹配有以下规则:

代码语言:txt
复制
1. 首先关注相关的DDL操作,如果是表维度的监听,那么对于该表的`drop/renameCollection/to`的操作都需要匹配到,因为这些操作会产生非法的change event。同理,如果是db维度的监听,则`dropDatabse`也需要被匹配到;
2. 按`namespace`匹配。如果是cluster维度的监听,则需要匹配所有非`admin/config/local`库内表的操作,否则匹配指定`namespace`的操作;
3. rename操作的`to`是目标`namespace`时;
4. 对于非DDL操作,匹配一般的CURD操作,通过`{"op":{$ne: "n"} }`来实现(因为DDL在前面的规则中已处理);
5. chunk 迁移到一个新的shard的操作`migrateChunkToNewShard`,它的`op`是`n`;
6. 匹配所有的namesapce相关的事务操作,以`applyOps`的形式呈现;
7. 过滤所有balancer产生的操作,通过`{"fromMigrate":{$ne:true}}`来实现,因为balancer产生的操作如`move chunk`只涉及数据的位置变化,数据本身并没有发生变化;如清理孤儿文档的操作虽然删除了数据,但是对整体数据的完整性并没有影响,因此也可以完全过滤掉。

以表维度的监听为例,其生成的pipeline放在了附录中,感兴趣的可以去看一下,会发现就是上述规则的并集。

  • Transform阶段在pipeline构建时除了新建对象(DocumentSourceTransform)之外并没有做什么事情,对于指定了pipeline中包含resumeAfter的情况,如果resumeToken(结构体见2.2节)中的documentKey字段包含了shard key,会将它缓存起来。
  • CheckInvalidate阶段用于判断对于给定监听类型而言,是否产生了非法事件。在pipeline构建时除了新建对象(DocumentSourceCheckInvalidate)也不会做什么事情。
  • resume阶段只在指定了resumeAfter时存在,根据是否需要对cursor进行合并处理(needsMerge)会走两个不同的内部stage,如果需要,则会将resumeTokenclusterTime字段存下来用作后续的检查;否则会将整个resumeToken存下来用于后续检查其存在性。
  • CloseCursor阶段用于非法事件产生时关闭相关的cursor和销毁资源。在pipeline构建时除了新建对象(DocumentSourceCloseCursor)之外不会做什么事情。
  • LookupChangePostImage阶段只在指定了{fullDocument:updateLookup}时存在,用于监听更新操作时返回更新的源文档。同样的,在pipeline构建时也只是新建了对象(DoucmentSourceLookupChangePostImage)。

值得注意的是,上面描述的stage都是内部的概念,并不会对外暴露。用户完全不感知这些阶段,只是为了便于理解change stream工作的流程,不同的阶段相互解耦,完成各自相对独立的任务。

2.1.3 getMore命令处理入口——run()

runParsed()大致流程如下:

runParsed.png
runParsed.png
  1. 根据curosr类型不同,获取不同的锁。然后根据请求里的cursorID从管理器中取出相应的cursor;

在这里,cursor会区分两种类型: 被特定的表cursor管理器所管理,比如通过find()生成的,这种只需要加表级别的读锁即可; 被全局的cursor管理器所管理,比如通过聚合命令生成的(如$changeStream)

  1. 进行一些基本的检查,比如cursor的权限、namespace的权限、cursor的相关参数等(比如如果cursor指定了awiaitData选项则必须也指定tailable选项等);
  2. 如果cursor指定了readConcern:majority,则获取相应的已提交大多数的快照(snapshot);
  3. 构建一个执行器planExecutor并执行,通过generateBatch()得到一批符合条件的返回结果并放在nextBatch中;
  4. 为了生成该命令执行过程中的一些细节,比如检查了多少个文档,检查了多少个key等,需要做一些少量的计算以及指标更新,便于后续输出到日志中。也会有关于currentOp状态的更新;
  5. 最后通过nextBatch.done()将这一批结果放到命令的返回结果中;

对于我们关注的change stream流程而言,主要是第4步中的用于获取一批返回结果的generateBatch()。调用链:

generateBatch()-->PlanExecutor::getNext()-->planExecutor::getNextImpl()-->planStage::work()-->PipelineProxyStage::doWork()-->PipelineProxyStage::getNextBson()-->Pipeline::getNext()

不妨来看一下这个getNext()的实现,它会不断的去调用_sources里的getNext(),直到最后返回一个文档或者cursor迭代完成返回空:

getNext实现
getNext实现

而这里的_source正是之前在buildPipeline()时生成的若干个stage组成的列表。

2.1.4 pipeline内每个stage的处理

再来看一下跟change stream相关的细节。前面提到了$changeStream内部分了若干个stage,因此这里我们可以很容易地将调用链补全:

changeStream源码-getNext.png
changeStream源码-getNext.png

那么在**getNext()**被调用中,这些stage都各自发挥了什么作用呢?

  • Transform阶段实施从oplog条目change event的转换。会提取出oplog中需要的字段(比如代表操作类型的op,代表时间戳的ts,代表namespace的nsuuid),也会新增一些字段比如operationType/fullDocument/documentKey等。对于不同的操作类型进行不同的处理,update会多一些操作。
代码语言:txt
复制
Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
	...
	switch (opType) {
        // "op"为i,即insert操作
        case repl::OpTypeEnum::kInsert: {...}
        // "op"为d,即delete操作
        case repl::OpTypeEnum::kDelete: {...}
        // "op"为u,即update操作
        case repl::OpTypeEnum::kUpdate: {
            // 会区分update和replace两种操作类型,对应替换和更新
            // !注意到,这里就会对fullDocument赋值,但是只会赋值为oplog条目中'o'字段的内容,并没有去查询更新的源文档,这个操作在最后的lookupChangePostImage阶段才会完成!
        }
        // "op"为c,即其他DDL操作
        case repl::OpTypeEnum::kCommand: {
            //根据'o'中的子操作类型
            // 1) o.applyops: 需要通过extractNextApplyOpsEntry()提取内部的文档,再applyTransformation()
            // 2) o.renameCollection: 相应的rename事件
            // 3) o.dropDatabase: 相应的dropDatabase事件
            // 4) 其他的子操作类型都会导致非法事件
        }
            // "op"为n,即空操作
        case repl::OpTypeEnum::kNoop: {
            // 这里只有可能是NewShard类型,需要抛给上层处理并在新shard上新开cursor
            // 同时构造一个虚假的documentKey,为了能恢复到此操作之后
        }
        default:{ MONGO_UNREACHABLE;}
   }
   
   // 生成resumeToken
   ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey);
   // 如果是分片集群,意味着结果需要merge,这里的排序规则为:ts+uuid+documentKey
   if (pExpCtx->needsMerge) {
        doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
  • CheckInvalidate阶段,会判断对于给定监听类型而言,是否产生了非法事件。比如对于特定表的监听,那么删除表/重命名表/删除库都是非法的。会产生非法事件并交由后续流程进行错误的返回以及cursor的关闭等。
代码语言:txt
复制
bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
                           StringData operationType) {
    if (pExpCtx->isSingleNamespaceAggregation()) {
        return operationType == DSCS::kDropCollectionOpType ||
            operationType == DSCS::kRenameCollectionOpType ||
            operationType == DSCS::kDropDatabaseOpType;
    } else if (!pExpCtx->isClusterAggregation()) {
        return operationType == DSCS::kDropDatabaseOpType;
    } else {
        return false;
    }
};
  • CloseCursor阶段如果发现当前文档是非法事件,则会标记应该关闭cursor,在下一次getNext()时进行cursor的关闭。
  • resume阶段分为两种情况,对于需要对cursor进行合并处理的情况(分片集群)而言,需要检查给定的resumeToken是否可以用来恢复。会首先查看resumeToken的时间戳是否匹配,然后从oplog表中取出最早的一条记录对比时间戳,如果resumeToken更小的话,说明期望恢复的时间点已经不在oplog中,即无法恢复了。对于非分片情况,只需要检查给定的resumeToken是否存在即可。

函数会返回3个状态: 为什么需要检查而不能直接定位到?主要有以下两个原因: 1)用户指定的**resumeToken**不一定合法,一方面可能resumeToken所对应的操作已经不在oplog的范围中。另一方面由于是入参,如果不是从驱动或者change event中提取出来的,其可能是任何值; 2)用户指定的**resumeToken**是合成的,是change event中独有的,在oplog中并不存在相应的字段。因此我们并不能在buildPipeline时直接对ResumeToken$match匹配来过滤,而是退而求其次提取出ResumeToken中的clusterTime,以{"ts":{$gt: clusterTime}}做过滤,要先在transform阶段利用oplog里的信息生成resumeToken后再去匹配。 检查函数的返回值可能为以下几种: kFoundToken——找到了指定的resumeToken对应的oplog; kCheckNextDoc ——当前的比指定的resumeToken更老,因此需要继续找; kCannotResume——当前的已经比指定的resumeToken更新,意味着不可能找到resumeToken了,直接返回不可恢复的错误。

  • oplogMatch阶段会根据之前建立好的规则来匹配每一条oplog。
  • LookupChangePostImage阶段只会对operationTypeupdate的情况进行处理,获取到需要进行查找的documentKey并再次建立一个只有$match阶段的pipeline并执行,得到源文档并填充在"fullDocument"字段中。

所有的getNext()方法除了返回自己阶段处理完毕后的文档之外还会返回一个状态,用来告知接下来的阶段。状态的取值可能为以下几种:

  • kAdvanced——表示结果需要被处理,一切正常;
  • kEOF——没有更多结果了;
  • kPauseExecution——有问题,需要停止;

经过pipline里这一系列阶段的处理,最终就能得到我们需要的change event并返回给客户端了。至此,整个change stream的工作流程(自顶向下,从客户端-->驱动-->server端)就梳理完毕了。

2.2 resumeToken

resumeToken用于唯一描述一个变更事件(change event),可用于change stream的故障恢复。

2.2.1 resumeToken示例及结构

可以从任意一个change event中提取出与该文档操作对应的resumeToken`:

代码语言:txt
复制
{ "_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" }, "operationType" : "insert", "clusterTime" : Timestamp(1595239231, 2), "fullDocument" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83"), "a" : 2 }, "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83") } }

提取出得到下面的K-V对。_data标识的那一长串十六进制就是了。

"_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" },

在内核源码中,它以ResumeToken存储,以ResumeTokenData对外呈现。

代码语言:txt
复制
struct ResumeTokenData {
    Timestamp clusterTime;	//逻辑时间戳,int64,由unix时间戳+计数器组成
    int version = 0;	//版本号,指的是resumeToken自身的版本号,因为resumeToken经历了从3.6的BinData-->4.0.7前的十六进制编码字符串v0-->4.0.7以后的十六进制编码字符串v1 的演变过程
    size_t applyOpsIndex = 0;	//applyOps内部的index,仅用于副本集事务
    Value documentKey;	//文档key,由_id和shardKey(if have)组成
    boost::optional<UUID> uuid;	//生成的namespace的uuid
};

class ResumeToken {
private:
    Value _keyStringData; //BinData 或者十六进制编码字符串
    Value _typeBits; // 保存type
}

在呈现上,无论是以binData还是十六进制编码字符串的形式呈现,在v0版本是按照 clusterTime、documentKey、uuid的顺序(v1则是将uuid放在了documentKey的前面),resumeToken的呈现形式并不会影响resumeToken的可排序性,因为排序是分字段排序的。正常情况下同一个文档的操作(CURD)会有相同的documentKey,但是由于一定会在同一个shard上执行,由逻辑时间戳保证了其clusterTime不一样,uuid也会不一样,因此相应的resumeToken肯定是不一样的。并且在排序时也是严格遵守“因果关系”。

applyOpsIndex的存在主要是为了描述事务中的操作,为了能在恢复时准确定位到事务中的某个操作。当resumeToken描述的是事务中的操作时,clusterTime字段存储的是整个事务的提交时间,事务内的所有操作需要这个index来建立时间顺序(事实上,新版本中会将此字段更名为txnOpIndex,更好理解一些)。当然,如果是resumeToken描述的是非事务操作,这个字段则一直为0。

2.2.2 resumeToken的可比性

由于resumeToken与文档是一一对应的,而且其组成的字段中包含了逻辑时间戳clusterTime,因此本身就是具有可比性的。对于事务而言,resumeToken由于包含了文档的applyOpsIndex(事务中单个原子操作的索引),对于相同clusterTime的情况也是具有可比性的。mongos也正是利用这一点才能在收到多个分片的返回结果时可以直接利用resumeToken来完成事件发生先后顺序的决定,不会出现change event乱序的问题。

image-20201217120416328.png
image-20201217120416328.png

从上面的截图里能看出其组成方式为:clusterTime|version|applyOpsIndex|uuid|documentKey其中clusterTime是必须的,其他字段可选。不同的change event中的resumeToken长度并不是完全一致的,比如一个非法事件只就有clusterTime,而dropDatabase事件则没有uuiddocumentKeyresumeToken的比较就是直接比较_keyStringData字符串即可。resumeTokenData的比较则是会分别比较clusterTimedocumentKeyuuid

2.3 分片集群

分片集群相较于副本集多了mongos的分发以及最后结果的排序与聚合,Change Stream源码解读这篇文章里已经介绍地比较详细了,鉴于文章篇幅这里不再赘述,感兴趣的可以去看看。

结构与上面描述的副本集中pipeline多个stage类似,mongos特有以下两个stage:

  • MergeCursors阶段:合并各个分片返回的结果并排序,排序的规则就是按照resumeToken的大小来的
  • UpdateOnAddShard阶段:用于处理新增一个shard的情况。这里需要在新shard上也建立相应的change stream cursor,以确保change event的完整性。

值得提一下的是,如果是分片集群的话,change stream必须通过mongos来建立,那么前面描述中的CloseCursor以及LookupChangePostImage这两个阶段都会被放到mongos上来做,因为这两个阶段都只需要做一次就够了,没必要在每个分片上都做,算是一个小优化。

三、Q&A

  • 为什么changeStream要做成可恢复的?

为了更好的用户体验,毕竟是常驻的cusor。而且在处理主从同步oplog拉取cursor的故障恢复问题时已有一定的经验,直接复制过来就好。 另外mongoDB还要求所有语言版本的驱动都加上对网络问题的自行恢复尝试。

  • 为什么$changeStream要在聚合管道的第一位?

为了在恢复时可以添加或替换resumeToken。

  • $changeStream内部的几个stage顺序有严格要求么?

有的。比如resume stage就只能放在checkInvalidate stage的后面。因为如果用户希望从一个invalidate event的resumeToken进行恢复的话,先检查是否可以恢复并返回报错才是正确的行为。

  • 为什么getNext()的stage顺序和buildPipeline()的顺序不一致?

optimizePipeline()优化调整过。transformationcloseCursor阶段被提前。前者主要是因为其他后续阶段都是以change event作为入参的;后者会使得新产生的非法事件变成change event返回,在下一轮next()才会真的销毁资源。(非法事件是需要返回的)

  • 如果我db.watch()的时候不指定任何参数,changeStream的默认行为是?

maxAwaitTimeMS缺省1000ms,表示服务端最多等待1s就返回给客户端(哪怕是一个空批次)。不指定起始时间时,会使用myLastAppliedOpTime作为起始时间。

  • 为什么指定resumeAfter时要使用{"ts":{$gte:xxx},而其他情况只需要{"ts":{$gt:xxx}}?

主要是因为事务操作的关系$gt$gte的唯一区别就是边界的包含问题。前面提到过,当指定resumeAfter时传入的是resumeToken,会被转换为对clusterTime的比较。当resumeToken对应的是事务中的某个操作时,由于事务中所有操作都具有相同的clusterTime,如果使用$gt的话可能会漏掉部分操作导致无法恢复的结果。对于其他情况,指定startAtOperationTime就是从某个时间点后,符合参数语义没有问题;什么也不指定,使用myLastAppliedOpTime作为起始时间也没有问题。

  • change Stream是否支持调整Concern?

不支持,返回的change event一定是在大多数节点上已提交的文档。

  • 为什么mongos上建立的监听流要将用户自定义的管道操作符放在mongos上执行,不能下放到mongod上以获得一定的优化吗?

很遗憾,在当前的架构下是只放在mongos上执行的。确实存在一定的优化空间,但需仔细考虑可以下放到mongod的操作符类型以及具体内容。 不妨举个例子来说明。假设用户自定义的操作符为:{$match:{operationType:"insert"}},如果我们将这个阶段下放到mongod,那么所有分片上产生的invalidate事件都会被过滤掉,导致即使发生了非法事件,变更流也将永远不会失效,这是无法接受的。 事实上,如果用户自定义操作符为:{$project:"updateDescription"}(表示用户只关注更新操作到底更新了什么),那么我们将$project下放到mongod可以减少mongos和mongod之间的网络流量。当然了resumeToken也需要被传递来确保所有事件的可排序性。

  • 什么情况下使用resumeToken也无法恢复change stream?

确实存在无法恢复的情况,主要为以下几种: 1)期望恢复的resumeToken所对应的oplog条目已经不在oplog.rs表中。当中断的时间比较长时会出现这种情况。 2)使用的是非法事件(invalidate event)中的resumeToken。不过官方在4.2版本里对这里做了优化,提供了新的startAfter选项,直接传入非法事件的resumeToken,可以恢复到非法事件产生后的时间点。 3)resumeToken格式不合法(只要使用的是驱动或者change event中的resumeToken一般不会遇到此问题)

  • 拉取oplog阶段是否会拉取全量的oplog?

并不是。通过对比主从同步与change stream的cursor可以发现:主从同步只设置了时间戳过滤条件,可以认为是全量拉取,而change stream的cursor的过滤条件更为丰富(参考附录2中matchFilter示例)。 对于库表维度的监听,只会拉取部分跟指定namespace相关的操作。而如果是整集群维度的监听,则会退化为拉取除了少量未处理DDL操作外的大多数oplog。

四、总结

  • Change Stream本质上是聚合命令中的特殊阶段:$changestream,它由一系列内部子阶段组成。
  • Change Stream的总体流程为:拉取oplog-->转换-->检查-->匹配-->返回事件,而且是完全串行的。从相关的JIRA来看,官方并不打算通过在stage内部并行化的方式来优化这里的性能,而是会考虑复用stage。
  • 整体上,change Stream的实现较为完整,尤其是可恢复性方面,并且官方也在性能方面做了一些优化。
  • 故障恢复对于遭遇了非法事件的情况不是特别友好,因为没办法通过resumeAfter来进行恢复。遇到非法事件导致的cursor挂掉的情况只能手动查询挂掉的时间戳后再以startAtOpeartionTime重新启动change stream。不过官方在4.2版本里对这里做了优化,提供了新的startAfter选项,直接传入非法事件的resumeToken,可以恢复到非法事件产生后的时间点。
  • 对于分片集群的情况,mongos上建立的监听流会将所有用户自定义的管道操作符(如$match/$project等)放在mongos上而不是mongod上执行,可能会导致mongos成为change stream的性能瓶颈。这里官方表示未来会优化。
  • 应尽量避免在同一个集群上建立很多个整集群纬度的监听。
  • 不支持一般的explain查询分析,需要使用聚合命令的查询分析方式,比如db.xxx.explain().aggregate([{$changeStream: {}}, {$match: {operationType: "insert"}}])

参考

  1. mongodb source code
  2. mongodb doc
  3. mongodb specifications
  4. Push down user-defined stages in a change stream pipeline where possible
  5. MongoDB Change Stream之一——上手及初体验

附录

1.主从同步的tailable cursor示例

因长度原因,同样只截取部分字段。

代码语言:txt
复制
{
    "host": "TENCENT64.site:7006",
    "desc": "conn47362",
    "connectionId": 47362,
    ..."op": "getmore",
    "ns": "local.oplog.rs",
    "command": {
        "getMore": NumberLong("37596103597"),
        "collection": "oplog.rs",
        "batchSize": 13981010,
        "maxTimeMS": NumberLong(5000),
        "term": NumberLong(2),
        "lastKnownCommittedOpTime": {
            "ts": Timestamp(1606124080,1),
            "t": NumberLong(2)
        },
        "$replData": 1,
        "$oplogQueryData": 1,
        "$readPreference": {
            "mode": "secondaryPreferred"
        },
        "$clusterTime": {
            "clusterTime": Timestamp(1606124083,1),
            "signature": {
                "hash": BinData(0,"JYtEpjcKP1mUp16iAR5Ti8/7t4M="),
                "keyId": NumberLong("6846941188591714334")
            }
        },
        "$db": "local"
    },
    "originatingCommand": {
        "find": "oplog.rs",
        "filter": {
            "ts": {"$gte": Timestamp(1594362143,1)}
        },
        "tailable": true,
        "oplogReplay": true,
        "awaitData": true,
        "maxTimeMS": NumberLong(60000),
        "batchSize": 13981010,
        "term": NumberLong(2),
        "readConcern": {
            "afterClusterTime": Timestamp(1594362143,1)
        },
        "$replData": 1,
        "$oplogQueryData": 1,
        "$readPreference": {
            "mode": "secondaryPreferred"
        },
        "$db": "local"
    },
    "planSummary": "COLLSCAN",
    "numYields": 2,
    "waitingForLock": false,
}

2. oplogMatch阶段的matchFilter示例

一个表维度的监听生成的matchFilter示例:

代码语言:txt
复制
{
    $and: [
        //resumeAfter和startAtOperationTime中的时间戳会在这里体现,由于我没有指定resumeAfter因此是$gt而非$gte
        {ts: {$gt: Timestamp(1608194033,1)}}, 
        {
            $or: [
                {
                    ns: /^db1.t$/, //库表的匹配是通过正则的方式
                    $or: [
                        {op: {$ne: "n"}}, //非空操作
                        {op: "n",o2.type: "migrateChunkToNewShard"} //Addshard的特殊空操作
                    ]
                },
                {
                    op: "c",  //关注所在db内的一些可能会导致非法事件的DDL,不同纬度的watch略有区别
                    $or: [
                        {	
                            $and: [
                                {ns: "db1.$cmd"},
                                {
                                    $or: [
                                        {o.drop: "t"},
                                        {o.renameCollection: "db1.t"},
                                        {o.to: "db1.t"},
                                        // 检查基于该表的create操作是否指定了非预期的collation
                                        {o.create: "t",o.collation: {$exists: true}
                                        }
                                    ]
                                }
                            ]
                        },
                        {o.to: /^db1.t$/}   // rename到这个表的操作,同样是正则匹配
                    ]
                },
                {	
                    op: "c",	//关注事务相关的DDL操作,以applyOps的形式呈现
                    lsid: {$exists: true},
                    txnNumber: {$exists: true},
                    o.applyOps.ns: /^db1.t$/
                }
            ]
        },
        {fromMigrate: {$ne: true}}	// 非balancer产生的操作,比如moveChunk,cleanupOrphan等等
    ]
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、原理&自顶向下流程
    • 1.1 当我们使用change stream,得到了什么?
      • 1.2 当我们执行db.watch()时,发生了什么?
        • 1.3 当我们执行cursor.next()时,又发生了什么?
        • 二、内核源码解读
          • 2.1 整体流程(副本集)
            • 2.1.1 聚合命令处理入口——runAggregate()
            • 2.1.2 构建$changeStream的pipeline
            • 2.1.3 getMore命令处理入口——run()
            • 2.1.4 pipeline内每个stage的处理
          • 2.2 resumeToken
            • 2.2.1 resumeToken示例及结构
            • 2.2.2 resumeToken的可比性
          • 2.3 分片集群
          • 三、Q&A
          • 四、总结
          • 参考
          • 附录
            • 1.主从同步的tailable cursor示例
              • 2. oplogMatch阶段的matchFilter示例
              相关产品与服务
              云数据库 MongoDB
              腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档