游戏服务器之数据存档:把逻辑服务器的角色数据存档到mysql和redis,分析的是较早前的一个游戏项目的存档处理。有些设计缺点,会提出优化方式。
设计上:
逻辑服务器在其逻辑线程里读写数据,数据缓存在redis。数据服务器接收消息,并执行写sql和备份写sql和记录存档日志(分线程来写)。写sql的执行都有备份。
有些需要优化的点:
所有的在线角色的数据在游戏启动时就读到逻辑服务器。
所有的有关数据引擎(redis和mysql)的操作在数据服务器的逻辑线程里处理。
数据服务器和逻辑服务器使用自定义存档消息来存档。可考虑分标签的角色存档消息。
逻辑服务器
1、逻辑服务器连接数据服务器
2、数据读写
(1)保存玩家基本数据到数据库
(1-1)玩家基本数据放入到redis中
(1-2)写mysql,发送sql消息(和表名)到数据服务器
(2)加载玩家基本数据
(2-1)从redis加载
(2-2)在redis上没有,就从mysql上读取然后再存到redis上
数据服务器
1、sql备份文件
执行写sql并备份sql到文件
(1)加载sql备份文件
(2)消息放到db处理线程的队列
2、处理网络消息(逻辑服务器发来的sql)
3、db线程的消息处理
(1)备份sql到文件
(2)执行sql
(3)写备份日志
逻辑服务器
1、逻辑服务器连接数据服务器
注册句柄到ace反应器 CDBMsgServer tmpDBMsgServer = CConfigManager::instance()->get_srv_config().get_dbmsg_server_conf(); string dbMsgServerIp = tmpDBMsgServer.get_ip(); int dbMsgServerPort = tmpDBMsgServer.get_port(); CConnectDBMsg* tmpConnectDBMsg = get_connect_dbmsg(); if(false == tmpConnectDBMsg->connect_server(dbMsgServerIp.c_str(),dbMsgServerPort))//连接对端地址 { return false; } ACE_Reactor::instance()->register_handler(tmpConnectDBMsg,ACE_Event_Handler::READ_MASK);
2、数据读写
这里的数据的读写最好是需要在一个数据读写线程里的处理,可以实现异步。
(1)保存玩家基本数据到数据库 void CMsgProcessCenter::player_base_insert_db(CGamePlayer* player) { ... (1-1)玩家基本数据放入到redis中 CRWRedisClientOperator::instance()->player_base_insert_db_redis(player); ... char tmpSql[1024]; memset(tmpSql,0,1024); char table[128]; memset(table,0,128); sprintf(table,"player_%d",id); string tableName = table; string sql = tmpSql; memset(tmpSql,0,1024); sprintf(tmpSql,"update player_%d set level = %d,exp = %d,honor = %d,health = %d,attack = %d,stamina = %d,energy = %d," "crit = %d,climb= %d,anger = %d,physique = %d,power = %d,agile = %d,defenceP = %d,sex = %d,isFinishGuide = %d," "total = %d,win = %d,escape = %d,fighting = %d ,roletime = %d,defenceD = %d,getpos = %d,areaid = %d,loginip = \"%s\" where playerid = %d", id,level,exp,honor,health,attack,stamina,energy,crit,climb,anger,physique,power,agile,defenceP,sex, isFinishGuide,total,win,escape,equipScore,roletime,defenceD,getpos,gateId,loginip.c_str(),playerId); tableName = table; sql = tmpSql;
(1-2)写mysql,发送sql消息(和表名)到数据服务器 send_to_DB_process(tableName,sql);
... }
把sql的消息压倒db线程的消息队列,再发送到db进程
void CMsgProcessCenter::send_to_DB_process(string& tableName,string& sql) { CMyMessagePacket packet(1024); packet.set_msg_id(C_SEND_TO_DB_SERVER); CMyStreamWriter writer(packet.get_body()); writer.write_string(tableName); writer.write_string(sql); send_to_DB_process(packet);
}
(2)加载玩家基本数据
//只要redis上有就从redis上读取,否则才从mysql上读取然后再存到redis上 bool CMsgProcessCenter::load_player_base_data(CGamePlayer* player) {
...
(2-1)从redis加载 bool result = CRWRedisClientOperator::instance()->load_player_base_data_redis(player); ......
(2-2)在redis上没有,就从mysql上读取然后再存到redis上 try { //保存到数据库 CRWDBConnection tmpCon; mysqlpp::Connection* conPtr = tmpCon.get_connection(); if(NULL == conPtr) { return false; } char tmpSql[1024]; memset(tmpSql,0,1024); sprintf(tmpSql,"select sex,playername,level,exp,honor,health,attack,stamina,energy,crit,climb,anger," "physique,power,agile,defenceP,unionId,unionpos,isFinishGuide,unionName,total,win," "escape,fighting,roletime,defenceD,lastLoginTime,lastLogoutTime,loginMark,logoutMark," "areaid,loginip from player_%d where playerid = %d limit 1",id,playerId); mysqlpp::Query query = conPtr->query(tmpSql); if(mysqlpp::StoreQueryResult ret = query.store()) { for(size_t i = 0; i < ret.num_rows(); i++) { player->get_player_base()->m_sex = ret[i][0];//获取玩家信息 ...... query.reset(); CRWRedisClientOperator::instance()->player_base_insert_db_redis(player); return true; } } query.reset(); } catch(mysqlpp::Exception& error) { cout << "mysql error : " << error.what() << endl; } return false; }
数据服务器
1、sql备份文件
执行写sql并备份sql到文件
(1)加载sql备份文件 void RWFile::tmpdb_file_date(string tableId,string fileName,string sqlId) { if(fileName.length() == 0) { return; } ifstream input(fileName.c_str(),ios::in); if(!input) { return; } string line; char *p;
const char split[]="|";//把文件中的|中的语句分隔一个个消息根据名字投放到db处理线程的队列(表是切片处理,分成6个线程写入)
(2)消息放到db处理线程的队列
... CDBProcessManage::instance()->get_msg_process_center_task()->send_to_db_process(msg,threadId); ... string mvCmd = "mv " + fileName + " " +tableId+DBDONEPATH;//执行完后需要移除dbfile文件 system(mvCmd.c_str()); }
2、处理网络消息(逻辑服务器发来的sql)
void CMsgCenterProcess::handle_msg(CMyMessagePacket& msg) { CMyStreamReader reader(msg.get_body()); //写日志,分发消息 string tableName = reader.read_string(); string execSql = reader.read_string(); if(tableName.length() == 0 || execSql.length() == 0) { return; } //分发消息 int tableCount = CConfigManager::instance()->get_srv_config().get_db_conf().get_con_number(); int threadId = CConfigManager::instance()->get_table_config().get_table(tableName); if(threadId < 0 || threadId > tableCount) { return; } //备份(数据中心) int curTime = time(NULL); write_file(execSql,curTime);//把写sql写到备份文件 //按线程号,发送到db写线程处理 send_to_db_process(msg,threadId); }
3、db线程的消息处理 db写入线程处理各自消息队列中的消息派送 void CDBProcess::dispatch(CMyMessagePacket& msg) { ...... process_sql(msg); } void CDBProcess::process_sql(CMyMessagePacket& msg) { 读取消息,执行sql,并写入备份文件 CMyStreamReader reader(msg.get_body()); string tableName = reader.read_string(); string execSql = reader.read_string(); if(tableName.length() == 0 || execSql.length() == 0) { return; }
(1)备份sql到文件
write_buckup_file(tableName,execSql);
......
(2)执行sql
if(true == execl_sql(execSql))
{
(3)写备份日志
write_exec_log(times,index); } else { write_error_log(times,tableName,execSql); } } bool CDBProcess::execl_sql(string execSql) { if(execSql.length() == 0) { return false; } 执行sql try { CRWDBConnection tmpCon; mysqlpp::Connection* conPtr = tmpCon.get_connection(); if(NULL == conPtr) { return false; } mysqlpp::Query query = conPtr->query(); mysqlpp::SimpleResult ret = query.execute(execSql.c_str(),execSql.length()); if(false == ret) { query.reset(); return false; } query.reset(); } catch(mysqlpp::Exception& error) { cout << "mysql error : " << error.what() << endl; return false; } return true; }