
导语:前面文章提到了MongoDB的复制集协议是一种raft-like的协议。其中一点差别就是关于log的拉取和回放。本文将尝试结合代码深入探究主从同步中一些细节。(PS:本文代码和分析基于源码版本V4.0.3版本。水平有限,文章中有错误或理解不当的地方,还望指出,共同学习)
之前的文章提到过,MongoDB复制集协议采用的是pull而不是push的方式。也就是说从节点定期去主节点的oplog集合中拉取最新的操作并应用到自身中。

大致的流程如下(与图中编号并不对应):
rsBackgroundSync 后台线程通过 find/getmore 命令到主节点上获取oplog,并放入到 OplogBuffer中;replBatcher 线程感知到OplogBuffer中的数据并消费,保存到OpQueue中;OplogApplier 线程感知OpQueue中的新数据,通过多个(默认16个)worker线程回放Oplog,并更新lastAppliedOpTime 和lastDurableOpTime;SyncSourceFeedback 后台线程感知到有新数据写入成功,将自身最新的 lastAppliedOpTime和lastDurableOpTime等信息通过 replSetUpdatePosition 内部命令返回给主节点;lastAppliedOpTime 和 lastDurableOpTime,计算大多数节点(包括自己)当前的数据同步进展,并更新 lastCommittedOpTime;这里oplog的拉取和回放可以理解为是一个“单个生产者多个消费者”的生产者-消费者模型。彼此是独立的,正常情况下相互不阻塞。
当节点处于SECONDARY状态时,BackgroundSync线程是一个死循环,每次循环中它都会完成从节点从其同步源上获取oplog并应用到自身的过程。
以一个从节点的视角出发,主从同步可以大致分为如下几个阶段:
SyncSourceResolver负责获取一个同步源的工作,代码路径如下:
SyncSourceResolver::startup()-->_chooseAndProbeNextSyncSource()-->_chooseNewSyncSource()-->chooseNewSyncSource()-->ReplicationCoordinator::chooseNewSyncSource()-->TopologyCoordinator::chooseNewSyncSource()
// find a target to sync from the last optime fetched
{
OpTime minValidSaved;
{
auto opCtx = cc().makeOperationContext();
minValidSaved = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
}
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
lastOpTimeFetched = _lastOpTimeFetched;
if (!_syncSourceHost.empty()) {
log() << "Clearing sync source " << _syncSourceHost << " to choose a new one.";
}
_syncSourceHost = HostAndPort();
_syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
requiredOpTime,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
}
auto status = _syncSourceResolver->startup();
if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
return;
}其中ReplicationCoordinator对象负责协调副本集与系统其余部分的交互。这里TopologyCoordinator::chooseNewSyncSource大致的逻辑如下:
forceSyncSoureCandidate选择同步源,并check。如果同步源无效或者不属于副本集或者处于黑名单中都会失败,否则会返回指定的同步源;buildIndex参数不同的节点、oplog落后于自身的节点、黑名单中的节点。当然了如果第一轮就找到了理想的同步源,自然也就不需要第二轮了。如果没有节点满足必要条件,则BackgroundSync等待1秒钟,然后重新开始同步源选择过程。
这一过程是由oplogFetcher完成的,也发生于BackgroundSync阶段中。代码路径如下:
oplogFetcher::startup()-->AbstractAsyncComponent::startup()-->AbstractOplogFetcher::_doStartup_inlock()-->AbstractOplogFetcher::_makeAndScheduleFetcherCallback()-->OplogFetcher::_makeFindCommandObject()-->AbstractOplogFetcher::_makeFetcher()-->AbstractOplogFetcher::_callback()-->OplogFetcher::_onSuccessfulBatch()
在_makeFindCommandObject()中,我们可以看到其生成的oplog查询语句的细节。
lastOpTimeFetched{oplogReplay:true}选项。oplogReplay表明拉取oplog的目的是为了回放{tailable:true}和{awaitData:true}选项。tailable cursor是类似于tail -f命令的操作,作用与像log.rs这样的Capped Collection上,使得我们可以不关闭cursor而从中持续不断地读出新的数据。awaitData参数的目的在于阻塞批处理。设置为true时,当tailable cursor遍历到集合末尾时,会在一段时间内阻塞查询线程,等待新的写入到来。当新写入插入该集合中时,阻塞线程会被唤醒并将这一批数据返回给客户端。60s)16M/ 12 * 10)
综合理解上面的查询条件,得到以下几个结论:
_id,没办法走索引,查询的初始扫描是比较耗性能的$gte,所以应始终至少返回一个文档OplogFetcher::_onSuccessfulBatch()处理成功拉回一批oplog的结果,更新自己的_lastFetched视图,并会返回下一次需要发送的getMore命令。其大致逻辑如下:
checkRemoteOplogStart()检查第一批拉回来的oplog结果。如果在同步源中找不到刚刚拉取的操作的optime,则会返回OplogStartMissing的错误;validateDocuments()检验文档的合法性,在这里检查oplog乱序等问题;BackgroundSync::_enqueueDocuments()将oplogFetcher拉取到的结果放入oplogBuffer中;shouldStopFetching()处理一些需要停止oplog拉取的错误场景;makeGetMoreCommandObject()根据当前的cursorId来生成新的getMore命令;外层的BackgroundSync会根据上面提到的fetcherReturnStatus返回的状态码进行相应的处理
这一过程是由oplogBUffer+oplogApplier完成。前者主要将拉到的oplog缓存在本地,pushAllNonBlocking()中会遍历所有的oplog条目并条用存储层的接口insertDocuments()。
后者的代码路径如下:
OplogApplier::startup()-->SyncTail::oplogApplication()-->SyncTail::_oplogApplication()-->SyncTail::multiApply()-->multiSyncApply()
其中,oplogApplier会启动一个新的ReplBatcher线程,它会不断尝试load可能动态更改的replBatchLimitBytes和replBatchLimitOperations参数,然后调用SyncTail::tryPopAndWaitForMore()。
在tryPopAndWaitForMore()中会尝试从oplogBuffer中取数据并保存到OpQueue里。有以下几种情况会等待数据长达1s:
1)oplogBuffer和oplogQueue均为空;
2)设置了延迟节点,拉回来的oplog还不满足延迟条件;
SyncTail::_consume()用于消费数据,但这里有关于DDL操作的额外处理逻辑。当遇到这种操作(包括:create,renameCollection, dbCheck, drop, collMod, dropDatabse, emptyCapped, convertToCapped, createIndexes, dropIndexes。注:applyOps除外)时,将会从批处理转成单条处理的方式。
bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
OplogBuffer* oplogBuffer,
SyncTail::OpQueue* ops,
const BatchLimits& limits) {
{
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = oplogBuffer->peek(opCtx, &op);
if (!peek_success) {
// If we don't have anything in the queue, wait a bit for something to appear.
if (ops->empty()) {
if (inShutdown()) {
ops->setMustShutdownFlag();
} else {
// Block up to 1 second. We still return true in this case because we want this
// op to be the first in a new batch with a new start time.
oplogBuffer->waitForData(Seconds(1));
}
}
return true;
}
if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) {
return true; // Return before wasting time parsing the op.
}
ops->emplace_back(std::move(op)); // Parses the op in-place.
}
auto& entry = ops->back();
auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {
ops->pop_back(); // Don't do this op yet.
if (ops->empty()) {
sleepsecs(1);
}
return true;
}
// !关于非CURD操作的处理!
if ((entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) ||
entry.getNamespace().isSystemDotViews()) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
_consume(opCtx, oplogBuffer);
} else {
// This op must be processed alone, but we already had ops in the queue so we can't
// include it in this batch. Since we didn't call consume(), we'll see this again next
// time and process it alone.
ops->pop_back();
}
// Apply what we have so far.
return true;
}
// We are going to apply this Op.
_consume(opCtx, oplogBuffer);
// Go back for more ops, unless we've hit the limit.
return ops->getCount() >= limits.ops;而对于_oplogApplication(),其处理逻辑大致如下:
getNextBatch()从opQueue中取一批,超时为1s。如果没取到则继继续下一次循环;oplogTime以及自身的lasterAppliedOpTime,如果第一条opTime比本地已经apply的opTime还要小的话,返回oplog乱序的错误——OplogOutOfOrder(当然,基本不会出现);multiApply()进行oplog并发回放;它会返回一个本次apply中最后一条oplog的OpTime,肯定会等于第二步中获取的批处理中最后一条opTime;last applied optime的视图并持久化;oplogDiskLocRegister()通知存储引擎来更新这一批已applied oplog的可见性;在SyncTail::multiApply()中,multikeyVector是用于并发回放的线程池。multiApply()的大致逻辑如下:
scheduleWritesToOplog()将oplog写入本地oplog集合;fillWriterVectors()将待处理的一批oplog分发到不同的回放线程;ThreadPool::waitForIdle()等待上一次multiApply完成;applyOps()进行oplog回放;replicationBatchIsComplete()通知存储引擎这一批oplog已经完成了回放,这意味着所有跟这一批oplog条目相关的写入都结束了,不会再有新的写入操作了;opTime;先来看看oplog分发的逻辑——fillWriterVectors(),它会遍历这一批待回放的oplog
如果是CURD的操作(指insert,delete,update),通过getIdElement()取出其操作的文档_id并计算hash值,当然对于update命令需要去o里面取。然后以nampespace得到的hash值作为murmur哈希的seed为_id的hash值计算出一个新的hash值来标识一条oplog。

然后使用该hash值直接对回放线程池大小进行取模,来决定一条oplog应该分发到哪个线程。

上面的逻辑保证了对于同一个doc操作的oplog(_id一致)会在一个回放线程中完成回放,而oplog的时间顺序性保证了这些操作的顺序回放。
再来看看oplog回放的逻辑——applyOps,代码实现比较简洁。遍历线程池,每个负责回放的线程都会调用multiSyncApply()函数。

writerPool是由ReplicationCoordinatorExternalStateImpl::startThreads()中调用SyncTail::makeWriterPool()生成的,会使用replWriterThreadCount(缺省为16)作为线程池的线程数。调用链为:
ReplicationCoordinatorExternalStateImpl::startThreads()-->SyncTail::makeWriterPool()-->ThreadPool::startup()-->_startWorkerThread_inlock()-->ThreadPool::_workerThreadBody()-->ThreadPool::_consumeTasks()-->ThreadPool::_doOneTask()
在multiSyncApply()中首先用stableSortByNamespace()将这一批oplog按namespace排序。然后InsertGroup::groupAndApplyInserts()尝试将一批对同一个namespace的insert组成一个批量insert操作;当然如果没办法变成批处理也只好单条处理。最后调用SyncTail::syncApply(),里面会根据不同的op类型进行不同的处理,非DDL操作会调用applyOperation_inlock(),DDL操作会调用applyCommand_inlock()
auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());
if (opType == OpTypeEnum::kNoop) { //空操作
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (opType == OpTypeEnum::kInsert && nss.isSystemDotIndexes()) {// 对于system.indexes的'特殊'insert操作
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (OplogEntry::isCrudOpType(opType)) { //其他的CURD操作
return writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
try {
AutoGetCollection autoColl(opCtx, getNsOrUUID(nss, op), MODE_IX);
auto db = autoColl.getDb();
OldClientContext ctx(opCtx, autoColl.getNss().ns(), db);
return applyOp(ctx.db());
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
throw;
}
});
} else if (opType == OpTypeEnum::kCommand) { //DDL操作
return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
Lock::GlobalWrite globalWriteLock(opCtx); //!注意:这里加的是全局的写锁!
// special case apply for commands to avoid implicit database creation
Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode);
incrementOpsAppliedStats();
return status;
});
}$cmd操作将以大小为1的批处理顺序进行然后在applyOperation_inlock()中对不同的op操作类型(n,i,u,d)进行了不同的处理,最终都是WriteUnitOfWork进行一次事务写操作并提交wuow.commit()。其中插入操作也会尝试进行批处理,以提高性能。
这一过程由syncSourceFeedback完成。它会将自身最新的 lastAppliedOpTime和lastDurableOpTime等信息通过replSetUpdatePosition内部命令返回给主节点。
ReplicationCoordinatorExternalState在启动时创建一个SyncSourceFeedback对象,该对象负责发送replSetUpdatePosition命令。
SyncSourceFeedback会启动一个循环。 在每次迭代中,它首先等待条件变量,每当ReplicationCoordinator发现副本集中的某个节点复制了更多操作并更新为最新状态时,该条件变量就会被通知。 在继续之前,它会检查它是否不处于PRIMARY或STARTUP状态。
然后,它获取节点的同步源,并创建一个Reporter,由该Reporter将replSetUpdatePosition命令发送到同步源。 该命令每隔keepAliveInterval毫秒(也就是(electionTimeout / 2))保持发送,以维护有关副本集中节点的活动信息。
replSetUpdatePosition命令包含以下信息:
opTimes数组,其中包含每个活动副本集成员的对象。 该信息由ReplicationCoordinator使用其SlaveInfo中的信息填充。 不包括被认为是挂掉的节点。 每个节点都包含以下信息:- `last durable OpTime`
- `last applied OpTime`
- 成员ID
- `ReplicaSetConfig`版本ReplSetMetadata,副本集元数据,包括以下信息- 上游节点的`last commited OpTime`
- 当前term
- `ReplicaSetConfig`的版本和term
- 副本集ID
- 上游节点是否为主可以看到,2.1~2.4中的任务分别由不同的线程进行处理,是相互独立的,他们都是由ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication)()启动的(SyncSourceResolver和oplogFetcher的启动在BackgroundSync::startup()内部):
log() << "Starting replication fetcher thread";
_oplogBuffer = stdx::make_unique<OplogBufferBlockingQueue>();
_oplogBuffer->startup(opCtx);
_bgSync =
stdx::make_unique<BackgroundSync>(replCoord, this, _replicationProcess, _oplogBuffer.get());
_bgSync->startup(opCtx);
_oplogApplier = stdx::make_unique<OplogApplier>(_oplogApplierTaskExecutor.get(),
_oplogBuffer.get(),
_bgSync.get(),
replCoord,
_replicationProcess->getConsistencyMarkers(),
_storageInterface,
OplogApplier::Options(),
_writerPool.get());
_oplogApplierShutdownFuture = _oplogApplier->startup();
log() << "Starting replication reporter thread";
auto bgSyncPtr = _bgSync.get();
_syncSourceFeedbackThread = stdx::make_unique<stdx::thread>([this, bgSyncPtr, replCoord] {
_syncSourceFeedback.run(_taskExecutor.get(), bgSyncPtr, replCoord);
});oplogBuffer和oplogApplier中间要加一层opQueue以及ReplBatcher呢?一方面将oplog变得尽可能平滑,减少源端写入不均带来的影响;另一方面要做“并行--串行--并行”这样的转换操作,保证DDL是串行处理的(一个batch里面只有单条DDL操作,只会发送给后端16个回放线程中的一个)。_idhash到不同线程进行回放。同一批次内的oplog并不是按顺序apply的。按namespace排序应该是为了更好地利用局部性原理(同一个ns内的操作在相同的cache、内存或磁盘扇区的概率更大)原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。