前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Thrift线程和状态机分析

Thrift线程和状态机分析

作者头像
一见
发布2018-08-06 19:27:25
1.2K0
发布2018-08-06 19:27:25
举报
文章被收录于专栏:蓝天

启动Thrift时,可启动两类线程,一是TNonblockingIOThread,另一是Worker:

TNonblockingIOThread负责接受连接,和收发数据;而Worker负责回调服务端的用户函数。

TNonblockingIOThread::registerEvents主要做了两件事:

1) 注册TNonblockingIOThread::listenHandler(),这个是用来接受连接请求的;

2) 注册TNonblockingIOThread::notifyHandler(),这个是用来监听管道的。

TNonblockingIOThread和Worker两类线程间通过队列进行通讯,队列类型为std::queue >。

代码语言:javascript
复制
class ThreadManager::Task: public Runnable
{
public:
void run()
{
// runnable_实际为TNonblockingServer::TConnection::Task
runnable_->run();
}
private:
// 这里的Runnable实际为TNonblockingServer::TConnection::Task
// 在TNonblockingServer::TConnection::transition()中被push进来
boost::shared_ptr<Runnable> runnable_;
};

2. TNonblockingServer::TConnection::transition()

transition()为状态切换函数,状态有两种:一是socket的状态,另一是rpc会话的状态。APP开头的是rpc会话的状态,SOCKET开头的是socket的状态。

在APP_READ_REQUEST状态发生在IO线程中,addTask()会将任务转交给或工作线程,然后由工作线程回调服务端的函数。

代码语言:javascript
复制
TNonblockingServer::TConnection::Task
{
public:
void run()
{
// 回调
processor_->process(input_, output_, connectionContext_);
// 回调完后通知,
// 从工作线程重回到IO线程
connection_->notifyIOThread(); // ioThread_->notify(this);
// 这个将触发TNonblockingIOThread::notifyHandler()
}
};
TNonblockingIOThread::notifyHandler()
{
// 从管道中取出connection的指针地址
TNonblockingServer::TConnection* connection = NULL;
int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
connection->transition(); // 进入状态转换函数
}

3. RPC函数被调用过程

IO线程收到完整的RPC请求包时,以任务方式转给工作线程,然后由工作线程回调用户写的RPC函数。

完成的调用过程如下图所示:

任务从IO线程进入工作线程:

4. 管道和任务队列

IO线程以Task方式将连接交给工作线程,而工作线程在回调完后,以管道方式还回给IO线程。连接从IO线程进入到或工作线程后,会从libevent中删除,返回后再进入libevent。

5. 对象间关系

代码语言:javascript
复制
class TNonblockingServer: public TServer
{
public:
void serve() // 用户可以直接调用server(),但直接调用run()是更好的做法
{
// 创建socket监听
// 创建TNonblockingIOThread
// 通过Thread启动TNonblockingIOThread
}
};
class TServer: public concurrency::Runnable
{
public:
virtual void serve() = 0;
virtual void run() // 用户也可以直接调用run()
{
serve();
}
};

6. 相关代码摘要

代码语言:javascript
复制
// 线程
// thrift支持原生posix线程和boost线程
void PthreadThread::start()
{
// PthreadThread是一个Posix线程类
pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);
}
static void* PthreadThread::threadMain(void* arg)
{
thread->runnable()->run();
}
// 以下为IO线程
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState
{
SOCKET_RECV_FRAMING,
SOCKET_RECV,
SOCKET_SEND
};
/**
* Five states for the nonblocking server:
*  1) initialize
*  2) read 4 byte frame size
*  3) read frame of data
*  4) send back data (if any)
*  5) force immediate connection close
*/
enum TAppState
{
APP_INIT,            // 初始化
APP_READ_FRAME_SIZE, // 接收包大小
APP_READ_REQUEST,    // 接收包数据
APP_WAIT_TASK,
APP_SEND_RESULT,     // 发送数据
APP_CLOSE_CONNECTION // 关闭连接
};
// 启动监听和IO线程
void TNonblockingServer::serve()
{
createAndListenOnSocket();
for (uint32_t id = 0; id < numIOThreads_; ++id)
{
// TNonblockingIOThread是一个Runnable
// 以委托方式被运行在PthreadThread中
thread = new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_);
ioThreads_.push_back(thread);
}
for (uint32_t i = 1; i < ioThreads_.size(); ++i)
{
// PthreadThread thread;
thread->start();
}
ioThreads_[0]->run(); // 这将阻塞调用线程
for (uint32_t i = 0; i < ioThreads_.size(); ++i)
{
ioThreads_[i]->join();
}
}
void TNonblockingIOThread::run()
{
eventBase_ = event_base_new();
// IO线程在启动时会调用registerEvents()
// 在registerEvents()中完成两个回调函数的注册:listenHandler和notifyHandler
// listenHandler回调负责接受请求,并创建连接对象
registerEvents();
event_base_loop(eventBase_, 0); // libevent
}
void TNonblockingIOThread::registerEvents()
{
// listenHandler和socket关联
event_set(&serverEvent_, listenSocket_, EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler, server_);
// notifyHandler和pipe关联
event_set(?ificationEvent_, getNotificationRecvFD(), EV_READ | EV_PERSIST,
TNonblockingIOThread::notifyHandler, this);
}
static void listenHandler(evutil_socket_t fd, short which, void* v)
{
((TNonblockingServer*)v)->handleEvent(fd, which);
}
void TNonblockingServer::handleEvent(int fd, short which)
{
accept();
createConnection();
}
TNonblockingServer::TConnection* TNonblockingServer::createConnection()
{
// 会将自己绑定到一个线程
// 采用轮询的方式选择线程
// int selectedThreadIdx = nextIOThread_;
// nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
// std::stack connectionStack_;
// 使用了内存池connectionStack_
// App状态:APP_INIT
// Socket状态:SOCKET_RECV_FRAMING
}
static void eventHandler(evutil_socket_t fd, short /* which */, void* v)
{
assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
((TConnection*)v)->workSocket();
}
void TNonblockingServer::TConnection::setFlags(short eventFlags)
{
event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
}
void TNonblockingServer::TConnection::setRead()
{
setFlags(EV_READ | EV_PERSIST);
}
void TNonblockingServer::TConnection::setWrite()
{
setFlags(EV_WRITE | EV_PERSIST);
}
void TNonblockingServer::TConnection::setIdle()
{
setFlags(0);
}
void TNonblockingServer::TConnection::workSocket()
{
case SOCKET_RECV_FRAMING:
TSocket::read(); // 接收包大小
transition();
case SOCKET_RECV:
TSocket::read(); // 接收包数据
transition();
case SOCKET_SEND:
TSocket::write_partial(); // 发送数据(非阻塞的)
transition();
}
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v)
{
recv();
connection->transition();
}
// transition()为状态迁移函数
void TNonblockingServer::TConnection::transition()
{
case APP_INIT:
setRead();
case APP_WAIT_TASK:
setWrite();
case APP_READ_REQUEST:
setIdle();
}
TNonblockingServer::TConnection::Task
{
public:
void run()
{
// 回调
processor_->process(input_, output_, connectionContext_);
// 回调完后通知,
// 从工作线程重回到IO线程
// connection_的指针地址将通过管道传给工作线程
connection_->notifyIOThread(); // ioThread_->notify(this);
}
};
TNonblockingIOThread::notifyHandler()
{
// 从管道中取出connection的指针地址
TNonblockingServer::TConnection* connection = NULL;
int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
connection->transition(); // 进入状态转换函数
}
// 以下为工作线程
class ThreadManager::Impl : public ThreadManager;
class SimpleThreadManager : public ThreadManager::Impl;
class ThreadManager::Worker: public Runnable;
class ThreadManager::Task : public Runnable;
void SimpleThreadManager::start()
{
// workerCount_为工作线程数
addWorker(workerCount_);
}
void ThreadManager::Impl::addWorker(size_t value)
{
for (size_t ix = 0; ix < value; ix++)
{
worker = new ThreadManager::Worker(this);
// thread为PthreadThread
// 调用了worker->run();
thread->start();
}
}
void ThreadManager::Worker::run()
{
ThreadManager::Task task;
task->run();
}
class ThreadManager::Task: public Runnable
{
public:
void run()
{
// runnable_实际为TNonblockingServer::TConnection::Task
runnable_->run();
}
private:
// 这里的Runnable实际为TNonblockingServer::TConnection::Task
// 在TNonblockingServer::TConnection::transition()中被push进来
boost::shared_ptr runnable_;
};
void ThreadManager::Impl::add(shared_ptr value)
{
// std::queue > tasks_;
task = new ThreadManager::Task(value, expiration);
tasks_.push(task);
}
// 两者关系
class TNonblockingServer: public TServer
{
public:
TNonblockingServer(const boost::shared_ptr& threadManager);
private:
// TNonblockingServer关联了ThreadManager
boost::shared_ptr threadManager_;
};
// 工作线程将回调TNonblockingServer::TConnection::Task
class TNonblockingServer::TConnection::Task: public Runnable
{
};
// task为TNonblockingServer::TConnection::Task
void TNonblockingServer::addTask(boost::shared_ptr task)
{
// 将任务交给工作线程
// threadManager_为SimpleThreadManager
threadManager_->add(task, 0LL, taskExpireTime_);
}
void TNonblockingServer::TConnection::transition()
{
case APP_READ_REQUEST:
if (server_->isThreadPoolProcessing())
{
boost::shared_ptr task =
new TNonblockingServer::TConnection::Task(
processor_, inputProtocol_, outputProtocol_, this);
// server_为TNonblockingServer
// 回调交给工作线程,IO线程不做这个工作
server_->addTask(task); // server_为TNonblockingServer
}
else
{
// 调用TNonblockingServer的构造函数时,
// 如果没有指定参数ThreadManager,则会走这条分支
// 这种情况下,isThreadPoolProcessing()返回false
processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
}
}
void TNonblockingServer::TConnection::Task::run()
{
// 回调
processor_->process(input_, output_, connectionContext_);
}
内嵌关系:
1) TNonblockingServer内嵌了类TConnection,而TConnection又内嵌了类Task
2) ThreadManager内嵌了类Impl、类Worker和类Task(注意区分于TConnection内嵌的Task),而Impl又是ThreadManager的子类,而Task是对Runnable的实现
class TNonblockingServer: public TServer
{
public:
void serve() // 用户可以直接调用server(),但直接调用run()是更好的做法
{
// 创建socket监听
// 创建TNonblockingIOThread
// 通过Thread启动TNonblockingIOThread
}
};
class TServer: public concurrency::Runnable
{
public:
virtual void serve() = 0;
virtual void run() // 用户也可以直接调用run()
{
serve();
}
};
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2015/09/08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2. TNonblockingServer::TConnection::transition()
  • 3. RPC函数被调用过程
  • 4. 管道和任务队列
  • 5. 对象间关系
  • 6. 相关代码摘要
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档