前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MongoDB transport_layer网络传输层模块源码实现三

MongoDB transport_layer网络传输层模块源码实现三

作者头像
MongoDB中文社区
发布2020-11-10 14:05:03
4580
发布2020-11-10 14:05:03
举报
文章被收录于专栏:MongoDB中文社区MongoDB中文社区

1. 说明

在之前的<<MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计>>和<<MongoDB transport_layer网络传输层模块源码实现二>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、线程模型、transport_layer套接字处理及传输层管理子模块、session会话子模块、Ticket数据收发子模块、service_entry_point服务入口点子模块。

本文将继续分析网络传输层模块中service_state_machine服务状态子模块,即状态机调度子模块的源码实现。

2. service_state_machine

状态机调度子模块

service_state_machine状态机处理模块主要复杂一次完整请求的状态转换,确保请求可以按照指定的流程顺利进行,最终实现客户端的正常MongoDB访问。该模块核心代码实现主要由以下三个源码文件实现(test为测试相关,可以忽略):

2.1 核心代码实现

在service_entry_point服务入口点子模块分析中,当接收到一个新的链接后,在ServiceEntryPointImpl::startSession(...)回调函数中会构造一个ServiceStateMachine ssm类,从而实现了新链接、session、ssm的一一映射关系。其中,ServiceStateMachine 类实现对ThreadGuard(线程守护)有较多的依赖,因此本文从这两个类核心代码实现来分析整个状态机调度模块的内部设计细节。

2.1.1 ThreadGuard类核心代码实现

ThreadGuard也就是”线程守护”类,该类主要用于工作线程名的管理维护、ssm归属管理、该ssm对应session链接的回收处理等。该类核心成员及接口实现如下:

代码语言:javascript
复制
class ServiceStateMachine::ThreadGuard {  
  ......  
public:  
  // create a ThreadGuard which will take ownership of the SSM in this thread.  
  //ThreadGuard初始化,标记ssm所有权属于本线程  
  explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {  
      //获取ssm状态机所有权  
      auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned);  
      //如果是一个链接一个线程模型,则条件满足  
      if (owned == Ownership::kStatic) {   
      //一个链接一个线程模型,由于链接始终由同一个线程处理,因此该链接对应的ssm始终归属于同一个线程  
          dassert(haveClient());  
          dassert(Client::getCurrent() == _ssm->_dbClientPtr);  
          //标识归属权已确定  
          _haveTakenOwnership = true;  
          return;  
      }  
        ......
      //adaptive 动态线程模式走下面的模式  
 
      //把当前线程的线程名零时保存起来  
      auto oldThreadName = getThreadName();   
      //ssm保存的线程名和当前线程名不同  
      if (oldThreadName != _ssm->_threadName) {  
          //即将修改线程名了,把修改前的线程名保存到_oldThreadName  
          _ssm->_oldThreadName = getThreadName().toString();  
          //把运行本ssm状态机的线程名改为conn-x线程  
          setThreadName(_ssm->_threadName); //把当前线程改名为_threadName  
      }  
 
      //设置本线程对应client信息,一个链接对应一个client,标识本client当前归属于本线程处理  
      Client::setCurrent(std::move(_ssm->_dbClient));  
        //本状态机ssm所有权有了,归属于运行本ssm的线程  
      _haveTakenOwnership = true;  
  }  
  ......  
  //重新赋值  
  ThreadGuard& operator=(ThreadGuard&& other) {  
      if (this != &other) {  
          _ssm = other._ssm;  
          _haveTakenOwnership = other._haveTakenOwnership;  
          //原来的other所有权失效  
          other._haveTakenOwnership = false;  
      }  
  //返回  
      return *this;  
  };  
 
  //析构函数  
  ~ThreadGuard() {  
  //ssm所有权已确定,则析构的时候,调用release处理,恢复线程原有线程名  
      if (_haveTakenOwnership)  
          release();  
  }  
 
  //一个链接一个线程模型,标记_owned为kStatic,也就是线程名永远不会改变  
  void markStaticOwnership() {  
      dassert(static_cast<bool>(*this));  
      _ssm->_owned.store(Ownership::kStatic);  
  }  
 
  //恢复原有线程名,同时把client信息从调度线程归还给状态机  
  void release() {  
        auto owned = _ssm->_owned.load();  
        //adaptive异步线程池模式满足if条件,表示SSM固定归属于某个线程  
      if (owned != Ownership::kStatic) {  
      //本线程拥有currentClient信息,于是把它归还给SSM状态机  
          if (haveClient()) {  
              _ssm->_dbClient = Client::releaseCurrent();  
          }  
            //恢复到以前的线程名  
          if (!_ssm->_oldThreadName.empty()) {  
              //恢复到老线程名  
              setThreadName(_ssm->_oldThreadName);   
          }  
      }  
      //状态机状态进入end,则调用对应回收hook处理  
      if (_ssm->state() == State::Ended) {  
          //链接关闭的回收处理 ServiceStateMachine::setCleanupHook  
          auto cleanupHook = std::move(_ssm->_cleanupHook);  
          if (cleanupHook)  
              cleanupHook();  
          return;  
      }  
 
      //归属权失效  
      _haveTakenOwnership = false;  
      //归属状态变为未知  
      if (owned == Ownership::kOwned) {  
          _ssm->_owned.store(Ownership::kUnowned);  
      }  
  }  
 
private:  
  //本线程守护当前对应的ssm  
  ServiceStateMachine* _ssm;  
  //默认false,标识该状态机ssm不归属于任何线程  
  bool _haveTakenOwnership = false;  
}

从上面的代码分析可以看出线程守护类作用比较明确,就是守护当前线程的归属状态,并记录状态机ssm不同状态变化前后的线程名。此外,状态机ssm对应的session链接如果进入end状态,则该链接的资源回收释放也由该类完成。

查看mongod或者mongos实例,如果启动实例的时候配置了”serviceExecutor: adaptive”会发现这些进程下面有很多线程名为”conn-x”和”worker-x”线程,同时同一个线程线程名可能发生改变,这个过程就是由ThreadGuard线程守护类来实现。synchronous一个链接一个线程模型只有”conn-x”线程,adaptive线程模型将同时存在有线程名为”conn-x”和”worker-x”的线程,具体原理讲在后面继续分析,如下图:

说明:synchronous线程模式对应worker初始线程名为”conn-x”,adaptive线程模型对应worker初始线程名为”worker-x”。

ThreadGuard线程守护类和状态机ssm(service_state_machine)关联紧密,客户端请求处理的内部ssm状态转换也和该类密切关联,请看后续分析。

2.1.2 ServiceStateMachine 类核心代码实现

service_state_machine状态机处理模块核心代码实现通过ServiceStateMachine类完成,该类的核心结构成员和函数接口如下:

代码语言:javascript
复制
//一个新链接对应一个ServiceStateMachine保存到ServiceEntryPointImpl._sessions中  
class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {  
  ......  
public:  
  ......  
  static std::shared_ptr<ServiceStateMachine> create(...);  
  ServiceStateMachine(...);  
  //状态机所属状态
  enum class State {  
      //ServiceStateMachine::ServiceStateMachine构造函数初始状态  
      Created,        
      //ServiceStateMachine::_runNextInGuard开始进入接收网络数据状态  
      //标记本次客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该链接的  
      //下一轮请求,该状态对应处理流程为_sourceMessage  
      Source,         
      //等待获取客户端的数据  
      SourceWait,     
      //接收到一个完整mongodb报文后进入该状态  
      Process,        
      //等待数据发送成功  
      SinkWait,       
      //接收或者发送数据异常、链接关闭,则进入该状态 _cleanupSession  
      EndSession,     
      //session回收处理进入该状态  
      Ended           
  };  
  //所有权状态,主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效  
  enum class Ownership {   
      //该状态表示本状态机SSM处于非活跃状态  
      kUnowned,    
      //该状态标识本状态机SSM归属于某个工作worker线程,处于活跃调度运行状态  
      kOwned,   
      //表示SSM固定归属于某个线程  
      kStatic   
  };  
    
  ......  
private:  
  //ThreadGuard可以理解为线程守护,后面在ThreadGuard类中单独说明  
  class ThreadGuard;  
  friend class ThreadGuard;  
    
  ......  
  //获取session信息  
  const transport::SessionHandle& _session()  
  //以下两个接口为任务task调度相关接口  
  void _scheduleNextWithGuard(...);  
  void _runNextInGuard(ThreadGuard guard);  
  //接收到一个完整mongodb报文后的处理  
  inline void _processMessage(ThreadGuard guard);  
  //以下四个接口完成底层数据读写及其对应回调处理  
  void _sourceCallback(Status status);  
  void _sinkCallback(Status status);  
  void _sourceMessage(ThreadGuard guard);  
  void _sinkMessage(ThreadGuard guard, Message toSink);  
    
  //一次客户端请求,当前mongodb服务端所处的状态  
  AtomicWord<State> _state{State::Created};  
  //服务入口,ServiceEntryPointMongod ServiceEntryPointMongos mongod及mongos入口点  
  ServiceEntryPoint* _sep;  
  //synchronous及adaptive模式,也就是线程模型是一个链接一个线程还是动态线程池  
  transport::Mode _transportMode;  
  //ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)服务上下文  
  ServiceContext* const _serviceContext;  
  //也就是本ssm对应的session信息,默认对应ASIOSession   
  transport::SessionHandle _sessionHandle;   
  //根据session构造对应client信息,ServiceStateMachine::ServiceStateMachine赋值  
  //也就是本次请求对应的客户端信息  
  ServiceContext::UniqueClient _dbClient;  
  //指向上面的_dbClient  
  const Client* _dbClientPtr;  
  //该SSM当前处理线程的线程名,因为adaptive线程模型一次请求中的不同状态会修改线程名
  const std::string _threadName;  
  //修改线程名之前的线程名称  
  std::string _oldThreadName;  
  //ServiceEntryPointImpl::startSession->ServiceStateMachine::setCleanupHook中设置赋值  
  //session链接回收处理  
  stdx::function<void()> _cleanupHook;  
  //接收处理的message信息 一个完整的报文就记录在该msg中  
  Message _inMessage;   
  //默认初始化kUnowned,标识本SSM状态机处于非活跃状态,  
  //主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效  
  AtomicWord<Ownership> _owned{Ownership::kUnowned};  
}

该类核心成员功能说明如下表:

成员名

功能说明

_state

SSM状态机当前所处状态

_sep

服务入口,mongod对应ServiceEntryPointMongod;mongos对应 ServiceEntryPointMongos

_transportMode

synchronous及adaptive模式,也就是线程模型是一个链接一个线程还是动态线程池

_serviceContext

服务上下文,mongod和mongos分别对应:ServiceContextMongoD、ServiceContextNoop

_sessionHandle

本ssm对应的session信息,默认对应ASIOSession

_dbClient

也就是本次请求对应的客户端信息

_dbClientPtr

指向上面的_dbClient

_threadName

SSM所属线程的线程名,初始化为”conn-n”,adaptive线程模型一次请求中的不同状态会修改线程名

_oldThreadName

修改线程名之前的线程名称

_cleanupHook

session链接回收处理

_inMessage

接收处理的message信息 一个完整的报文就记录在该msg中

_owned

默认初始化kUnowned,标识本SSM状态机处于非活跃状态,主要用来判断是否需要在状态转换中跟新线程名,只对动态线程模型生效

我们知道,链接、session、SSM状态机一一对应,他们也拥有对应的归属权,这里的归属权指的就是当前SSM归属于那个线程,也就是当前SSM状态机调度模块由那个线程实现。归属权通过Ownership类标记,该类保护如下几种状态,每种状态功能说明如下:

归属码

功能说明

kUnowned

未知状态,也就是SSM当前不归属与具体的线程

kOwned

针对adaptive线程模型,表示当前SSM状态转换处理由具体的线程负责,也就是归属权明确

kStatic

针对synchronous线程模型,也就是一个链接一个线程模型。由于一个链接始终由同一个线程处理,因此该链接对应SSM也就始终归属于同一个线程,整个运行状态不会改变。

Mongodb服务端接收到客户端请求后的数据接收、协议解析、从db层获取数据、发送数据给客户端都是通过SSM状态机进行有序的状态转换处理,SSM调度处理过程中保护多个状态,每个状态对应一个状态码,具体状态码及其功能说明如下表所示:

状态码

功能说明

Created

ServiceStateMachine::ServiceStateMachine()构造函数初始状态

Source

下面两种情况会进入该状态:1. 新连接到来第一次进入SSM处理;2. 客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该链接的下一轮请求

SourceWait

等待获取客户端的数据

Process

接收到一个完整MongoDB报文后进入该状态,开始进行协议解析、db层数据访问

SinkWait

等待数据发送成功,发送数据给客户端过程中

EndSession

接收或者发送数据异常、链接关闭,session对应客户端,同时进入Ended状态

Ended

session回收处理,进入EndSession后立马通过_cleanupSession进入该状态

以上是SSM处理请求过程的状态码信息,状态转换的具体实现过程请参考后面的核心代码分析。listerner线程接收到新的客户端链接后会调用通过service_entry_point服务入口点子模块的ssm->start()接口进入SSM状态机调度模块,该接口相关的源码实现如下:

代码语言:javascript
复制
//ServiceEntryPointImpl::startSession中执行 启动SSM状态机  
void ServiceStateMachine::start(Ownership ownershipModel) {  
  //直接调用_scheduleNextWithGuard接口  
  _scheduleNextWithGuard(   
  //listener线程暂时性的变为conn线程名,在_scheduleNextWithGuard中任  
  //务入队完成后,在下面的_scheduleNextWithGuard调用guard.release()恢复listener线程名  
      ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);  
}  
 
void ServiceStateMachine::_scheduleNextWithGuard(...) {  
  //该任务func实际上由worker线程运行,worker线程从asio库的全局队列获取任务调度执行  
  auto func = [ ssm = shared_from_this(), ownershipModel ] {  
      //构造ThreadGuard  
      ThreadGuard guard(ssm.get());    
      //说明是sync mode,即一个链接一个线程模式, 归属权明确,属于指定线程  
      if (ownershipModel == Ownership::kStatic)   
          guard.markStaticOwnership();  
      //对应:ServiceStateMachine::_runNextInGuard,在这里面完成状态调度转换  
      ssm->_runNextInGuard(std::move(guard));    
  };  
  //恢复之前的线程名,如果该SSM进入Ended状态,则开始资源回收处理  
  guard.release();  
  //ServiceExecutorAdaptive::schedule(adaptive)   ServiceExecutorSynchronous::schedule(synchronous)  
  //第一次进入该函数的时候在这里面创建新线程,不是第一次则把task任务入队调度  
  Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags);  
  if (status.isOK()) {  
      return;  
  }  
  //异常处理  
  ......  
}

ServiceStateMachine::start()接口调用ServiceStateMachine::scheduleNextWithGuard()来启动状态机运行。

scheduleNextWithGuard()接口最核心的作用就是调用service_executor服务运行子模块(线程模型子模块)的schedule()接口来把状态机调度任务入队到ASIO网络库的一个全局队列(adaptive动态线程模型),如果是一个链接一个线程模型,则任务入队到线程级私有队列。

adaptive线程模型,任务入队以及工作线程调度任务执行的流程将在后续的线程模型子模块中分析,也可以参考:<<MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计>>

此外,scheduleNextWithGuard()入队到全局队列的任务就是本模块后面需要分析的SSM状态机任务,这些task任务通过本函数接口的func (...)进行封装,然后通过线程模型子模块入队到一个全局队列。Func(...)这个task任务中会直接调用runNextInGuard()接口来进行状态转换处理,该接口也就是入队到ASIO全局队列的任务,核心代码功能如下:

代码语言:javascript
复制
void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {  
  //获取当前SSM状态  
  auto curState = state();  
  // If this is the first run of the SSM, then update its state to Source  
  //如果是第一次运行该SSM,则状态为Created,到这里标记可以准备接收数据了  
  if (curState == State::Created) {   
      //进入Source等待接收数据  
      curState = State::Source;  
      _state.store(curState);  
  }  
  //各状态对应处理  
  try {  
      switch (curState) {   
          //接收数据  
          case State::Source:    
              _sourceMessage(std::move(guard));  
              break;  
          //以及接收到完整的一个mongodb报文,可以内部处理(解析+命令处理+应答客户端)  
          case State::Process:  
              _processMessage(std::move(guard));  
              break;  
          //链接异常或者已经关闭,则开始回收处理  
          case State::EndSession:  
              _cleanupSession(std::move(guard));  
              break;  
          default:  
              MONGO_UNREACHABLE;  
      }  
        return;
  } catch (...) {  
      //异常打印  
  }  
  //异常处理  
  ......  
  //进入EndSession状态  
  _state.store(State::EndSession);  
  _cleanupSession(std::move(guard));  
}

从上面的代码实现可以看出,真正入队到全局队列中的任务类型只有三个,分别是:

1) 接收MongoDB数据的task任务,简称为readTask。

2) 接收到一个完整MongoDB数据后的后续处理(包括协议解析、命令处理、DB获取数据、发送数据给客户端等),简称为dealTask。

3) 接收或者发送数据异常、链接关闭等引起的后续资源释放,简称为cleanTask。

下面针对这三种task任务核心代码实现进行分析:

readTask任务核心代码实现

readTask任务核心代码实现由_sourceMessage()接口实现,具体代码如下:

代码语言:javascript
复制
//接收客户端数据  
void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {  
  ......  
  //获取本session接收数据的ticket,也就是ASIOSourceTicket  
  auto ticket = _session()->sourceMessage(&_inMessage);   
  //进入等等接收数据状态  
  _state.store(State::SourceWait);    
  //release恢复worker线程原有的线程名,synchronous线程模型为"conn-xx",adaptive对应worker线程名为"conn-xx"  
  guard.release();  
  //线程模型默认同步方式,也就是一个链接一个线程  
  if (_transportMode == transport::Mode::kSynchronous) {  
        //同步方式,读取到一个完整mongodb报文后执行_sourceCallback回调  
        _sourceCallback([this](auto ticket) {  
          MONGO_IDLE_THREAD_BLOCK;  
          return _session()->getTransportLayer()->wait(std::move(ticket));  
      }(std::move(ticket)));   
  } else if (_transportMode == transport::Mode::kAsynchronous) {  
      //adaptive线程模型,异步读取一个mongodb报文后执行_sourceCallback回调  
      _session()->getTransportLayer()->asyncWait(   
          ////TransportLayerASIO::ASIOSourceTicket::_bodyCallback读取到一个完整报文后执行该回调  
          std::move(ticket), [this](Status status) { _sourceCallback(status); });  
  }  
}  
 
//接收到一个完整mongodb报文后的回调处理  
void ServiceStateMachine::_sourceCallback(Status status) {  
  //构造ThreadGuard,修改执行本SSM接口的线程名为conn-xx  
  ThreadGuard guard(this);   
 
  //状态检查  
  dassert(state() == State::SourceWait);  
  //获取链接session远端信息  
  auto remote = _session()->remote();   
  if (status.isOK()) {  
  //等待调度,进入处理消息阶段 _processMessage  
      _state.store(State::Process);  
      //注意kMayRecurse标识State::Process阶段的处理还是由本线程执行,这是一个递归标记  
      return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);  
  }  
  ......  
  //异常流程调用  
  _runNextInGuard(std::move(guard));  
}

SSM调度的第一个任务就是readTask任务,从上面的源码分析可以看出,该任务就是通过ticket数据分发模块从ASIO网络库读取一个完整长度的MongoDB报文,然后执行sourceCallback回调。进入该回调函数后,即刻设置SSM状态为State::Process状态,然后调用scheduleNextWithGuard(...)把dealTask任务入队到ASIO的全局队列(adaptive线程模型),或者入队到线程级私有队列(synchronous线程模型)等待worker线程调度执行。

这里有个细节,在把dealTask入队的时候,携带了kMayRecurse标记,该标记标识该任务可以递归调用,也就是该任务可以由当前线程继续执行,这样也就可以保证同一个请求的taskRead任务和dealTask任务由同一个线程处理。任务递归调度,可以参考后面的线程模型子模块源码实现。

dealTask任务核心代码实现

当读取到一个完整长度的MongoDB报文后,就会把dealTask任务入队到全局队列,然后由worker线程调度执行该task任务。dealTask任务的核心代码实现如下:

代码语言:javascript
复制
//dealTask处理  
void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
  ......  
  //入口流量计数  
  networkCounter.hitLogicalIn(_inMessage.size());  
  //获取一个唯一的UniqueOperationContext,一个客户端对应一个UniqueOperationContext  
  auto opCtx = Client::getCurrent()->makeOperationContext();  
  //ServiceEntryPointMongod::handleRequest ServiceEntryPointMongos::handleRequest请求处理  
  //command处理、DB访问后的数据通过dbresponse返回  
  DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
  //释放opCtx,这样currentop就看不到了  
  opCtx.reset();  
  //需要发送给客户端的应答数据  
  Message& toSink = dbresponse.response;  
  //应答数据存在  
  if (!toSink.empty()) {    
      ......  
      //发送数据 ServiceStateMachine::_sinkMessage()  
      _sinkMessage(std::move(guard), std::move(toSink));  
 
  } else {  
      //如果不需要应答客户端的处理  
      ......  
  }  
}  
 
//调用Sinkticket发送数据  
void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {  
  //获取发送数据的ASIOSinkTicket  
  auto ticket = _session()->sinkMessage(toSink);  
  //进入sink发送等待状态  
  _state.store(State::SinkWait);  
  //恢复原有的worker线程名  
  guard.release();  
  //synchronous线程模型,同步发送  
  if (_transportMode == transport::Mode::kSynchronous) {  
      //最终在ASIOSinkTicket同步发送数据成功后执行_sinkCallback  
      _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket)));  
  } else if (_transportMode == transport::Mode::kAsynchronous) {  
      //最终在ASIOSinkTicket异步发送数据成功后执行_sinkCallback  
      _session()->getTransportLayer()->asyncWait(  
          std::move(ticket), [this](Status status) { _sinkCallback(status); });  
  }  
}  
 
//sink数据发送  
void ServiceStateMachine::_sinkCallback(Status status) {  
  //SSM归属于新的guard,同时修改当前线程名为conn-xx  
  ThreadGuard guard(this);  
  //状态检查  
  dassert(state() == State::SinkWait);  
  if (!status.isOK()) {  
      //进入EndSession状态  
      _state.store(State::EndSession);  
      //异常情况调用  
      return _runNextInGuard(std::move(guard));  
  } else if (_inExhaust) { //_inExhaust方式   
      //注意这里的状态是process   _processMessage   还需要继续进行Process处理  
      _state.store(State::Process);   
  } else {   
      //正常流程始终进入该分支 _sourceMessage   这里继续进行递归接收数据处理  
      //注意这里的状态是Source,继续接收客户端请求  
      _state.store(State::Source);  
  }  
  //本链接对应的一次mongo访问已经应答完成,需要继续要一次调度了  
  return _scheduleNextWithGuard(std::move(guard),  
                                ServiceExecutor::kDeferredTask |  
                                    ServiceExecutor::kMayYieldBeforeSchedule);  
}

readTask通过ticket数据分发子模块读取一个完整长度的MongoDB报文后,开始dealTask任务逻辑,该任务也就是processMessage(...)。该接口中核心实现就是调用mongod和mongos实例对应的服务入口类的handleRequest(...)接口来完成后续的command命令处理、DB层数据访问等,访问到的数据存储到DbResponse中,最终在通过sinkMessage(...)把数据发送出去。

真正的MongoDB内部处理流程实际上就是通过该dealTask任务完成,该任务也是处理整个请求中资源耗费最重的一个环节。在该task任务中,当数据成功发送给客户端后,该session链接对应的SSM状态机进入State::Source状态,继续等待worker线程调度来完成后续该链接的新请求。

cleanTask任务

在数据读写过程、客户端链接关闭、访问DB数据层等任何一个环节异常,则会进入State::EndSession状态。该状态对应得任务实现相对比较简单,具体代码实现如下:

代码语言:javascript
复制
//session对应链接回收处理  
void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {  
  //进入这个状态后在~ThreadGuard::release中进行ssm _cleanupHook处理,该hook在ServiceEntryPointImpl::startSession  
  _state.store(State::Ended);  
  //清空message buffer  
  _inMessage.reset();  
  //释放链接对应client资源  
  Client::releaseCurrent();  
}

进入该状态后直接由本线程进行session资源回收和client资源释放处理,而无需状态机调度worker线程来回收。

2.2 关于worker线程名和guardthread线程守护类

前面得分析我们知道,当线程模型为adaptive动态线程模型的时候,mongod和mongos实例对应的子线程中有很多名为“conn-xx”和”worker-xx”的线程,而且同一个线程可能一会儿线程名为“conn-xx”,下一次又变为了”worker-xx”。这个线程名的初始命名和线程名更改与ServiceStateMachine状态机调度类、guardthread线程守护类、worker线程模型等都有关系。

Worker线程由ServiceExecutor线程模型子模块创建,请参考后续线程模型子模块相关章节。默认初始化线程名为”conn-x”,初始化代码实现如下:

代码语言:javascript
复制
//ServiceStateMachine::create调用,ServiceStateMachine类初始化构造  
ServiceStateMachine::ServiceStateMachine(...)  
  ......  
  //线程名初始化:conn-xx,xx代码session id  
  _threadName{str::stream() << "conn-" << _session()->id()} {}   
}  
 
class Session {  
  ......  
  //sessionID,自增生成  
  const Id _id;  
}  
 
//全局sessionIdCounter计数器,初始化为0  
AtomicUInt64 sessionIdCounter(0);  
 
//session id自增  
Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {}

SSM状态处理过程中,会把一个完整的请求过程 = readTask任务 + dealTask任务,这两个任务都是通过SSM状态机和ServiceExecutor线程模型子模块的worker线程配合调度完成,在任务处理过程中处理同一个任务的线程可能会有多次线程名更改,这个就是结合guardthread线程守护类来完成,以一个线程名切换更改伪代码实现为例:

代码语言:javascript
复制
worker_thread_run_task(...)  
{  
  //如果是adaptive线程模型,当前worker线程名为"worker-xx"  
  print(threadName)  
  //业务逻辑处理1  
  ......  
     
  //初始化构造ThreadGuard,这里面修改线程名为_ssm->_threadName,也就是"conn-xx",  
  //同时保存原来的线程名"worker-xx"到_ssm->_oldThreadName中  
  ThreadGuard guard(this);  
  //如果是adaptive线程模型,线程名打印内容为"conn-xx"  
  print(threadName)  
  //业务逻辑处理2  
  ......  
  //恢复_ssm->_oldThreadName保存的线程名"worker-xx"  
  guard.release();  
     
  //如果是adaptive线程模型,线程名恢复为"worker-xx"  
  print(threadName)  
}

从上面的伪代码可以看出,adaptive线程模型对应worker线程名为”worker”,在进入ThreadGuard guard(this)流程后,线程名更改为”conn-xx”线程,当guard.release()释放后恢复原有”worker-xx”线程名。

结合前面的SSM状态处理流程,adaptive线程模型可以得到如下总结:底层网络IO数据读写过程,worker线程名会改为”worker-xx”,其他非网络IO的mongodb内部逻辑处理线程名为”conn-xx”。所以,如果查看mongod或者mongos进程所有线程名的时候,如果发现线程名为”worker-xx”,说明当前线程在处理网络IO;如果发现线程名为”conn-xx”,则说明当前线程在处理内部逻辑处理,对于mongod实例可以理解为主要处理磁盘IO。

由于synchronous同步线程模型,同一链接对应的所有客户端请求至始至终都有同一线程处理,所以整个处理线程名不会改变,也没必要修改线程名,整个过程都是”conn-xx”线程名。

2.3 该模块函数接口总结大全

前面分析了主要核心接口源码实现,很多其他接口没有一一列举详细分析,该模块u所有接口功能总结如下,更多接口代码实现详见MongoDB内核源码详细注释分析

类名

函数接口

功能说明

ThreadGuard

ThreadGuard(...)

线程守护初始化,把worker线程名改为”conn-xx”

operator=(...)

类赋值

~ThreadGuard()

类析构,析构函数中调用release()接口

operator bool()

获取所有权标识

markStaticOwnership()

synchronous线程模型默认归属权为kStatic

release()

1. 如果ssm状态为Ended,则链接回收处理 2. 恢复worker原来的线程名(adaptive:”worker-xx”)

ServiceStateMachine

ServiceStateMachine(...)

ServiceStateMachine类初始化构造赋值

_session()

获取当前ssm对应的session信息

_sourceMessage(...)

等待通过ASIO库接收网络IO数据

_sinkMessage(...)

等待通过ASIO库发送网络IO数据

_sourceCallback(...)

接收到一个完整长度mongodb报文的回调处理

_sinkCallback(...)

发送一个完整应答报文后的回调处理

_processMessage(...)

开始内部处理,如协议解析、命令处理、DB层数据访问等

_runNextInGuard(...)

SSM状态机调度运行

start(...)

接收到一个新链接通过start()接口启用SSM状态机

_scheduleNextWithGuard(...)

readTask和dealTask交由worker线程处理

terminate(...)

调用TransportLayerASIO::end()进行套接字回收处理

setCleanupHook(...)

设置clean hook,也就是回收处理

state()

获取SSM当前状态

_terminateAndLogIfError(...)

打印回收日志并进行terminate回收处理

_cleanupSession(...)

session会话回收处理

3. 总结

本文主要分析了service_state_machine状态机子模块,该模块把session对应的客户端请求转换为readTask任务、dealTask任务和cleanTask任务,前两个任务通过worker线程完成调度处理,cleanTask任务在内部处理异常或者链接关闭的时候由本线程直接执行,而不是通过worker线程调度执行。

这三个任务处理过程会分别对应到Created、Source、SourceWait、Process、SinkWait、EndSession、Ended七种状态的一种或者多种,具体详见前面的状态码分析。一个正常的客户端请求状态转换过程如下:

1) 链接刚建立的第一次请求状态转换过程:

Created->Source -> SourceWait -> Process -> SinkWait -> Source

2) 该链接后续的请求状态转换过程:

Source -> SourceWait -> Process -> SinkWait -> Source

此外,SSM状态机调度模块通ServiceStateMachine::_scheduleNextWithGuard(...)接口和线程模型子模块关联起来。SSM通过该接口完成worker线程初始创建、task任务入队处理,下期将分析<<网络线程模型子模块>>详细源码实现。

说明:

该模块更多接口实现细节详见MongoDB内核源码注释:

MongoDB内核源码详细注释分析

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Mongoing中文社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2.1.2 ServiceStateMachine 类核心代码实现
  • 2.2 关于worker线程名和guardthread线程守护类
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档