mooon-agent接收状态机代码摘要

  • recv_machine.h
#ifndef MOOON_AGENT_RECV_MACHINE_H

#define MOOON_AGENT_RECV_MACHINE_H

#include <agent/message.h>

AGENT_NAMESPACE_BEGIN



class CAgentThread;

class CRecvMachine

{

private:

 /***

 * 接收状态值

 */

    typedef enum recv_state_t

 {

        rs_header, /** 接收消息头状态 */

        rs_body    /** 接收消息体状态 */

 }TRecvState;



 /***

 * 接收状态上下文

 */

    struct RecvStateContext

 {

 const char* buffer; /** 当前的数据buffer */

        size_t buffer_size; /** 当前的数据字节数 */

 

        RecvStateContext(const char* buf=NULL, size_t buf_size=0)

 :buffer(buf)

 ,buffer_size(buf_size)

 {

 }

 

        RecvStateContext(const RecvStateContext& other)

 :buffer(other.buffer)

 ,buffer_size(other.buffer_size)

 {

 }

 

        RecvStateContext& operator =(const RecvStateContext& other)

 {

            buffer = other.buffer;

            buffer_size = other.buffer_size;

            return *this;

 }

 };

 

public:

    CRecvMachine(CAgentThread* thread);

    util::handle_result_t work(const char* buffer, size_t buffer_size);

    void reset();

 

private:

    void set_next_state(recv_state_t next_state)

 {

        _recv_state = next_state;

        _finished_size = 0;

 }

 

    util::handle_result_t handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);

    util::handle_result_t handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);

    util::handle_result_t handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);

 

private: 

    CAgentThread* _thread; /** 需要通过CAgentThread取得CProcessorManager */

    agent_message_header_t _header; /** 消息头,这个大小是固定的 */

    recv_state_t _recv_state; /** 当前的接收状态 */

    size_t _finished_size; /** 当前状态已经接收到的字节数,注意不是总的已经接收到的字节数,只针对当前状态 */

};



AGENT_NAMESPACE_END

#endif // MOOON_AGENT_RECV_MACHINE_H
recv_machine.cpp

#include "recv_machine.h"

#include "agent_thread.h"

AGENT_NAMESPACE_BEGIN



CRecvMachine::CRecvMachine(CAgentThread* thread)

 :_thread(thread)

{

    set_next_state(rs_header);

}



// 状态机入口函数

// 状态机工作原理:-> rs_header -> rs_body -> rs_header

// -> rs_header -> rs_error -> rs_header

// -> rs_header -> rs_body -> rs_error -> rs_header

// 参数说明:

// buffer - 本次收到的数据,注意不是总的

// buffer_size - 本次收到的数据字节数

util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)

{ 

    RecvStateContext next_ctx(buffer, buffer_size); 

    util::handle_result_t hr = util::handle_continue;

 

 // 状态机循环条件为:util::handle_continue == hr

 while (util::handle_continue == hr)

 { 

        RecvStateContext cur_ctx(next_ctx);

 

        switch (_recv_state)

 {

 case rs_header:

            hr = handle_header(cur_ctx, &next_ctx);

            break;

 case rs_body:

            hr = handle_body(cur_ctx, &next_ctx);

            break;

        default:

            hr = handle_error(cur_ctx, &next_ctx);

            break;

 }

 }

 

    return hr;

}



void CRecvMachine::reset()

{

    set_next_state(rs_header);

}



// 处理消息头部

// 参数说明:

// cur_ctx - 当前上下文,

//           cur_ctx.buffer为当前收到的数据buffer,包含了消息头,但也可能包含了消息体。

//           cur_ctx.buffer_size为当前收到字节数

// next_ctx - 下一步上下文,

//           由于cur_ctx.buffer可能包含了消息体,所以在一次接收receive动作后,

//           会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_body的cur_ctx

util::handle_result_t CRecvMachine::handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)

{

 if (_finished_size + cur_ctx.buffer_size < sizeof(agent_message_header_t))

 {

        memcpy(reinterpret_cast<char*>(&_header) + _finished_size

 ,cur_ctx.buffer

 ,cur_ctx.buffer_size);

 

        _finished_size += cur_ctx.buffer_size;

        return util::handle_continue;

 }

 else

 {

        size_t need_size = sizeof(agent_message_header_t) - _finished_size;

        memcpy(reinterpret_cast<char*>(&_header) + _finished_size

 ,cur_ctx.buffer

 ,need_size);

 

 // TODO: Check header here

 

        size_t remain_size = cur_ctx.buffer_size - need_size;

 if (remain_size > 0)

 {

            next_ctx->buffer = cur_ctx.buffer + need_size;

            next_ctx->buffer_size = cur_ctx.buffer_size - need_size;

 }

 

 // 只有当包含消息体时,才需要进行状态切换,

 // 否则维持rs_header状态不变

 if (_header.size > 0)

 {

 // 切换状态

            set_next_state(rs_body);

 }

 else

 { 

            CProcessorManager* processor_manager = _thread->get_processor_manager(); 

 if (!processor_manager->on_message(_header, 0, NULL, 0))

 {

                return util::handle_error;

 }

 }

 

        return (remain_size > 0)

 ? util::handle_continue // 控制work过程是否继续循环

 : util::handle_finish;

 }

}



// 处理消息体

// 参数说明:

// cur_ctx - 当前上下文,

//           cur_ctx.buffer为当前收到的数据buffer,包含了消息体,但也可能包含了消息头。

//           cur_ctx.buffer_size为当前收到字节数

// next_ctx - 下一步上下文,

//           由于cur_ctx.buffer可能包含了消息头,所以在一次接收receive动作后,

//           会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_header的cur_ctx

util::handle_result_t CRecvMachine::handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)

{

    CProcessorManager* processor_manager = _thread->get_processor_manager();

 

 if (_finished_size + cur_ctx.buffer_size < _header.size)

 {

 if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))

 {

            return util::handle_error;

 }

 

        _finished_size += cur_ctx.buffer_size;

        return util::handle_continue;

 }

 else

 {

        size_t need_size = _header.size - _finished_size;

 if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, need_size))

 {

            return util::handle_error;

 }

 

 // 切换状态

        set_next_state(rs_header);

 

        size_t remain_size = cur_ctx.buffer_size - need_size;

 if (remain_size > 0)

 {

            next_ctx->buffer = cur_ctx.buffer + need_size;

            next_ctx->buffer_size = cur_ctx.buffer_size - need_size;

            return util::handle_continue;

 }



        return util::handle_finish; 

 }

}



util::handle_result_t CRecvMachine::handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)

{

 //AGENT_LOG_ERROR("Network error.\n");

    set_next_state(rs_header); // 无条件切换到rs_header,这个时候应当断开连接重连接

    return util::handle_error;

}



AGENT_NAMESPACE_END

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LhWorld哥陪你聊算法

Hive篇--相关概念和使用二

分桶表是对列值取哈希值的方式,将不同数据放到不同文件中存储。 对于hive中每一个表、分区都可以进一步进行分桶。(可以对列,也可以对表进行分桶) 由列的哈希值除...

2332
来自专栏大数据

Python 101:如何从RottenTomatoes爬取数据

今天,我们将研究如何从热门电影网站Rotten Tomatoes爬取数据。你需要在这里注册一个API key。当你拿到key时,记下你的使用限制(如每分钟限制的...

4696
来自专栏大内老A

谈谈基于Kerberos的Windows Network Authentication[下篇]

六、User2User Sub-Protocol:有效地保障Server的安全 通过3个Sub-protocol的介绍,我们可以全面地掌握整个Kerberos的...

2247
来自专栏张善友的专栏

用sp_change_users_login消除Sql Server的孤立用户

孤立帐户,就是某个数据库的帐户只有用户名而没有登录名,这样的用户在用户库的sysusers系统表中存在,而在master数据库的syslogins中却没有对应的...

1936
来自专栏数据库新发现

Oracle HowTo:如何在Oracle10g中启动和关闭OEM

作者:eygle 出处:http://www.eygle.com/blog 日期:October 28, 2005 本文链接:http://www.eygle....

882
来自专栏我和PYTHON有个约会

Django来敲门~第一部分【5.2.模型和数据库交互】

通常情况下,如果你只是做测试使用,可以使用Django内置的数据库SQLite就完全可以满足需要了,我们在本次教程中,通过使用MySQL这个数据库来完成后续的功...

641
来自专栏数据和云

时过境迁:Oracle跨平台迁移之XTTS方案与实践

作者简介 ? 谢金融 云和恩墨东区交付部 Oracle 工程师,多年来从事 Oracle 第三方服务,曾服务过金融、制造业、物流、政府等许多行业的客户,精通数据...

92910
来自专栏沃趣科技

Oracle 12c数据库优化器统计信息收集的最佳实践(二)

原文链接 http://www.oracle.com/technetwork/database/bi-datawarehousing/twp-bp-for-st...

3657
来自专栏乐沙弥的世界

Oracle 数据库实例启动关闭过程

Oracle数据库实例的启动,严格来说应该是实例的启动,数据库仅仅是在实例启动后进行装载。Oracle数据启动的过程被划分为

1234
来自专栏性能与架构

Redis 达到maxmemory时如何抉择?

当Redis的最大可用内存空间都占满时,Redis会如何处理呢? Redis给出了6个选项,让我们自行选择 volatile-lru 使用LRU算法,从设...

3085

扫码关注云+社区

领取腾讯云代金券