前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >微信phxrpc源码分析(六)--收发流管理

微信phxrpc源码分析(六)--收发流管理

原创
作者头像
路小饭
修改2019-03-15 09:08:36
1.3K2
修改2019-03-15 09:08:36
举报

收发流管理在网络库中处于一个非常重要的位置,与其他rpc框架不同,phxrpc在这方面可谓独辟蹊径,将socket与iostream和streambuf结合起来,完成了缓冲区的设计。

1 整体流程介绍

再来复习下phxrpc的收发流逻辑在哪儿

  • 主要逻辑在HshaServerIO::IOFunc中
  • IOFunc里的msg_handler是在HshaServer中注册的,这儿的结构会在下面第2节具体说明
  • msg_handler->RecvRequest(stream, req)开始接收请求内容,注意这里的req出来的时候已经变成了HttpRequest,第3节有说明
  • 请求接收完后会放到请求队列,Worker::WorkerLogic会拉取请求进行业务逻辑处理,生成结果
  • IOFunc拿到结果后,表面看是使用BaseResponse的Send方法发送结果,实际使用的是HttpResponse,第4节有说明

2 msg_handler

先看下代码中的msg_handler是怎么初始化的

代码语言:txt
复制
void HshaServerIO::IOFunc(int accepted_fd) {
………………
    auto msg_handler(msg_handler_factory_->Create());
    int ret{msg_handler->RecvRequest(stream, req)};

msg_handler_factory_来源于:

代码语言:txt
复制
class HshaServer {
  public:
    HshaServer(const HshaServerConfig &config, const Dispatch_t &dispatch,void *args,
               phxrpc::BaseMessageHandlerFactoryCreateFunc msg_handler_factory_create_func =
               []()->std::unique_ptr<phxrpc::HttpMessageHandlerFactory> {
        return std::unique_ptr<phxrpc::HttpMessageHandlerFactory>(new phxrpc::HttpMessageHandlerFactory);
    });

所以msg_handler是HttpMessageHandlerFactory使用create方法返回的,下面的示意图有利于理解他们之间的关系:

当代码调用:msg_handler->RecvRequest(stream, req)时候,其实调用的是HttpMessageHandler.RecvRequest

3 输入流说明:HttpMessageHandler.RecvRequest

在说这个函数前,先看下面这段代码,否则下面的流程不好理解了

代码语言:txt
复制
void HshaServerIO::IOFunc(int accepted_fd) {
    cout<<"HshaServerIO::IOFunc"<<endl;
    UThreadSocket_t *socket{scheduler_->CreateSocket(accepted_fd)};
    UThreadTcpStream stream;
    stream.Attach(socket);//关键点在这里,Attach里将streambuf与iostream进行绑定

stream.Attach里调用了NewRdbuf方法

代码语言:txt
复制
void BaseTcpStream::NewRdbuf(BaseTcpStreamBuf * buf) {
    std::streambuf * old = rdbuf(buf);
    delete old;
}

BaseTcpStream继承了std::iosream,BaseTcpStreamBuf继承了std::streambuf,上面的rdbuf方法将对iostream的操作转到了streambuf上

进入正题,来一起看下phxrpc怎样处理输入流。

  • 先来看HttpMessageHandler::RecvRequest方法中是怎样将BaseRequest变为HttpRequest的
代码语言:txt
复制
int HttpMessageHandler::RecvRequest(BaseTcpStream &socket, BaseRequest *&req) {
    HttpRequest *http_req{new HttpRequest};//这里新建了HttpRequest

    int ret{HttpProtocol::RecvReq(socket, http_req)};
    if (0 == ret) {
        req_ = req = http_req;//这里将http_req指针指向了BaseRequest指针
        version_ = (http_req->version() != nullptr ? http_req->version() :"");
        keep_alive_ = http_req->keep_alive();
    } else {
        delete http_req;
        http_req = nullptr;
    }   

    return ret;
}
  • 调用顺序依照箭头数字所示,关键点在两个地方:一是箭头2的getline是怎样调到箭头3的underflow方法;二是箭头3的underflow方法是怎样调用到箭头4的precv方法。
  • precv方法调用了UThreadRecv方法,完成了文件描述的符的读取工作

4 输出流说明:HttpResponse.Send(socket)

与上图对应的代码如下:

代码语言:txt
复制
int HttpResponse::Send(BaseTcpStream &socket) const {
    socket << version() << " " << status_code() << " " << reason_phrase() << "\r\n";

    for (size_t i{0}; GetHeaderCount() > i; ++i) {
        socket << GetHeaderName(i) << ": " << GetHeaderValue(i) << "\r\n";
    }   

    if (content().size() > 0) {
        if (nullptr == GetHeaderValue(HttpMessage::HEADER_CONTENT_LENGTH)) {
            socket << HttpMessage::HEADER_CONTENT_LENGTH << ": " << content().size() << "\r\n"; 
        }   
    }   

    socket << "\r\n";

    if (content().size() > 0)
        socket << content();
        
    if (socket.flush().good()) {//这里flush中调用了红色箭头1指向的sync方法,后续又调用了红色箭头2的psend方法
        return 0;
    } else {
        return static_cast<int>(socket.LastError());
    }
}

psend方法里调用了UThreadSend,完成了对文件描述符的写操作

这里额外说明下,为什么HshaServerIO::IOFuncBaseResponse.Send是HttpResponse.Send方法实现的

代码语言:txt
复制
BaseResponse *resp{(BaseResponse *)UThreadGetArgs(*socket)};//IOFunc中,resp的来源是UThreadGetArgs
if(!resp->fake()){                 
ret = resp->Send(stream);
…………

很明显,有UThreadGetArgs,就有UThreadSetArgs

代码语言:txt
复制
UThreadSocket_t *HshaServerIO::ActiveSocketFunc() {
    while (data_flow_->CanPluckResponse()) {
        void *args{nullptr};
        BaseResponse *resp{nullptr};
        //(3)在这里resp来源
        int queue_wait_time_ms{data_flow_->PluckResponse(args, resp)};
        if (!resp) {
            // break out
            return nullptr;
        }
        hsha_server_stat_->outqueue_wait_time_costs_ += queue_wait_time_ms
;       
        hsha_server_stat_->outqueue_wait_time_costs_count_++;
        
        UThreadSocket_t *socket{(UThreadSocket_t *)args};
        if (socket != nullptr && IsUThreadDestory(*socket)) {
            // socket aready timeout
            //log(LOG_ERR, "%s socket aready timeout", __func__);
            UThreadClose(*socket);
            free(socket);
            delete resp;
            
            continue;
        }
        //(1)UThreadSetArgs在这里,可以判断,这里的resp已经是HttpResponse类型了
        //(2)这里resp的来源在上面(3)中:data_flow_->PluckResponse
        UThreadSetArgs(*socket, (void *)resp);
        
        return socket;
    }

    return nullptr;
}         

所以继续追查谁把resp放到了data_flow_里

代码语言:txt
复制
void Worker::WorkerLogic(void *args, BaseRequest *req, int queue_wait_time_ms) {
   ………………
   //(3)resp来源于req->GenResponse()
    BaseResponse *resp{req->GenResponse()};
   ………………
   //(1)这里把resp放到了data_flow_中
   //(2)这里resp的来源看上面(3)
    pool_->data_flow_->PushResponse(args, resp);

最后看看req->GenResponse()方法就一切真相大白了!

代码语言:javascript
复制
BaseResponse *HttpRequest::GenResponse() const {
    return new HttpResponse;//看到了吧,这里返回的是HttpResponse类型
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 整体流程介绍
  • 2 msg_handler
  • 3 输入流说明:HttpMessageHandler.RecvRequest
  • 4 输出流说明:HttpResponse.Send(socket)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档