
上一篇文章介绍了『change stream的上手及初体验』,相信大家都对之已经有了基本的了解。接下来,本篇文章尝试深入MongoDB的内核源码,来看看其内部原理以及实现细节。为了帮助大家更好地切入,采用了自顶向下(也就是从客户端-->驱动-->服务端)的方式来梳理整个流程。希望能对大家更深入了解MongoDB change stream功能有一定帮助。(未特殊说明,文中内容均基于MongoDB4.0版本代码)
自顶向下流程的整体时序图如下:

事实上,所有的query基本也是这样一个流程,只是不同的命令会获得不同类型的cursor罢了。这里如果暂时不好理解的话,不妨把第一章内容浏览完再回过头来看看。
一个类似复制集协议中主从同步逻辑的,挂在节点**local.oplog.rs**表上的tailable cursor。
Change Stream本质上是聚合命令中的一个特殊管道阶段(pipeline stage),由于它需要常驻在集群的节点上,因此会以tailable cursor的形式出现。
什么是**tailable cursor**?
按照官方的定义,它在概念上类似于Unix操作系统中提供的tail -f命令,即当一个游标到达结果集的末尾之后,它也不会立即关闭,而是将继续等待新的数据产生,并在等到的时候将之返回。
注1:在change Stream功能出现以前,开发者想要实时感知MongoDB数据库的变化只能通过tailing oplog的方式,其实也是使用的
tailable cursor。 注2:副本集内的主从同步也是用的这个,细节请参考之前的文章。
tailable cursor不会使用到索引,因此建立tailable cursor时的初始化扫描比较耗时。事实上,对于oplog这个capped collection而言,如果某个时刻有较多的oplog产生,tailable cursor也会因为需要逐条扫描而导致性能不佳。
要想确认change Stream的本质也很简单,在已经进行过变更流监听的节点上执行db.currentOp(),然后就可以看到类似下面这样的一个command(仅截取部分字段,关键字段已注释):
{
"host" : "TENCENT64.site:7029",
...
"desc" : "conn14235866",
"connectionId" : 14235866, //该操作所在的链接id
"op" : "getmore", // 操作类型
"ns" : "admin.$cmd", // 指定的namespace,这里是集群维度的watch,因此为'admin.$cmd';如果是db维度的watch,则为'db.$cmd';collection维度的watch则为'db.collection'
"command" : { // 描述命令的基本信息
"getMore" : NumberLong("6717122891978962123"),
"collection" : "$cmd.aggregate", // 只有collection维度的watch才会显示表名
"$clusterTime" : { //逻辑时间戳
"clusterTime" : Timestamp(1605777729, 2),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"$db" : "admin", // 操作的db信息,这里是集群维度的watch,因此为'admin'
"$readPreference" : {"mode" : "primaryPreferred"}
},
"originatingCommand" : { //描述原始命令的详细信息
"aggregate" : 1, // 标识是一个聚合命令,collection维度的watch会展示所监听的表名
"pipeline" : [ // 该聚合命令的详细pipeline组成
{ //'$changeStream'一定在pipeline首位
"$changeStream" : {// change stream相关的参数
"allChangesForCluster" : true, // 标识是对整个集群的变更进行watch
"fullDocument" : "default" // 未指定update返回全文档,使用缺省值;需要的话可以指定为'updateLookup'
}
},
{ //其他自定义的aggregate操作符比如'$match'放在后面,需要注意的是change stream所支持的pipeline stages只是MongoDB所支持的子集,比如'$addFields/$replaceWith/$set'等
"$match" : {"$or" : [{"operationType" : "drop"}]}
}
],
"lsid" : {"id" : BinData(4,"5zBaeTrSR0qmVnKbEh3JOA==")},
"$db" : "admin", //同上
},
"planSummary" : "COLLSCAN", // 查询计划,由于cursor本质上是挂在oplog表的,没有索引
"numYields" : 125,
"waitingForLock" : false,
}感兴趣的朋友还可以将这里的内容与附录中主从同步的tailable cursor做比较,会惊喜地发现以下异同:
getMore类型的op,因为都是cursor;COLLSCAN,因为都是挂在oplog表上,而oplog表没有索引;local.oplog.rs,Change Stream则是特定的namespace,在上面的示例中由于是对整个集群的watch,因此是admin.$cmd;tailable/oplogReplay/awaitData/maxTimeMS等在内的一系列cursor参数,Change Stream则是一个aggregate操作,并且其pipeline中的第一个stage一定是$changeStream,其他stage还包含用户自定义的条件;db.watch()时,发生了什么?当我们在某个mongos/mongod(以mongo shell或者driver的形式)上执行db.watch()时,首先是驱动层会发送一个aggregate命令给相应的服务端(mongos/mongod)。这里为了让change Stream功能更加简单易用以及具备更好的灵活性,所有的驱动都要求做了一层封装并对外提供3个辅助方法(分别对应监听的三个维度)。在mongoDB官方的specifications我们可以看到其中的细节。以mongo-driver的go语言版本为例:

然后接下来就会真正执行这个命令(调用RoundTrip()),并将从服务端得到的batch cursor封装一层返回。一切顺利的话,就能在mongos/mongod上看到1.1中提到的tailable cursor了。
cursor.next()时,又发生了什么?在前面的步骤生成了相应的cursor之后,接下来的操作理所应当就是对cursor的不断迭代了。官方文档里也是这样示例的:
watchCursor = db.getSiblingDB("hr").watch()
while (!watchCursor.isExhausted()){ //isExhausted
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}驱动层的封装做了什么呢?我们还是以mongo-driver的go语言版本为例:

cursor.Next()会首先尝试查看本地缓存的队列里有没有,如果有的话直接取一个文档返回,没有的话则需要通过BatchCusor.Next()去server端拉一批(大小由cursor的batchSize指定)再放到缓存队列里。BatchCursor.Next()最终会调用BatchCusor.getMore(),本质上是对getMore命令的简单封装。看到熟悉的RoundTrip()我们就不用往下看了,因为接下来的事情已经不是驱动的处理范围了。
func (bc *BatchCursor) getMore(ctx context.Context) {
bc.clearBatch()
conn, err := bc.server.Connection(ctx)
response, err := (&command.GetMore{
Clock: bc.clock,
ID: bc.id,
NS: bc.namespace,
Opts: bc.opts,
Session: bc.clientSession,
}).RoundTrip(ctx, bc.server.SelectedDescription(), conn)
...
}注意到,驱动里会尝试去恢复有问题的change stream cursor,这也是specifications中所要求的。实现上就是通过第7步里的runCommand方法,通过指定{replaceOption:true},将本地缓存的resumeToken放在ResumeAfter中进行options的替换,从而指定change stream的恢复而不是新建。
既然是聚合命令,到达服务端之后,对于mongos,由s\commands\cluster_aggregate.cpp#runAggregate()作为入口进行处理;对于mongod则是由db\commands\run_aggregate.cpp#runAggregate()作为入口进行处理。类似的,后面的getMore命令,到达服务端之后,对于mongos,由s\commands\cluster_getmore_cmd.cpp#run()作为入口进行处理;对于mongod则是由db\commands\getmore_cmd.cpp#run()作为入口进行处理。
runAggregate()大致流程:

$changeStream stage之后并不会深入其内部,更详细解析在下一步骤);optimizePipeline()会对pipeline做优化,可能会对其中的阶段做一些顺序调整和整合,比如多个$skip合并等,具体规则请参考Aggregation Pipeline Optimization;planExecutor插入到pipeline中;会对pipeline首位为$sample、以及最早的$match/$sort进行优化。比如一个最早的$match可以被从pipeline中剔除并替换成索引扫描的查询执行器;optimizaPipeline()再次对pipeline优化,这是因为在上一步中添加了cursor stage之后还有其他优化空间;Tailable和AwaitData参数,则设置相应的cursor参数;鉴于本文主要讨论change stream相关流程,我们需要关注的是第二步中的pipeline解析。调用链:
runAggregate()-->Pipeline::parse()-->Pipeline::parseTopLevelOrFacetPipeline()-->DocumentSource::parse()-->DocumentSourceChangeStream::createFromBson()-->buildPipeline()
parse()之前,如果预解析结果里pipeline有$changeStream的话,会首先将ns改为oplog.rs,代表其监听的集合就是oplog表;然后将readConcern升级为majority来确保返回的change event已经提交到副本集中大多数节点中。同时如果原始namespace为view的话会报错,因为change stream并不支持在view上创建;parse()到createFromBson()的调用是通过全局的parserMap来实现的,所有支持的聚合管道stage都会将自己的解析函数存在这个全局map里。解析完$changeStreamstage后还会解析pipeline中的其他stage。首先通过宏将预解析和解析(
createFromBson)两个函数注册到全局的parserMap中去 REGISTER_MULTI_STAGE_ALIAS(changeStream, DocumentSourceChangeStream::LiteParsed::parse, DocumentSourceChangeStream::createFromBson);parseTopLevelOrFacetPipeline中会对pipeline中的所有stage遍历调用各自的parse方法: SourceContainer stages;for (auto&& stageObj : rawPipeline) { auto parsedSources = DocumentSource::parse(expCtx, stageObj); stages.insert(stages.end(), parsedSources.begin(), parsedSources.end()); }然后在parse()中会进行map的查找并调用它,完成后转到待解析stage列表中的下一个,继续其他stage的解析过程:map中查找举个例子,对于1.1章节中展示的cursor,就是先解析
$changeStream再解析$match,分别调用DocumentSourceChangeStream::createFromBson()和DocumentSourceMatch::createFromBson()【关于代码结构的备注】:看到这里可能你已经发现了,MongoDB中聚合命令所有支持的stage,都是继承自DocumentSource抽象类,并且都实现了相应的createFromBson方法用于解析stage内的操作符(如$and/$or)。它们的类都定义在db/pipeline/路径下。有对其他stage感兴趣的朋友可以从这里入手。
$changeStream的pipeline按照之前的描述来看下createFromBson函数:

构建出来的pipeline包含了多个阶段,从前往后分别为oplogMatch、transformation、checkInvalidate、resume(可选)、closeCursor、lookupChangePostImage(可选):

oplogMatch阶段会复用已有的Match阶段(也就是做基本查询find({$match:...})时会使用到的阶段)。对oplog.rs表的查询匹配有以下规则:
1. 首先关注相关的DDL操作,如果是表维度的监听,那么对于该表的`drop/renameCollection/to`的操作都需要匹配到,因为这些操作会产生非法的change event。同理,如果是db维度的监听,则`dropDatabse`也需要被匹配到;
2. 按`namespace`匹配。如果是cluster维度的监听,则需要匹配所有非`admin/config/local`库内表的操作,否则匹配指定`namespace`的操作;
3. rename操作的`to`是目标`namespace`时;
4. 对于非DDL操作,匹配一般的CURD操作,通过`{"op":{$ne: "n"} }`来实现(因为DDL在前面的规则中已处理);
5. chunk 迁移到一个新的shard的操作`migrateChunkToNewShard`,它的`op`是`n`;
6. 匹配所有的namesapce相关的事务操作,以`applyOps`的形式呈现;
7. 过滤所有balancer产生的操作,通过`{"fromMigrate":{$ne:true}}`来实现,因为balancer产生的操作如`move chunk`只涉及数据的位置变化,数据本身并没有发生变化;如清理孤儿文档的操作虽然删除了数据,但是对整体数据的完整性并没有影响,因此也可以完全过滤掉。以表维度的监听为例,其生成的pipeline放在了附录中,感兴趣的可以去看一下,会发现就是上述规则的并集。
DocumentSourceTransform)之外并没有做什么事情,对于指定了pipeline中包含resumeAfter的情况,如果resumeToken(结构体见2.2节)中的documentKey字段包含了shard key,会将它缓存起来。DocumentSourceCheckInvalidate)也不会做什么事情。resumeAfter时存在,根据是否需要对cursor进行合并处理(needsMerge)会走两个不同的内部stage,如果需要,则会将resumeToken的clusterTime字段存下来用作后续的检查;否则会将整个resumeToken存下来用于后续检查其存在性。DocumentSourceCloseCursor)之外不会做什么事情。{fullDocument:updateLookup}时存在,用于监听更新操作时返回更新的源文档。同样的,在pipeline构建时也只是新建了对象(DoucmentSourceLookupChangePostImage)。值得注意的是,上面描述的stage都是内部的概念,并不会对外暴露。用户完全不感知这些阶段,只是为了便于理解change stream工作的流程,不同的阶段相互解耦,完成各自相对独立的任务。
run()runParsed()大致流程如下:

在这里,cursor会区分两种类型: 被特定的
表cursor管理器所管理,比如通过find()生成的,这种只需要加表级别的读锁即可; 被全局的cursor管理器所管理,比如通过聚合命令生成的(如$changeStream)
awiaitData选项则必须也指定tailable选项等);readConcern:majority,则获取相应的已提交大多数的快照(snapshot);planExecutor并执行,通过generateBatch()得到一批符合条件的返回结果并放在nextBatch中;nextBatch.done()将这一批结果放到命令的返回结果中;对于我们关注的change stream流程而言,主要是第4步中的用于获取一批返回结果的generateBatch()。调用链:
generateBatch()-->PlanExecutor::getNext()-->planExecutor::getNextImpl()-->planStage::work()-->PipelineProxyStage::doWork()-->PipelineProxyStage::getNextBson()-->Pipeline::getNext()
不妨来看一下这个getNext()的实现,它会不断的去调用_sources里的getNext(),直到最后返回一个文档或者cursor迭代完成返回空:

而这里的_source正是之前在buildPipeline()时生成的若干个stage组成的列表。
再来看一下跟change stream相关的细节。前面提到了$changeStream内部分了若干个stage,因此这里我们可以很容易地将调用链补全:

那么在**getNext()**被调用中,这些stage都各自发挥了什么作用呢?
oplog条目到change event的转换。会提取出oplog中需要的字段(比如代表操作类型的op,代表时间戳的ts,代表namespace的ns,uuid),也会新增一些字段比如operationType/fullDocument/documentKey等。对于不同的操作类型进行不同的处理,update会多一些操作。Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
...
switch (opType) {
// "op"为i,即insert操作
case repl::OpTypeEnum::kInsert: {...}
// "op"为d,即delete操作
case repl::OpTypeEnum::kDelete: {...}
// "op"为u,即update操作
case repl::OpTypeEnum::kUpdate: {
// 会区分update和replace两种操作类型,对应替换和更新
// !注意到,这里就会对fullDocument赋值,但是只会赋值为oplog条目中'o'字段的内容,并没有去查询更新的源文档,这个操作在最后的lookupChangePostImage阶段才会完成!
}
// "op"为c,即其他DDL操作
case repl::OpTypeEnum::kCommand: {
//根据'o'中的子操作类型
// 1) o.applyops: 需要通过extractNextApplyOpsEntry()提取内部的文档,再applyTransformation()
// 2) o.renameCollection: 相应的rename事件
// 3) o.dropDatabase: 相应的dropDatabase事件
// 4) 其他的子操作类型都会导致非法事件
}
// "op"为n,即空操作
case repl::OpTypeEnum::kNoop: {
// 这里只有可能是NewShard类型,需要抛给上层处理并在新shard上新开cursor
// 同时构造一个虚假的documentKey,为了能恢复到此操作之后
}
default:{ MONGO_UNREACHABLE;}
}
// 生成resumeToken
ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey);
// 如果是分片集群,意味着结果需要merge,这里的排序规则为:ts+uuid+documentKey
if (pExpCtx->needsMerge) {
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
StringData operationType) {
if (pExpCtx->isSingleNamespaceAggregation()) {
return operationType == DSCS::kDropCollectionOpType ||
operationType == DSCS::kRenameCollectionOpType ||
operationType == DSCS::kDropDatabaseOpType;
} else if (!pExpCtx->isClusterAggregation()) {
return operationType == DSCS::kDropDatabaseOpType;
} else {
return false;
}
};getNext()时进行cursor的关闭。resumeToken是否可以用来恢复。会首先查看resumeToken的时间戳是否匹配,然后从oplog表中取出最早的一条记录对比时间戳,如果resumeToken更小的话,说明期望恢复的时间点已经不在oplog中,即无法恢复了。对于非分片情况,只需要检查给定的resumeToken是否存在即可。函数会返回3个状态: 为什么需要检查而不能直接定位到?主要有以下两个原因: 1)用户指定的**
resumeToken**不一定合法,一方面可能resumeToken所对应的操作已经不在oplog的范围中。另一方面由于是入参,如果不是从驱动或者change event中提取出来的,其可能是任何值; 2)用户指定的**resumeToken**是合成的,是change event中独有的,在oplog中并不存在相应的字段。因此我们并不能在buildPipeline时直接对ResumeToken做$match匹配来过滤,而是退而求其次提取出ResumeToken中的clusterTime,以{"ts":{$gt: clusterTime}}做过滤,要先在transform阶段利用oplog里的信息生成resumeToken后再去匹配。 检查函数的返回值可能为以下几种: kFoundToken——找到了指定的resumeToken对应的oplog; kCheckNextDoc ——当前的比指定的resumeToken更老,因此需要继续找; kCannotResume——当前的已经比指定的resumeToken更新,意味着不可能找到resumeToken了,直接返回不可恢复的错误。
operationType为update的情况进行处理,获取到需要进行查找的documentKey并再次建立一个只有$match阶段的pipeline并执行,得到源文档并填充在"fullDocument"字段中。所有的getNext()方法除了返回自己阶段处理完毕后的文档之外还会返回一个状态,用来告知接下来的阶段。状态的取值可能为以下几种:
经过pipline里这一系列阶段的处理,最终就能得到我们需要的change event并返回给客户端了。至此,整个change stream的工作流程(自顶向下,从客户端-->驱动-->server端)就梳理完毕了。
resumeTokenresumeToken用于唯一描述一个变更事件(change event),可用于change stream的故障恢复。
resumeToken示例及结构可以从任意一个change event中提取出与该文档操作对应的resumeToken`:
{ "_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" }, "operationType" : "insert", "clusterTime" : Timestamp(1595239231, 2), "fullDocument" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83"), "a" : 2 }, "ns" : { "db" : "phoenix", "coll" : "test" }, "documentKey" : { "_id" : ObjectId("5f156b3f0de1faaef1b3df83") } }提取出得到下面的K-V对。_data标识的那一长串十六进制就是了。
"_id" : { "_data" : "825F156B3F0000000229295A1004C982483732384D28AE57C6500C6018BF46645F696400645F156B3F0DE1FAAEF1B3DF830004" },
在内核源码中,它以ResumeToken存储,以ResumeTokenData对外呈现。
struct ResumeTokenData {
Timestamp clusterTime; //逻辑时间戳,int64,由unix时间戳+计数器组成
int version = 0; //版本号,指的是resumeToken自身的版本号,因为resumeToken经历了从3.6的BinData-->4.0.7前的十六进制编码字符串v0-->4.0.7以后的十六进制编码字符串v1 的演变过程
size_t applyOpsIndex = 0; //applyOps内部的index,仅用于副本集事务
Value documentKey; //文档key,由_id和shardKey(if have)组成
boost::optional<UUID> uuid; //生成的namespace的uuid
};
class ResumeToken {
private:
Value _keyStringData; //BinData 或者十六进制编码字符串
Value _typeBits; // 保存type
}在呈现上,无论是以binData还是十六进制编码字符串的形式呈现,在v0版本是按照 clusterTime、documentKey、uuid的顺序(v1则是将uuid放在了documentKey的前面),resumeToken的呈现形式并不会影响resumeToken的可排序性,因为排序是分字段排序的。正常情况下同一个文档的操作(CURD)会有相同的documentKey,但是由于一定会在同一个shard上执行,由逻辑时间戳保证了其clusterTime不一样,uuid也会不一样,因此相应的resumeToken肯定是不一样的。并且在排序时也是严格遵守“因果关系”。
applyOpsIndex的存在主要是为了描述事务中的操作,为了能在恢复时准确定位到事务中的某个操作。当resumeToken描述的是事务中的操作时,clusterTime字段存储的是整个事务的提交时间,事务内的所有操作需要这个index来建立时间顺序(事实上,新版本中会将此字段更名为txnOpIndex,更好理解一些)。当然,如果是resumeToken描述的是非事务操作,这个字段则一直为0。
resumeToken的可比性由于resumeToken与文档是一一对应的,而且其组成的字段中包含了逻辑时间戳clusterTime,因此本身就是具有可比性的。对于事务而言,resumeToken由于包含了文档的applyOpsIndex(事务中单个原子操作的索引),对于相同clusterTime的情况也是具有可比性的。mongos也正是利用这一点才能在收到多个分片的返回结果时可以直接利用resumeToken来完成事件发生先后顺序的决定,不会出现change event乱序的问题。

从上面的截图里能看出其组成方式为:clusterTime|version|applyOpsIndex|uuid|documentKey其中clusterTime是必须的,其他字段可选。不同的change event中的resumeToken长度并不是完全一致的,比如一个非法事件只就有clusterTime,而dropDatabase事件则没有uuid和documentKey。resumeToken的比较就是直接比较_keyStringData字符串即可。resumeTokenData的比较则是会分别比较clusterTime、documentKey和uuid。
分片集群相较于副本集多了mongos的分发以及最后结果的排序与聚合,Change Stream源码解读这篇文章里已经介绍地比较详细了,鉴于文章篇幅这里不再赘述,感兴趣的可以去看看。
结构与上面描述的副本集中pipeline多个stage类似,mongos特有以下两个stage:
resumeToken的大小来的值得提一下的是,如果是分片集群的话,change stream必须通过mongos来建立,那么前面描述中的CloseCursor以及LookupChangePostImage这两个阶段都会被放到mongos上来做,因为这两个阶段都只需要做一次就够了,没必要在每个分片上都做,算是一个小优化。
为了更好的用户体验,毕竟是常驻的cusor。而且在处理主从同步oplog拉取cursor的故障恢复问题时已有一定的经验,直接复制过来就好。 另外mongoDB还要求所有语言版本的驱动都加上对网络问题的自行恢复尝试。
$changeStream要在聚合管道的第一位?为了在恢复时可以添加或替换resumeToken。
$changeStream内部的几个stage顺序有严格要求么?有的。比如
resume stage就只能放在checkInvalidate stage的后面。因为如果用户希望从一个invalidate event的resumeToken进行恢复的话,先检查是否可以恢复并返回报错才是正确的行为。
getNext()的stage顺序和buildPipeline()的顺序不一致?被
optimizePipeline()优化调整过。transformation和closeCursor阶段被提前。前者主要是因为其他后续阶段都是以change event作为入参的;后者会使得新产生的非法事件变成change event返回,在下一轮next()才会真的销毁资源。(非法事件是需要返回的)
db.watch()的时候不指定任何参数,changeStream的默认行为是?
maxAwaitTimeMS缺省1000ms,表示服务端最多等待1s就返回给客户端(哪怕是一个空批次)。不指定起始时间时,会使用myLastAppliedOpTime作为起始时间。
resumeAfter时要使用{"ts":{$gte:xxx},而其他情况只需要{"ts":{$gt:xxx}}?主要是因为事务操作的关系。
$gt和$gte的唯一区别就是边界的包含问题。前面提到过,当指定resumeAfter时传入的是resumeToken,会被转换为对clusterTime的比较。当resumeToken对应的是事务中的某个操作时,由于事务中所有操作都具有相同的clusterTime,如果使用$gt的话可能会漏掉部分操作导致无法恢复的结果。对于其他情况,指定startAtOperationTime就是从某个时间点后,符合参数语义没有问题;什么也不指定,使用myLastAppliedOpTime作为起始时间也没有问题。
不支持,返回的change event一定是在大多数节点上已提交的文档。
很遗憾,在当前的架构下是只放在mongos上执行的。确实存在一定的优化空间,但需仔细考虑可以下放到mongod的操作符类型以及具体内容。 不妨举个例子来说明。假设用户自定义的操作符为:
{$match:{operationType:"insert"}},如果我们将这个阶段下放到mongod,那么所有分片上产生的invalidate事件都会被过滤掉,导致即使发生了非法事件,变更流也将永远不会失效,这是无法接受的。 事实上,如果用户自定义操作符为:{$project:"updateDescription"}(表示用户只关注更新操作到底更新了什么),那么我们将$project下放到mongod可以减少mongos和mongod之间的网络流量。当然了resumeToken也需要被传递来确保所有事件的可排序性。
resumeToken也无法恢复change stream?确实存在无法恢复的情况,主要为以下几种: 1)期望恢复的
resumeToken所对应的oplog条目已经不在oplog.rs表中。当中断的时间比较长时会出现这种情况。 2)使用的是非法事件(invalidate event)中的resumeToken。不过官方在4.2版本里对这里做了优化,提供了新的startAfter选项,直接传入非法事件的resumeToken,可以恢复到非法事件产生后的时间点。 3)resumeToken格式不合法(只要使用的是驱动或者change event中的resumeToken一般不会遇到此问题)
并不是。通过对比主从同步与change stream的cursor可以发现:主从同步只设置了时间戳过滤条件,可以认为是全量拉取,而change stream的cursor的过滤条件更为丰富(参考附录2中matchFilter示例)。 对于库表维度的监听,只会拉取部分跟指定namespace相关的操作。而如果是整集群维度的监听,则会退化为拉取除了少量未处理DDL操作外的大多数oplog。
$changestream,它由一系列内部子阶段组成。resumeAfter来进行恢复。遇到非法事件导致的cursor挂掉的情况只能手动查询挂掉的时间戳后再以startAtOpeartionTime重新启动change stream。不过官方在4.2版本里对这里做了优化,提供了新的startAfter选项,直接传入非法事件的resumeToken,可以恢复到非法事件产生后的时间点。$match/$project等)放在mongos上而不是mongod上执行,可能会导致mongos成为change stream的性能瓶颈。这里官方表示未来会优化。db.xxx.explain().aggregate([{$changeStream: {}}, {$match: {operationType: "insert"}}])因长度原因,同样只截取部分字段。
{
"host": "TENCENT64.site:7006",
"desc": "conn47362",
"connectionId": 47362,
..."op": "getmore",
"ns": "local.oplog.rs",
"command": {
"getMore": NumberLong("37596103597"),
"collection": "oplog.rs",
"batchSize": 13981010,
"maxTimeMS": NumberLong(5000),
"term": NumberLong(2),
"lastKnownCommittedOpTime": {
"ts": Timestamp(1606124080,1),
"t": NumberLong(2)
},
"$replData": 1,
"$oplogQueryData": 1,
"$readPreference": {
"mode": "secondaryPreferred"
},
"$clusterTime": {
"clusterTime": Timestamp(1606124083,1),
"signature": {
"hash": BinData(0,"JYtEpjcKP1mUp16iAR5Ti8/7t4M="),
"keyId": NumberLong("6846941188591714334")
}
},
"$db": "local"
},
"originatingCommand": {
"find": "oplog.rs",
"filter": {
"ts": {"$gte": Timestamp(1594362143,1)}
},
"tailable": true,
"oplogReplay": true,
"awaitData": true,
"maxTimeMS": NumberLong(60000),
"batchSize": 13981010,
"term": NumberLong(2),
"readConcern": {
"afterClusterTime": Timestamp(1594362143,1)
},
"$replData": 1,
"$oplogQueryData": 1,
"$readPreference": {
"mode": "secondaryPreferred"
},
"$db": "local"
},
"planSummary": "COLLSCAN",
"numYields": 2,
"waitingForLock": false,
}一个表维度的监听生成的matchFilter示例:
{
$and: [
//resumeAfter和startAtOperationTime中的时间戳会在这里体现,由于我没有指定resumeAfter因此是$gt而非$gte
{ts: {$gt: Timestamp(1608194033,1)}},
{
$or: [
{
ns: /^db1.t$/, //库表的匹配是通过正则的方式
$or: [
{op: {$ne: "n"}}, //非空操作
{op: "n",o2.type: "migrateChunkToNewShard"} //Addshard的特殊空操作
]
},
{
op: "c", //关注所在db内的一些可能会导致非法事件的DDL,不同纬度的watch略有区别
$or: [
{
$and: [
{ns: "db1.$cmd"},
{
$or: [
{o.drop: "t"},
{o.renameCollection: "db1.t"},
{o.to: "db1.t"},
// 检查基于该表的create操作是否指定了非预期的collation
{o.create: "t",o.collation: {$exists: true}
}
]
}
]
},
{o.to: /^db1.t$/} // rename到这个表的操作,同样是正则匹配
]
},
{
op: "c", //关注事务相关的DDL操作,以applyOps的形式呈现
lsid: {$exists: true},
txnNumber: {$exists: true},
o.applyOps.ns: /^db1.t$/
}
]
},
{fromMigrate: {$ne: true}} // 非balancer产生的操作,比如moveChunk,cleanupOrphan等等
]
}原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。