专栏首页蓝天Thrift辅助类,用于简化Thrift编程

Thrift辅助类,用于简化Thrift编程

CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。 源代码链接:https://github.com/eyjian/mooon/blob/master/mooon/include/mooon/net/thrift_helper.h IDL定义: service PackageManagerService { } 服务端使用示例: CThriftServerHelper _thrift_server_helper; return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads); 客户端使用示例: CThriftClientHelper thrift_client_helper(FLAGS_package_ip, FLAGS_package_port); thrift_client_helper.connect(); // 注意需要处理异常TTransportException/TApplicationException/TException

 #ifndef MOOON_NET_THRIFT_HELPER_H
  
 			#define MOOON_NET_THRIFT_HELPER_H
 
 			#include <mooon/net/config.h>
 
 			#include <mooon/sys/log.h>
 
 			#include <mooon/utils/string_utils.h>
 
 			#include <mooon/utils/scoped_ptr.h>
 
 			#include <arpa/inet.h>
 
 			#include <boost/scoped_ptr.hpp>
 
 			#include <thrift/concurrency/PosixThreadFactory.h>
 
 			#include <thrift/concurrency/ThreadManager.h>
 
 			#include <thrift/protocol/TBinaryProtocol.h>
 
 			#include <thrift/server/TNonblockingServer.h>
 
 			#include <thrift/transport/TSocketPool.h>
 
 			#include <thrift/transport/TTransportException.h>
 
 			#include <vector>
 
 			NET_NAMESPACE_BEGIN
 
 
 
 // 用来判断thrift是否已经连接,包括两种情况:
 
 // 1.从未连接过,也就是还未打开过连接
 
 // 2.连接被对端关闭了
 
 			inline bool thrift_not_connected(
 
 			        apache::thrift::transport::TTransportException::TTransportExceptionType type)
 
 {
 
 			    return (apache::thrift::transport::TTransportException::NOT_OPEN == type)
 
 || (apache::thrift::transport::TTransportException::END_OF_FILE == type);
 
 }
 
 
 
 			inline bool thrift_not_connected(
 
 			        apache::thrift::transport::TTransportException& ex)
 
 {
 
 			    apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();
 
 			    return thrift_not_connected(type);
 
 }
 
 
 
 // thrift客户端辅助类
 
 //
 
 // 使用示例:
 
 // mooon::net::CThriftClientHelper<ExampleServiceClient> client(rpc_server_ip, rpc_server_port);
 
 // try
 
 // {
 
 // client.connect();
 
 // client->foo();
 
 // }
 
 // catch (apache::thrift::transport::TTransportException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: (%d)%s\n", ex.getType(), ex.what());
 
 // }
 
 // catch (apache::thrift::transport::TApplicationException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: %s\n", ex.what());
 
 // }
 
 // catch (apache::thrift::TException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: %s\n", ex.what());
 
 // }
 
 // Transport除默认的TFramedTransport (TBufferTransports.h),还可选择:
 
 // TBufferedTransport (TBufferTransports.h)
 
 // THttpTransport
 
 // TZlibTransport
 
 // TFDTransport (TSimpleFileTransport)
 
 //
 
 // Protocol除默认的apache::thrift::protocol::TBinaryProtocol,还可选择:
 
 // TCompactProtocol
 
 // TJSONProtocol
 
 // TDebugProtocol
 
 			template <class ThriftClient,
 
 class Protocol=apache::thrift::protocol::TBinaryProtocol,
 
 class Transport=apache::thrift::transport::TFramedTransport>
 
 class CThriftClientHelper
 
 {
 
 public:
 
 // host thrift服务端的IP地址
 
 // port thrift服务端的端口号
 
 // connect_timeout_milliseconds 连接thrift服务端的超时毫秒数
 
 // receive_timeout_milliseconds 接收thrift服务端发过来的数据的超时毫秒数
 
 // send_timeout_milliseconds 向thrift服务端发送数据时的超时毫秒数
 
 			    CThriftClientHelper(const std::string &host, uint16_t port,
 
 int connect_timeout_milliseconds=2000,
 
 int receive_timeout_milliseconds=2000,
 
 int send_timeout_milliseconds=2000);
 
 
 
 // 支持指定多个servers,运行时随机选择一个,当一个异常时自动选择其它
 
 // num_retries 重试次数
 
 // retry_interval 重试间隔,单位为秒
 
 // max_consecutive_failures 单个Server最大连续失败次数
 
 // randomize_ 是否随机选择一个Server
 
 // always_try_last 是否总是重试最后一个Server
 
 			    CThriftClientHelper(const std::vector<std::pair<std::string, int> >& servers,
 
 int connect_timeout_milliseconds=2000,
 
 int receive_timeout_milliseconds=2000,
 
 int send_timeout_milliseconds=2000,
 
 int num_retries=1, int retry_interval=60,
 
 int max_consecutive_failures=1,
 
 			                        bool randomize=true, bool always_try_last=true
 
 );
 
 ~CThriftClientHelper();
 
 
 
 // 连接thrift服务端
 
 //
 
 // 出错时,可抛出以下几个thrift异常:
 
 // apache::thrift::transport::TTransportException
 
 // apache::thrift::TApplicationException
 
 // apache::thrift::TException
 
 			    void connect();
 
 			    bool is_connected() const;
 
 
 
 // 断开与thrift服务端的连接
 
 //
 
 // 出错时,可抛出以下几个thrift异常:
 
 // apache::thrift::transport::TTransportException
 
 // apache::thrift::TApplicationException
 
 // apache::thrift::TException
 
 			    void close();
 
 
 
 			    apache::thrift::transport::TSocket* get_socket() { return _socket.get(); }
 
 const apache::thrift::transport::TSocket get_socket() const { return _socket.get(); }
 
 			    ThriftClient* get() { return _client.get(); }
 
 			    ThriftClient* get() const { return _client.get(); }
 
 			    ThriftClient* operator ->() { return get(); }
 
 			    ThriftClient* operator ->() const { return get(); }
 
 
 
 // 取thrift服务端的IP地址
 
 const std::string& get_host() const;
 
 // 取thrift服务端的端口号
 
 			    uint16_t get_port() const;
 
 
 
 // 返回可读的标识,常用于记录日志
 
 			    std::string str() const
 
 {
 
 			        return utils::CStringUtils::format_string("thrift://%s:%u", get_host().c_str(), get_port());
 
 }
 
 
 
 private:
 
 			    void init();
 
 
 
 private:
 
 int _connect_timeout_milliseconds;
 
 int _receive_timeout_milliseconds;
 
 int _send_timeout_milliseconds;
 
 
 
 private:
 
 // TSocket只支持一个server,而TSocketPool是TSocket的子类支持指定多个server,运行时随机选择一个
 
 			    boost::shared_ptr<apache::thrift::transport::TSocket> _socket;
 
 			    boost::shared_ptr<apache::thrift::transport::TTransport> _transport;
 
 			    boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;
 
 			    boost::shared_ptr<ThriftClient> _client;
 
 };
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 // thrift服务端辅助类
 
 //
 
 // 使用示例:
 
 // mooon::net::CThriftServerHelper<CExampleHandler, ExampleServiceProcessor> _thrift_server;
 
 // try
 
 // {
 
 // _thrift_server.serve(listen_port);
 
 // }
 
 // catch (apache::thrift::TException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: %s\n", ex.what());
 
 // }
 
 // ProtocolFactory除了默认的TBinaryProtocolFactory,还可选择:
 
 // TCompactProtocolFactory
 
 // TJSONProtocolFactory
 
 // TDebugProtocolFactory
 
 //
 
 // 只支持TNonblockingServer一种Server
 
 			template <class ThriftHandler,
 
 class ServiceProcessor,
 
 class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory>
 
 class CThriftServerHelper
 
 {
 
 public:
 
 // 启动rpc服务,请注意该调用是同步阻塞的,所以需放最后调用
 
 // port thrift服务端的监听端口号
 
 // num_threads thrift服务端开启的线程数
 
 //
 
 // 出错时,可抛出以下几个thrift异常:
 
 // apache::thrift::transport::TTransportException
 
 // apache::thrift::TApplicationException
 
 // apache::thrift::TException
 
 // 参数num_io_threads,只有当Server为TNonblockingServer才有效
 
 			    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
 
 			    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);
 
 			    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached);
 
 
 
 // 要求ThriftHandler类有方法attach(void*)
 
 			    void serve(uint16_t port, void* attached, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
 
 			    void stop();
 
 
 
 			    ThriftHandler* get()
 
 {
 
 			        return _handler.get();
 
 }
 
 			    ThriftHandler* get() const
 
 {
 
 			        return _handler.get();
 
 }
 
 
 
 private:
 
 			    void init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads);
 
 
 
 private:
 
 			    boost::shared_ptr<ThriftHandler> _handler;
 
 			    boost::shared_ptr<apache::thrift::TProcessor> _processor;
 
 			    boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;
 
 			    boost::shared_ptr<apache::thrift::server::ThreadManager> _thread_manager;
 
 			    boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> _thread_factory;
 
 			    boost::shared_ptr<apache::thrift::server::TServer> _server;
 
 };
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 // 被thrift回调的写日志函数,由set_thrift_debug_log_function()调用它
 
 			inline void write_thrift_debug_log(const char* log)
 
 {
 
 			    MYLOG_DEBUG("%s", log);
 
 }
 
 
 
 			inline void write_thrift_info_log(const char* log)
 
 {
 
 			    MYLOG_INFO("%s", log);
 
 }
 
 
 
 			inline void write_thrift_error_log(const char* log)
 
 {
 
 			    MYLOG_ERROR("%s", log);
 
 }
 
 
 
 // 将thrift输出写入到日志文件中
 
 			inline void set_thrift_debug_log_function()
 
 {
 
 if (::mooon::sys::g_logger != NULL)
 
 {
 
 			        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_debug_log);
 
 }
 
 }
 
 
 
 			inline void set_thrift_info_log_function()
 
 {
 
 if (::mooon::sys::g_logger != NULL)
 
 {
 
 			        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_info_log);
 
 }
 
 }
 
 
 
 			inline void set_thrift_error_log_function()
 
 {
 
 if (::mooon::sys::g_logger != NULL)
 
 {
 
 			        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_error_log);
 
 }
 
 }
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
 
 const std::string &host, uint16_t port,
 
 int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)
 
 : _connect_timeout_milliseconds(connect_timeout_milliseconds),
 
 			          _receive_timeout_milliseconds(receive_timeout_milliseconds),
 
 			          _send_timeout_milliseconds(send_timeout_milliseconds)
 
 {
 
 			    set_thrift_debug_log_function();
 
 			    _socket.reset(new apache::thrift::transport::TSocket(host, port));
 
 			    init();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
 
 const std::vector<std::pair<std::string, int> >& servers,
 
 int connect_timeout_milliseconds,
 
 int receive_timeout_milliseconds,
 
 int send_timeout_milliseconds,
 
 int num_retries, int retry_interval,
 
 int max_consecutive_failures,
 
 			        bool randomize, bool always_try_last)
 
 : _connect_timeout_milliseconds(connect_timeout_milliseconds),
 
 			          _receive_timeout_milliseconds(receive_timeout_milliseconds),
 
 			          _send_timeout_milliseconds(send_timeout_milliseconds)
 
 {
 
 			    set_thrift_debug_log_function();
 
 
 
 			    apache::thrift::transport::TSocketPool* socket_pool = new apache::thrift::transport::TSocketPool(servers);
 
 			    socket_pool->setNumRetries(num_retries);
 
 			    socket_pool->setRetryInterval(retry_interval);
 
 			    socket_pool->setMaxConsecutiveFailures(max_consecutive_failures);
 
 			    socket_pool->setRandomize(randomize);
 
 			    socket_pool->setAlwaysTryLast(always_try_last);
 
 			    _socket.reset(socket_pool);
 
 			    init();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			void CThriftClientHelper<ThriftClient, Protocol, Transport>::init()
 
 {
 
 			    _socket->setConnTimeout(_connect_timeout_milliseconds);
 
 			    _socket->setRecvTimeout(_receive_timeout_milliseconds);
 
 			    _socket->setSendTimeout(_send_timeout_milliseconds);
 
 
 
 // Transport默认为apache::thrift::transport::TFramedTransport
 
 			    _transport.reset(new Transport(_socket));
 
 // Protocol默认为apache::thrift::protocol::TBinaryProtocol
 
 			    _protocol.reset(new Protocol(_transport));
 
 // 服务端的Client
 
 			    _client.reset(new ThriftClient(_protocol));
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			CThriftClientHelper<ThriftClient, Protocol, Transport>::~CThriftClientHelper()
 
 {
 
 			    close();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			void CThriftClientHelper<ThriftClient, Protocol, Transport>::connect()
 
 {
 
 if (!_transport->isOpen())
 
 {
 
 // 如果Transport为TFramedTransport,则实际调用:TFramedTransport::open -> TSocketPool::open
 
 			        _transport->open();
 
 // 当"TSocketPool::open: all connections failed"时,
 
 // TSocketPool::open就抛出异常TTransportException,异常类型为TTransportException::NOT_OPEN
 
 }
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			bool CThriftClientHelper<ThriftClient, Protocol, Transport>::is_connected() const
 
 {
 
 			    return _transport->isOpen();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			void CThriftClientHelper<ThriftClient, Protocol, Transport>::close()
 
 {
 
 if (_transport->isOpen())
 
 {
 
 			        _transport->close();
 
 }
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 const std::string& CThriftClientHelper<ThriftClient, Protocol, Transport>::get_host() const
 
 {
 
 			    return _socket->getHost();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			uint16_t CThriftClientHelper<ThriftClient, Protocol, Transport>::get_port() const
 
 {
 
 			    return static_cast<uint16_t>(_socket->getPort());
 
 }
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    serve("0.0.0.0", port, num_worker_threads, num_io_threads);
 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    init("0.0.0.0", port, num_worker_threads, num_io_threads);
 
 
 
 // 这里也可直接调用serve(),但推荐run()
 
 // !!!注意调用run()的进程或线程会被阻塞
 
 			    _server->run();
 
     _thread_manager->join(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached)
 
 {
 
 			    init(ip, port, num_worker_threads, num_io_threads);
 
 
 
 // 关联
 
 if (attached != NULL)
 
 			        _handler->attach(attached);
 
 
 
 // 这里也可直接调用serve(),但推荐run()
 
 // !!!注意调用run()的进程或线程会被阻塞
 
 			    _server->run();
 
     _thread_manager->join(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, void* attached, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    init("0.0.0.0", port, num_worker_threads, num_io_threads);
 
 
 
 // 关联
 
 if (attached != NULL)
 
 			        _handler->attach(attached);
 
 
 
 // 这里也可直接调用serve(),但推荐run()
 
 // !!!注意调用run()的进程或线程会被阻塞
 
 			    _server->run();
 
     _thread_manager->join(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::stop()
 
 {
 
 			    _server->stop();
 
     _thread_manager->stop(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    set_thrift_debug_log_function();
 
 
 
 			    _handler.reset(new ThriftHandler);
 
 			    _processor.reset(new ServiceProcessor(_handler));
 
 
 
 // ProtocolFactory默认为apache::thrift::protocol::TBinaryProtocolFactory
 
 			    _protocol_factory.reset(new ProtocolFactory());
 
 			    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);
 
 			    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());
 
 
 
 			    _thread_manager->threadFactory(_thread_factory);
 
 			    _thread_manager->start();
 
 
 
 			    apache::thrift::server::TNonblockingServer* server = new apache::thrift::server::TNonblockingServer(_processor, _protocol_factory, port, _thread_manager);
 
 			    server->setNumIOThreads(num_io_threads);
 
 			    _server.reset(server);
 
 
 
 // 不要调用_server->run(),交给serve()来调用,
 
 // 因为一旦调用了run()后,调用线程或进程就被阻塞了。
 
 }
 
 
 
 			NET_NAMESPACE_END
 
 			#endif // MOOON_NET_THRIFT_HELPER_H 		

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Thrift结构分析及增加取客户端IP功能实现

    分析Thrift的结构动机是为了实现服务端能取到客户端的IP,因此需要对它的结构、调用流程有些了解。另外,请注意本文针对的是TNonblockingServ...

    一见
  • thrift使用小记

        Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过一个中间语言(IDL...

    一见
  • boost::bind和boost::function使用示例

    C++11已支持bind和function,之前的不支持,但可以借助boost达到同样目的。看如下两段代码:

    一见
  • win64 IDEA meaven 配置安装Thrift自动生成代码到目录

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/haluoluo211/article/details...

    bear_fish
  • rpc框架之 thrift 学习 1 - 安装 及 hello world

    thrift是一个facebook开源的高效RPC框架,其主要特点是跨语言及二进制高效传输(当然,除了二进制,也支持json等常用序列化机制),官网地址:htt...

    菩提树下的杨过
  • python部署thrift服务以及客户

    Linux环境:thrift --gen java importservice.thrift

    py3study
  • node.js:npm列出安装包的所有版本

    用户1148648
  • Thrift结构分析及增加取客户端IP功能实现

    分析Thrift的结构动机是为了实现服务端能取到客户端的IP,因此需要对它的结构、调用流程有些了解。另外,请注意本文针对的是TNonblockingServ...

    一见
  • 连续研发【附近的人】---swoole love thrift 3000 ci第二篇(十)

    连续研发【附近的人】---swoole love thrift 3000 ci(十)

    老李秀
  • thrift使用小记

        Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过一个中间语言(IDL...

    一见

扫码关注云+社区

领取腾讯云代金券