Send/receive stream management occupies a very important place in a network library. Unlike other RPC frameworks, phxrpc takes an unconventional path here: it combines the socket with iostream and streambuf to implement its buffering design.
Let's first review where phxrpc's send/receive stream logic lives.
First, look at how msg_handler is initialized in the code:
void HshaServerIO::IOFunc(int accepted_fd) {
………………
auto msg_handler(msg_handler_factory_->Create());
int ret{msg_handler->RecvRequest(stream, req)};
msg_handler_factory_ comes from:
class HshaServer {
public:
HshaServer(const HshaServerConfig &config, const Dispatch_t &dispatch,void *args,
phxrpc::BaseMessageHandlerFactoryCreateFunc msg_handler_factory_create_func =
[]()->std::unique_ptr<phxrpc::HttpMessageHandlerFactory> {
return std::unique_ptr<phxrpc::HttpMessageHandlerFactory>(new phxrpc::HttpMessageHandlerFactory);
});
So msg_handler is what HttpMessageHandlerFactory's Create method returns. The diagram below helps illustrate the relationship between them:
When the code calls msg_handler->RecvRequest(stream, req), it is actually calling HttpMessageHandler::RecvRequest.
Before discussing that function, look at the following code first; otherwise the flow below will be hard to follow.
void HshaServerIO::IOFunc(int accepted_fd) {
cout<<"HshaServerIO::IOFunc"<<endl;
UThreadSocket_t *socket{scheduler_->CreateSocket(accepted_fd)};
UThreadTcpStream stream;
stream.Attach(socket);//the key point: inside Attach, the streambuf is bound to the iostream
stream.Attach calls the NewRdbuf method:
void BaseTcpStream::NewRdbuf(BaseTcpStreamBuf * buf) {
std::streambuf * old = rdbuf(buf);
delete old;
}
BaseTcpStream inherits from std::iostream, and BaseTcpStreamBuf inherits from std::streambuf. The rdbuf call above redirects operations on the iostream to the streambuf.
Now to the main topic: let's look at how phxrpc handles the input stream.
int HttpMessageHandler::RecvRequest(BaseTcpStream &socket, BaseRequest *&req) {
HttpRequest *http_req{new HttpRequest};//a new HttpRequest is created here
int ret{HttpProtocol::RecvReq(socket, http_req)};
if (0 == ret) {
req_ = req = http_req;//the HttpRequest pointer is assigned to the BaseRequest pointer (and cached in req_)
version_ = (http_req->version() != nullptr ? http_req->version() : "");
keep_alive_ = http_req->keep_alive();
} else {
delete http_req;
http_req = nullptr;
}
return ret;
}
The code corresponding to the figure above is:
int HttpResponse::Send(BaseTcpStream &socket) const {
socket << version() << " " << status_code() << " " << reason_phrase() << "\r\n";
for (size_t i{0}; GetHeaderCount() > i; ++i) {
socket << GetHeaderName(i) << ": " << GetHeaderValue(i) << "\r\n";
}
if (content().size() > 0) {
if (nullptr == GetHeaderValue(HttpMessage::HEADER_CONTENT_LENGTH)) {
socket << HttpMessage::HEADER_CONTENT_LENGTH << ": " << content().size() << "\r\n";
}
}
socket << "\r\n";
if (content().size() > 0)
socket << content();
if (socket.flush().good()) {//flush() invokes the sync method (red arrow 1 in the figure), which then calls the psend method (red arrow 2)
return 0;
} else {
return static_cast<int>(socket.LastError());
}
}
The psend method calls UThreadSend, completing the write to the file descriptor.
A side note here: why is the BaseResponse::Send called in HshaServerIO::IOFunc actually implemented by HttpResponse::Send?
BaseResponse *resp{(BaseResponse *)UThreadGetArgs(*socket)};//in IOFunc, resp comes from UThreadGetArgs
if(!resp->fake()){
ret = resp->Send(stream);
…………
Obviously, where there is a UThreadGetArgs there must be a UThreadSetArgs:
UThreadSocket_t *HshaServerIO::ActiveSocketFunc() {
while (data_flow_->CanPluckResponse()) {
void *args{nullptr};
BaseResponse *resp{nullptr};
//(3) this is where resp comes from
int queue_wait_time_ms{data_flow_->PluckResponse(args, resp)};
if (!resp) {
// break out
return nullptr;
}
hsha_server_stat_->outqueue_wait_time_costs_ += queue_wait_time_ms;
hsha_server_stat_->outqueue_wait_time_costs_count_++;
UThreadSocket_t *socket{(UThreadSocket_t *)args};
if (socket != nullptr && IsUThreadDestory(*socket)) {
// socket already timeout
//log(LOG_ERR, "%s socket aready timeout", __func__);
UThreadClose(*socket);
free(socket);
delete resp;
continue;
}
//(1) UThreadSetArgs is here; we can infer that resp is already an HttpResponse at this point
//(2) resp's origin is (3) above: data_flow_->PluckResponse
UThreadSetArgs(*socket, (void *)resp);
return socket;
}
return nullptr;
}
So the next step is to trace who put resp into data_flow_:
void Worker::WorkerLogic(void *args, BaseRequest *req, int queue_wait_time_ms) {
………………
//(3) resp comes from req->GenResponse()
BaseResponse *resp{req->GenResponse()};
………………
//(1) here resp is pushed into data_flow_
//(2) for resp's origin, see (3) above
pool_->data_flow_->PushResponse(args, resp);
Finally, look at the req->GenResponse() method and everything becomes clear!
BaseResponse *HttpRequest::GenResponse() const {
return new HttpResponse;//see: an HttpResponse is returned here
}
Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.