(三)服务器端的程序架构介绍2

下面我们以pc端登录为例来具体看一个数据包在服务器端各个服务之间走过的流程:

步骤1:login_server初始化侦听socket,设置新连接到来的回调函数。侦听端口为8080,该端口是为http服务配置的。

在login_server.cpp main函数中调用:

netlib_listen调用如下:

pSocket->Listen调用:

AddBaseSocket将该socket加入hash_map中。AddEvent设置需要关注的socket上的事件,这里只关注可读和出错事件。

步骤2: 客户端调用connect()函数连接login_server的8080端口。

步骤3:login_server收到连接请求后调用OnRead方法,OnRead()方法里面调用_AcceptNewSocket(),_AcceptNewSocket()接收新连接,创建新的socket,并调用之前初始化阶段netlib_listen设置的回调函数http_callback。

[cpp]view plaincopy

voidCBaseSocket::OnRead()

{

if(m_state == SOCKET_STATE_LISTENING)

{

_AcceptNewSocket();

}

else

{

u_long avail = 0;

if( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )

{

m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);

}

else

{

m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);

}

}

}

[cpp]view plaincopy

voidCBaseSocket::_AcceptNewSocket()

{

SOCKET fd = 0;

sockaddr_in peer_addr;

socklen_t addr_len =sizeof(sockaddr_in);

charip_str[64];

while( (fd = accept(m_socket, (sockaddr*)&peer_addr, &addr_len)) != INVALID_SOCKET )

{

CBaseSocket* pSocket =newCBaseSocket();

uint32_t ip = ntohl(peer_addr.sin_addr.s_addr);

uint16_t port = ntohs(peer_addr.sin_port);

snprintf(ip_str,sizeof(ip_str),"%d.%d.%d.%d", ip >> 24, (ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF);

log("AcceptNewSocket, socket=%d from %s:%d\n", fd, ip_str, port);

pSocket->SetSocket(fd);

pSocket->SetCallback(m_callback);

pSocket->SetCallbackData(m_callback_data);

pSocket->SetState(SOCKET_STATE_CONNECTED);

pSocket->SetRemoteIP(ip_str);

pSocket->SetRemotePort(port);

_SetNoDelay(fd);

_SetNonblock(fd);

AddBaseSocket(pSocket);

CEventDispatch::Instance()->AddEvent(fd, SOCKET_READ | SOCKET_EXCEP);

m_callback(m_callback_data, NETLIB_MSG_CONNECT, (net_handle_t)fd, NULL);

}

}

[cpp]view plaincopy

voidhttp_callback(void* callback_data, uint8_t msg, uint32_t handle,void* pParam)

{

if(msg == NETLIB_MSG_CONNECT)

{

CHttpConn* pConn =newCHttpConn();

pConn->OnConnect(handle);

}

else

{

log("!!!error msg: %d ", msg);

}

}

pConn->OnConnect(handle)中设置http数据的回调函数httpconn_callback:

[cpp]view plaincopy

voidCHttpConn::OnConnect(net_handle_t handle)

{

printf("OnConnect, handle=%d\n", handle);

m_sock_handle = handle;

m_state = CONN_STATE_CONNECTED;

g_http_conn_map.insert(make_pair(m_conn_handle,this));

netlib_option(handle, NETLIB_OPT_SET_CALLBACK, (void*)httpconn_callback);

netlib_option(handle, NETLIB_OPT_SET_CALLBACK_DATA,reinterpret_cast(m_conn_handle) );

netlib_option(handle, NETLIB_OPT_GET_REMOTE_IP, (void*)&m_peer_ip);

}

httpconn_callback中处理http可读可写出错事件:

[cpp]view plaincopy

voidhttpconn_callback(void* callback_data, uint8_t msg, uint32_t handle, uint32_t uParam,void* pParam)

{

NOTUSED_ARG(uParam);

NOTUSED_ARG(pParam);

// convert void* to uint32_t, oops

uint32_t conn_handle = *((uint32_t*)(&callback_data));

CHttpConn* pConn = FindHttpConnByHandle(conn_handle);

if(!pConn) {

return;

}

switch(msg)

{

caseNETLIB_MSG_READ:

pConn->OnRead();

break;

caseNETLIB_MSG_WRITE:

pConn->OnWrite();

break;

caseNETLIB_MSG_CLOSE:

pConn->OnClose();

break;

default:

log("!!!httpconn_callback error msg: %d ", msg);

break;

}

}

步骤4:客户端连接成功以后,发送http请求,方法是get,请求url:http://192.168.226.128:8080/msg_server。(具体网址与你的机器配置的网址有关)

步骤5:login_server检测到该socket可读,调用pConn->OnRead()方法。

[cpp]view plaincopy

voidCHttpConn::OnRead()

{

for(;;)

{

uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();

if(free_buf_len

m_in_buf.Extend(READ_BUF_SIZE + 1);

intret = netlib_recv(m_sock_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);

if(ret

break;

m_in_buf.IncWriteOffset(ret);

m_last_recv_tick = get_tick_count();

}

// 每次请求对应一个HTTP连接,所以读完数据后,不用在同一个连接里面准备读取下个请求

char* in_buf = (char*)m_in_buf.GetBuffer();

uint32_t buf_len = m_in_buf.GetWriteOffset();

in_buf[buf_len] ='\0';

// 如果buf_len 过长可能是受到攻击,则断开连接

// 正常的url最大长度为2048,我们接受的所有数据长度不得大于1K

if(buf_len > 1024)

{

log("get too much data:%s ", in_buf);

Close();

return;

}

//log("OnRead, buf_len=%u, conn_handle=%u\n", buf_len, m_conn_handle); // for debug

m_cHttpParser.ParseHttpContent(in_buf, buf_len);

if(m_cHttpParser.IsReadAll()) {

string url = m_cHttpParser.GetUrl();

if(strncmp(url.c_str(),"/msg_server", 11) == 0) {

string content = m_cHttpParser.GetBodyContent();

_HandleMsgServRequest(url, content);

}else{

log("url unknown, url=%s ", url.c_str());

Close();

}

}

}

CHttpConn::OnRead()先用recv收取数据,接着解析数据,如果出错或者非法数据就关闭连接。如果客户端发送的请求的http object正好是/msg_server,则调用_HandleMsgServRequest(url, content);进行处理:

[cpp]view plaincopy

voidCHttpConn::_HandleMsgServRequest(string& url, string& post_data)

{

msg_serv_info_t* pMsgServInfo;

uint32_t min_user_cnt = (uint32_t)-1;

map::iterator it_min_conn = g_msg_serv_info.end();

map::iterator it;

if(g_msg_serv_info.size()

{

Json::Value value;

value["code"] = 1;

value["msg"] ="没有msg_server";

string strContent = value.toStyledString();

char* szContent =newchar[HTTP_RESPONSE_HTML_MAX];

snprintf(szContent, HTTP_RESPONSE_HTML_MAX, HTTP_RESPONSE_HTML, strContent.length(), strContent.c_str());

Send((void*)szContent, strlen(szContent));

delete[] szContent;

return;

}

for(it = g_msg_serv_info.begin() ; it != g_msg_serv_info.end(); it++) {

pMsgServInfo = it->second;

if( (pMsgServInfo->cur_conn_cnt max_conn_cnt) &&

(pMsgServInfo->cur_conn_cnt

it_min_conn = it;

min_user_cnt = pMsgServInfo->cur_conn_cnt;

}

}

if(it_min_conn == g_msg_serv_info.end()) {

log("All TCP MsgServer are full ");

Json::Value value;

value["code"] = 2;

value["msg"] ="负载过高";

string strContent = value.toStyledString();

char* szContent =newchar[HTTP_RESPONSE_HTML_MAX];

snprintf(szContent, HTTP_RESPONSE_HTML_MAX, HTTP_RESPONSE_HTML, strContent.length(), strContent.c_str());

Send((void*)szContent, strlen(szContent));

delete[] szContent;

return;

}else{

Json::Value value;

value["code"] = 0;

value["msg"] ="";

if(pIpParser->isTelcome(GetPeerIP()))

{

value["priorIP"] = string(it_min_conn->second->ip_addr1);

value["backupIP"] = string(it_min_conn->second->ip_addr2);

value["msfsPrior"] = strMsfsUrl;

value["msfsBackup"] = strMsfsUrl;

}

else

{

value["priorIP"] = string(it_min_conn->second->ip_addr2);

value["backupIP"] = string(it_min_conn->second->ip_addr1);

value["msfsPrior"] = strMsfsUrl;

value["msfsBackup"] = strMsfsUrl;

}

value["discovery"] = strDiscovery;

value["port"] = int2string(it_min_conn->second->port);

string strContent = value.toStyledString();

char* szContent =newchar[HTTP_RESPONSE_HTML_MAX];

uint32_t nLen = strContent.length();

snprintf(szContent, HTTP_RESPONSE_HTML_MAX, HTTP_RESPONSE_HTML, nLen, strContent.c_str());

Send((void*)szContent, strlen(szContent));

delete[] szContent;

return;

}

}

该方法根据客户端ip地址将msg_server的地址组装成json格式,返回给客户端。json格式内容如下:

[plain]view plaincopy

{

"backupIP" : "localhost",

"code" : 0,

"discovery" : "http://192.168.226.128/api/discovery",

"msfsBackup" : "http://192.168.226.128:8700/",

"msfsPrior" : "http://192.168.226.128:8700/",

"msg" : "",

"port" : "8000",

"priorIP" : "localhost"

}

注意,发送数据给客户端调用的是Send方法,该方法会先尝试着调用底层的send()函数去发送,如果不能全部发送出去,则将剩余数据加入到对应的写数据缓冲区内。这样这些数据会在该socket可写时再继续发送。这也是设计网络通信库的一个通用技巧,即先试着去send,如果send不了,将数据放入待发送缓冲区内,并设置检测可写标识位,当socket可写时,从待发送缓冲区取出数据发送出去。如果还是不能全部发送出去,继续设置检测可写标识位,下次再次发送,如此循环一直到所有数据都发送出去为止。

[cpp]view plaincopy

intCHttpConn::Send(void* data,intlen)

{

m_last_send_tick = get_tick_count();

if(m_busy)

{

m_out_buf.Write(data, len);

returnlen;

}

intret = netlib_send(m_sock_handle, data, len);

if(ret

ret = 0;

if(ret

{

m_out_buf.Write((char*)data + ret, len - ret);

m_busy =true;

//log("not send all, remain=%d\n", m_out_buf.GetWriteOffset());

}

else

{

OnWriteComlete();

}

returnlen;

}

当然,由于这里http设置成了短连接,每次应答完客户端之后立即关闭连接,在OnWriteComlete()(源码中的方法名拼写如此)里面:

[cpp]view plaincopy

voidCHttpConn::OnWriteComlete()

{

log("write complete ");

Close();

}

步骤6:客户端收到http请求的应答后,根据收到的json得到msg_server的ip地址和端口号,这里ip地址是192.168.226.128,端口号是8000。客户端开始连接这个ip地址和端口号,客户端发起连接以及msg_server接收连接的过程与上面的步骤相同。接着客户端给服务器发送登录数据包。

步骤7:msg_server收到登录请求后,在CImConn::OnRead()收取数据,解包,调用子类CMsgConn重写的HandlePdu,处理登录请求,如何处理呢?处理如下:

[cpp]view plaincopy

//MsgConn.cpp

voidCMsgConn::HandlePdu(CImPdu* pPdu)

{

// request authorization check

if(pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {

log("HandlePdu, wrong msg. ");

throwCPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID,"HandlePdu error, user not login. ");

return;

}

switch(pPdu->GetCommandId()) {

caseCID_OTHER_HEARTBEAT:

_HandleHeartBeat(pPdu);

break;

caseCID_LOGIN_REQ_USERLOGIN:

_HandleLoginRequest(pPdu );

break;

caseCID_LOGIN_REQ_LOGINOUT:

_HandleLoginOutRequest(pPdu);

break;

caseCID_LOGIN_REQ_DEVICETOKEN:

_HandleClientDeviceToken(pPdu);

break;

caseCID_LOGIN_REQ_KICKPCCLIENT:

_HandleKickPCClient(pPdu);

break;

caseCID_LOGIN_REQ_PUSH_SHIELD:

_HandlePushShieldRequest(pPdu);

break;

caseCID_LOGIN_REQ_QUERY_PUSH_SHIELD:

_HandleQueryPushShieldRequest(pPdu);

break;

caseCID_MSG_DATA:

_HandleClientMsgData(pPdu);

break;

caseCID_MSG_DATA_ACK:

_HandleClientMsgDataAck(pPdu);

break;

caseCID_MSG_TIME_REQUEST:

_HandleClientTimeRequest(pPdu);

break;

caseCID_MSG_LIST_REQUEST:

_HandleClientGetMsgListRequest(pPdu);

break;

caseCID_MSG_GET_BY_MSG_ID_REQ:

_HandleClientGetMsgByMsgIdRequest(pPdu);

break;

caseCID_MSG_UNREAD_CNT_REQUEST:

_HandleClientUnreadMsgCntRequest(pPdu );

break;

caseCID_MSG_READ_ACK:

_HandleClientMsgReadAck(pPdu);

break;

caseCID_MSG_GET_LATEST_MSG_ID_REQ:

_HandleClientGetLatestMsgIDReq(pPdu);

break;

caseCID_SWITCH_P2P_CMD:

_HandleClientP2PCmdMsg(pPdu );

break;

caseCID_BUDDY_LIST_RECENT_CONTACT_SESSION_REQUEST:

_HandleClientRecentContactSessionRequest(pPdu);

break;

caseCID_BUDDY_LIST_USER_INFO_REQUEST:

_HandleClientUserInfoRequest( pPdu );

break;

caseCID_BUDDY_LIST_REMOVE_SESSION_REQ:

_HandleClientRemoveSessionRequest( pPdu );

break;

caseCID_BUDDY_LIST_ALL_USER_REQUEST:

_HandleClientAllUserRequest(pPdu );

break;

caseCID_BUDDY_LIST_CHANGE_AVATAR_REQUEST:

_HandleChangeAvatarRequest(pPdu);

break;

caseCID_BUDDY_LIST_CHANGE_SIGN_INFO_REQUEST:

_HandleChangeSignInfoRequest(pPdu);

break;

caseCID_BUDDY_LIST_USERS_STATUS_REQUEST:

_HandleClientUsersStatusRequest(pPdu);

break;

caseCID_BUDDY_LIST_DEPARTMENT_REQUEST:

_HandleClientDepartmentRequest(pPdu);

break;

// for group process

caseCID_GROUP_NORMAL_LIST_REQUEST:

s_group_chat->HandleClientGroupNormalRequest(pPdu,this);

break;

caseCID_GROUP_INFO_REQUEST:

s_group_chat->HandleClientGroupInfoRequest(pPdu,this);

break;

caseCID_GROUP_CREATE_REQUEST:

s_group_chat->HandleClientGroupCreateRequest(pPdu,this);

break;

caseCID_GROUP_CHANGE_MEMBER_REQUEST:

s_group_chat->HandleClientGroupChangeMemberRequest(pPdu,this);

break;

caseCID_GROUP_SHIELD_GROUP_REQUEST:

s_group_chat->HandleClientGroupShieldGroupRequest(pPdu,this);

break;

caseCID_FILE_REQUEST:

s_file_handler->HandleClientFileRequest(this, pPdu);

break;

caseCID_FILE_HAS_OFFLINE_REQ:

s_file_handler->HandleClientFileHasOfflineReq(this, pPdu);

break;

caseCID_FILE_ADD_OFFLINE_REQ:

s_file_handler->HandleClientFileAddOfflineReq(this, pPdu);

break;

caseCID_FILE_DEL_OFFLINE_REQ:

s_file_handler->HandleClientFileDelOfflineReq(this, pPdu);

break;

default:

log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());

break;

}

}

分支case CID_LOGIN_REQ_USERLOGIN即处理登录请求:

[cpp]view plaincopy

//在MsgConn.cpp中

voidCMsgConn::_HandleLoginRequest(CImPdu* pPdu)

{

// refuse second validate request

if(m_login_name.length() != 0) {

log("duplicate LoginRequest in the same conn ");

return;

}

// check if all server connection are OK

uint32_t result = 0;

string result_string ="";

CDBServConn* pDbConn = get_db_serv_conn_for_login();

if(!pDbConn) {

result = IM::BaseDefine::REFUSE_REASON_NO_DB_SERVER;

result_string ="服务端异常";

}

elseif(!is_login_server_available()) {

result = IM::BaseDefine::REFUSE_REASON_NO_LOGIN_SERVER;

result_string ="服务端异常";

}

elseif(!is_route_server_available()) {

result = IM::BaseDefine::REFUSE_REASON_NO_ROUTE_SERVER;

result_string ="服务端异常";

}

if(result) {

IM::Login::IMLoginRes msg;

msg.set_server_time(time(NULL));

msg.set_result_code((IM::BaseDefine::ResultType)result);

msg.set_result_string(result_string);

CImPdu pdu;

pdu.SetPBMsg(&msg);

pdu.SetServiceId(SID_LOGIN);

pdu.SetCommandId(CID_LOGIN_RES_USERLOGIN);

pdu.SetSeqNum(pPdu->GetSeqNum());

SendPdu(&pdu);

Close();

return;

}

IM::Login::IMLoginReq msg;

CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

//假如是汉字,则转成拼音

m_login_name = msg.user_name();

string password = msg.password();

uint32_t online_status = msg.online_status();

if(online_status IM::BaseDefine::USER_STATUS_LEAVE) {

log("HandleLoginReq, online status wrong: %u ", online_status);

online_status = IM::BaseDefine::USER_STATUS_ONLINE;

}

m_client_version = msg.client_version();

m_client_type = msg.client_type();

m_online_status = online_status;

log("HandleLoginReq, user_name=%s, status=%u, client_type=%u, client=%s, ",

m_login_name.c_str(), online_status, m_client_type, m_client_version.c_str());

CImUser* pImUser = CImUserManager::GetInstance()->GetImUserByLoginName(GetLoginName());

if(!pImUser) {

pImUser =newCImUser(GetLoginName());

CImUserManager::GetInstance()->AddImUserByLoginName(GetLoginName(), pImUser);

}

pImUser->AddUnValidateMsgConn(this);

CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);

// continue to validate if the user is OK

IM::Server::IMValidateReq msg2;

msg2.set_user_name(msg.user_name());

msg2.set_password(password);

msg2.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());

CImPdu pdu;

pdu.SetPBMsg(&msg2);

pdu.SetServiceId(SID_OTHER);

pdu.SetCommandId(CID_OTHER_VALIDATE_REQ);

pdu.SetSeqNum(pPdu->GetSeqNum());

pDbConn->SendPdu(&pdu);

}

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180312G060JV00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券