前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Thrift辅助类,用于简化Thrift编程

Thrift辅助类,用于简化Thrift编程

作者头像
一见
发布2018-08-10 16:54:24
1.8K0
发布2018-08-10 16:54:24
举报
文章被收录于专栏:蓝天蓝天

CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。 源代码链接:https://github.com/eyjian/mooon/blob/master/mooon/include/mooon/net/thrift_helper.h IDL定义: service PackageManagerService { } 服务端使用示例: CThriftServerHelper _thrift_server_helper; return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads); 客户端使用示例: CThriftClientHelper thrift_client_helper(FLAGS_package_ip, FLAGS_package_port); thrift_client_helper.connect(); // 注意需要处理异常TTransportException/TApplicationException/TException

代码语言:javascript
复制
 #ifndef MOOON_NET_THRIFT_HELPER_H
  
 			#define MOOON_NET_THRIFT_HELPER_H
 
 			#include <mooon/net/config.h>
 
 			#include <mooon/sys/log.h>
 
 			#include <mooon/utils/string_utils.h>
 
 			#include <mooon/utils/scoped_ptr.h>
 
 			#include <arpa/inet.h>
 
 			#include <boost/scoped_ptr.hpp>
 
 			#include <thrift/concurrency/PosixThreadFactory.h>
 
 			#include <thrift/concurrency/ThreadManager.h>
 
 			#include <thrift/protocol/TBinaryProtocol.h>
 
 			#include <thrift/server/TNonblockingServer.h>
 
 			#include <thrift/transport/TSocketPool.h>
 
 			#include <thrift/transport/TTransportException.h>
 
 			#include <vector>
 
 			NET_NAMESPACE_BEGIN
 
 
 
 // 用来判断thrift是否已经连接,包括两种情况:
 
 // 1.从未连接过,也就是还未打开过连接
 
 // 2.连接被对端关闭了
 
 			inline bool thrift_not_connected(
 
 			        apache::thrift::transport::TTransportException::TTransportExceptionType type)
 
 {
 
 			    return (apache::thrift::transport::TTransportException::NOT_OPEN == type)
 
 || (apache::thrift::transport::TTransportException::END_OF_FILE == type);
 
 }
 
 
 
 			inline bool thrift_not_connected(
 
 			        apache::thrift::transport::TTransportException& ex)
 
 {
 
 			    apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();
 
 			    return thrift_not_connected(type);
 
 }
 
 
 
 // thrift客户端辅助类
 
 //
 
 // 使用示例:
 
 // mooon::net::CThriftClientHelper<ExampleServiceClient> client(rpc_server_ip, rpc_server_port);
 
 // try
 
 // {
 
 // client.connect();
 
 // client->foo();
 
 // }
 
 // catch (apache::thrift::transport::TTransportException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: (%d)%s\n", ex.getType(), ex.what());
 
 // }
 
 // catch (apache::thrift::transport::TApplicationException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: %s\n", ex.what());
 
 // }
 
 // catch (apache::thrift::TException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: %s\n", ex.what());
 
 // }
 
 // Transport除默认的TFramedTransport (TBufferTransports.h),还可选择:
 
 // TBufferedTransport (TBufferTransports.h)
 
 // THttpTransport
 
 // TZlibTransport
 
 // TFDTransport (TSimpleFileTransport)
 
 //
 
 // Protocol除默认的apache::thrift::protocol::TBinaryProtocol,还可选择:
 
 // TCompactProtocol
 
 // TJSONProtocol
 
 // TDebugProtocol
 
 			template <class ThriftClient,
 
 class Protocol=apache::thrift::protocol::TBinaryProtocol,
 
 class Transport=apache::thrift::transport::TFramedTransport>
 
 class CThriftClientHelper
 
 {
 
 public:
 
 // host thrift服务端的IP地址
 
 // port thrift服务端的端口号
 
 // connect_timeout_milliseconds 连接thrift服务端的超时毫秒数
 
 // receive_timeout_milliseconds 接收thrift服务端发过来的数据的超时毫秒数
 
 // send_timeout_milliseconds 向thrift服务端发送数据时的超时毫秒数
 
 			    CThriftClientHelper(const std::string &host, uint16_t port,
 
 int connect_timeout_milliseconds=2000,
 
 int receive_timeout_milliseconds=2000,
 
 int send_timeout_milliseconds=2000);
 
 
 
 // 支持指定多个servers,运行时随机选择一个,当一个异常时自动选择其它
 
 // num_retries 重试次数
 
 // retry_interval 重试间隔,单位为秒
 
 // max_consecutive_failures 单个Server最大连续失败次数
 
 // randomize_ 是否随机选择一个Server
 
 // always_try_last 是否总是重试最后一个Server
 
 			    CThriftClientHelper(const std::vector<std::pair<std::string, int> >& servers,
 
 int connect_timeout_milliseconds=2000,
 
 int receive_timeout_milliseconds=2000,
 
 int send_timeout_milliseconds=2000,
 
 int num_retries=1, int retry_interval=60,
 
 int max_consecutive_failures=1,
 
 			                        bool randomize=true, bool always_try_last=true
 
 );
 
 ~CThriftClientHelper();
 
 
 
 // 连接thrift服务端
 
 //
 
 // 出错时,可抛出以下几个thrift异常:
 
 // apache::thrift::transport::TTransportException
 
 // apache::thrift::TApplicationException
 
 // apache::thrift::TException
 
 			    void connect();
 
 			    bool is_connected() const;
 
 
 
 // 断开与thrift服务端的连接
 
 //
 
 // 出错时,可抛出以下几个thrift异常:
 
 // apache::thrift::transport::TTransportException
 
 // apache::thrift::TApplicationException
 
 // apache::thrift::TException
 
 			    void close();
 
 
 
 			    apache::thrift::transport::TSocket* get_socket() { return _socket.get(); }
 
 const apache::thrift::transport::TSocket get_socket() const { return _socket.get(); }
 
 			    ThriftClient* get() { return _client.get(); }
 
 			    ThriftClient* get() const { return _client.get(); }
 
 			    ThriftClient* operator ->() { return get(); }
 
 			    ThriftClient* operator ->() const { return get(); }
 
 
 
 // 取thrift服务端的IP地址
 
 const std::string& get_host() const;
 
 // 取thrift服务端的端口号
 
 			    uint16_t get_port() const;
 
 
 
 // 返回可读的标识,常用于记录日志
 
 			    std::string str() const
 
 {
 
 			        return utils::CStringUtils::format_string("thrift://%s:%u", get_host().c_str(), get_port());
 
 }
 
 
 
 private:
 
 			    void init();
 
 
 
 private:
 
 int _connect_timeout_milliseconds;
 
 int _receive_timeout_milliseconds;
 
 int _send_timeout_milliseconds;
 
 
 
 private:
 
 // TSocket只支持一个server,而TSocketPool是TSocket的子类支持指定多个server,运行时随机选择一个
 
 			    boost::shared_ptr<apache::thrift::transport::TSocket> _socket;
 
 			    boost::shared_ptr<apache::thrift::transport::TTransport> _transport;
 
 			    boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;
 
 			    boost::shared_ptr<ThriftClient> _client;
 
 };
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 // thrift服务端辅助类
 
 //
 
 // 使用示例:
 
 // mooon::net::CThriftServerHelper<CExampleHandler, ExampleServiceProcessor> _thrift_server;
 
 // try
 
 // {
 
 // _thrift_server.serve(listen_port);
 
 // }
 
 // catch (apache::thrift::TException& ex)
 
 // {
 
 // MYLOG_ERROR("thrift exception: %s\n", ex.what());
 
 // }
 
 // ProtocolFactory除了默认的TBinaryProtocolFactory,还可选择:
 
 // TCompactProtocolFactory
 
 // TJSONProtocolFactory
 
 // TDebugProtocolFactory
 
 //
 
 // 只支持TNonblockingServer一种Server
 
 			template <class ThriftHandler,
 
 class ServiceProcessor,
 
 class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory>
 
 class CThriftServerHelper
 
 {
 
 public:
 
 // 启动rpc服务,请注意该调用是同步阻塞的,所以需放最后调用
 
 // port thrift服务端的监听端口号
 
 // num_threads thrift服务端开启的线程数
 
 //
 
 // 出错时,可抛出以下几个thrift异常:
 
 // apache::thrift::transport::TTransportException
 
 // apache::thrift::TApplicationException
 
 // apache::thrift::TException
 
 // 参数num_io_threads,只有当Server为TNonblockingServer才有效
 
 			    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
 
 			    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);
 
 			    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached);
 
 
 
 // 要求ThriftHandler类有方法attach(void*)
 
 			    void serve(uint16_t port, void* attached, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
 
 			    void stop();
 
 
 
 			    ThriftHandler* get()
 
 {
 
 			        return _handler.get();
 
 }
 
 			    ThriftHandler* get() const
 
 {
 
 			        return _handler.get();
 
 }
 
 
 
 private:
 
 			    void init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads);
 
 
 
 private:
 
 			    boost::shared_ptr<ThriftHandler> _handler;
 
 			    boost::shared_ptr<apache::thrift::TProcessor> _processor;
 
 			    boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;
 
 			    boost::shared_ptr<apache::thrift::server::ThreadManager> _thread_manager;
 
 			    boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> _thread_factory;
 
 			    boost::shared_ptr<apache::thrift::server::TServer> _server;
 
 };
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 // 被thrift回调的写日志函数,由set_thrift_debug_log_function()调用它
 
 			inline void write_thrift_debug_log(const char* log)
 
 {
 
 			    MYLOG_DEBUG("%s", log);
 
 }
 
 
 
 			inline void write_thrift_info_log(const char* log)
 
 {
 
 			    MYLOG_INFO("%s", log);
 
 }
 
 
 
 			inline void write_thrift_error_log(const char* log)
 
 {
 
 			    MYLOG_ERROR("%s", log);
 
 }
 
 
 
 // 将thrift输出写入到日志文件中
 
 			inline void set_thrift_debug_log_function()
 
 {
 
 if (::mooon::sys::g_logger != NULL)
 
 {
 
 			        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_debug_log);
 
 }
 
 }
 
 
 
 			inline void set_thrift_info_log_function()
 
 {
 
 if (::mooon::sys::g_logger != NULL)
 
 {
 
 			        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_info_log);
 
 }
 
 }
 
 
 
 			inline void set_thrift_error_log_function()
 
 {
 
 if (::mooon::sys::g_logger != NULL)
 
 {
 
 			        apache::thrift::GlobalOutput.setOutputFunction(write_thrift_error_log);
 
 }
 
 }
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
 
 const std::string &host, uint16_t port,
 
 int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)
 
 : _connect_timeout_milliseconds(connect_timeout_milliseconds),
 
 			          _receive_timeout_milliseconds(receive_timeout_milliseconds),
 
 			          _send_timeout_milliseconds(send_timeout_milliseconds)
 
 {
 
 			    set_thrift_debug_log_function();
 
 			    _socket.reset(new apache::thrift::transport::TSocket(host, port));
 
 			    init();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
 
 const std::vector<std::pair<std::string, int> >& servers,
 
 int connect_timeout_milliseconds,
 
 int receive_timeout_milliseconds,
 
 int send_timeout_milliseconds,
 
 int num_retries, int retry_interval,
 
 int max_consecutive_failures,
 
 			        bool randomize, bool always_try_last)
 
 : _connect_timeout_milliseconds(connect_timeout_milliseconds),
 
 			          _receive_timeout_milliseconds(receive_timeout_milliseconds),
 
 			          _send_timeout_milliseconds(send_timeout_milliseconds)
 
 {
 
 			    set_thrift_debug_log_function();
 
 
 
 			    apache::thrift::transport::TSocketPool* socket_pool = new apache::thrift::transport::TSocketPool(servers);
 
 			    socket_pool->setNumRetries(num_retries);
 
 			    socket_pool->setRetryInterval(retry_interval);
 
 			    socket_pool->setMaxConsecutiveFailures(max_consecutive_failures);
 
 			    socket_pool->setRandomize(randomize);
 
 			    socket_pool->setAlwaysTryLast(always_try_last);
 
 			    _socket.reset(socket_pool);
 
 			    init();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			void CThriftClientHelper<ThriftClient, Protocol, Transport>::init()
 
 {
 
 			    _socket->setConnTimeout(_connect_timeout_milliseconds);
 
 			    _socket->setRecvTimeout(_receive_timeout_milliseconds);
 
 			    _socket->setSendTimeout(_send_timeout_milliseconds);
 
 
 
 // Transport默认为apache::thrift::transport::TFramedTransport
 
 			    _transport.reset(new Transport(_socket));
 
 // Protocol默认为apache::thrift::protocol::TBinaryProtocol
 
 			    _protocol.reset(new Protocol(_transport));
 
 // 服务端的Client
 
 			    _client.reset(new ThriftClient(_protocol));
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			CThriftClientHelper<ThriftClient, Protocol, Transport>::~CThriftClientHelper()
 
 {
 
 			    close();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			void CThriftClientHelper<ThriftClient, Protocol, Transport>::connect()
 
 {
 
 if (!_transport->isOpen())
 
 {
 
 // 如果Transport为TFramedTransport,则实际调用:TFramedTransport::open -> TSocketPool::open
 
 			        _transport->open();
 
 // 当"TSocketPool::open: all connections failed"时,
 
 // TSocketPool::open就抛出异常TTransportException,异常类型为TTransportException::NOT_OPEN
 
 }
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			bool CThriftClientHelper<ThriftClient, Protocol, Transport>::is_connected() const
 
 {
 
 			    return _transport->isOpen();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			void CThriftClientHelper<ThriftClient, Protocol, Transport>::close()
 
 {
 
 if (_transport->isOpen())
 
 {
 
 			        _transport->close();
 
 }
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 const std::string& CThriftClientHelper<ThriftClient, Protocol, Transport>::get_host() const
 
 {
 
 			    return _socket->getHost();
 
 }
 
 
 
 			template <class ThriftClient, class Protocol, class Transport>
 
 			uint16_t CThriftClientHelper<ThriftClient, Protocol, Transport>::get_port() const
 
 {
 
 			    return static_cast<uint16_t>(_socket->getPort());
 
 }
 
 
 
 ////////////////////////////////////////////////////////////////////////////////
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    serve("0.0.0.0", port, num_worker_threads, num_io_threads);
 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    init("0.0.0.0", port, num_worker_threads, num_io_threads);
 
 
 
 // 这里也可直接调用serve(),但推荐run()
 
 // !!!注意调用run()的进程或线程会被阻塞
 
 			    _server->run();
 
     _thread_manager->join(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached)
 
 {
 
 			    init(ip, port, num_worker_threads, num_io_threads);
 
 
 
 // 关联
 
 if (attached != NULL)
 
 			        _handler->attach(attached);
 
 
 
 // 这里也可直接调用serve(),但推荐run()
 
 // !!!注意调用run()的进程或线程会被阻塞
 
 			    _server->run();
 
     _thread_manager->join(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, void* attached, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    init("0.0.0.0", port, num_worker_threads, num_io_threads);
 
 
 
 // 关联
 
 if (attached != NULL)
 
 			        _handler->attach(attached);
 
 
 
 // 这里也可直接调用serve(),但推荐run()
 
 // !!!注意调用run()的进程或线程会被阻塞
 
 			    _server->run();
 
     _thread_manager->join(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::stop()
 
 {
 
 			    _server->stop();
 
     _thread_manager->stop(); 
 }
 
 
 
 			template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
 
 			void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
 
 {
 
 			    set_thrift_debug_log_function();
 
 
 
 			    _handler.reset(new ThriftHandler);
 
 			    _processor.reset(new ServiceProcessor(_handler));
 
 
 
 // ProtocolFactory默认为apache::thrift::protocol::TBinaryProtocolFactory
 
 			    _protocol_factory.reset(new ProtocolFactory());
 
 			    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);
 
 			    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());
 
 
 
 			    _thread_manager->threadFactory(_thread_factory);
 
 			    _thread_manager->start();
 
 
 
 			    apache::thrift::server::TNonblockingServer* server = new apache::thrift::server::TNonblockingServer(_processor, _protocol_factory, port, _thread_manager);
 
 			    server->setNumIOThreads(num_io_threads);
 
 			    _server.reset(server);
 
 
 
 // 不要调用_server->run(),交给serve()来调用,
 
 // 因为一旦调用了run()后,调用线程或进程就被阻塞了。
 
 }
 
 
 
 			NET_NAMESPACE_END
 
 			#endif // MOOON_NET_THRIFT_HELPER_H 		
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2014-05-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档