1、EventLoopThread(IO线程类)
任何一个线程,只要创建并运行了EventLoop,都称之为IO线程
IO线程不一定是主线程
muduo并发模型one loop per thread + threadpool(计算线程池)
为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程
EventLoopThread创建了一个线程 在线程函数中创建了一个EvenLoop对象并调用EventLoop::loop
多个IO线程可以用IO线程池来管理,对应的类是EventLoopThreadPool
C++ Code
class EventLoopThread : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop *)> ThreadInitCallback;
EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback());
~EventLoopThread();
EventLoop *startLoop()// 启动线程,该线程就成为了IO线程
{
thread_.start(); // 执行threadFunc(); 构造函数初始化列表中thread_(boost::bind(&EventLoopThread::threadFunc, this))
....
};
private:
void threadFunc(); // 线程函数
EventLoop *loop_; // loop_指针指向一个EventLoop对象
bool exiting_;
Thread thread_;
MutexLock mutex_;
Condition cond_;
ThreadInitCallback callback_; // 回调函数在EventLoop::loop事件循环之前被调用
};
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
loop_->quit(); // 退出IO线程,让IO线程的loop循环退出,从而退出了IO线程
thread_.join();
}
C++ Code
EventLoop *EventLoopThread::startLoop()
{
assert(!thread_.started());
thread_.start(); //启动线程,此时有两个线程在运行,
//一个是调用EventLoopThread::startLoop()的线程,一个是执行EventLoopThread::threadFunc()的线程(IO线程)
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait(); //函数返回loop_,所以要等待IO线程启动
}
}
return loop_;
}
void EventLoopThread::threadFunc()
{
EventLoop loop;
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
// 一般情况是EventLoopThread对象先析构,析构函数调用loop_->quit() 使得loop.loop() 退出循环
// 这样threadFunc 退出,loop栈上对象析构,loop_ 指针失效,但此时已经不会再通过loop_ 访问loop,
// 故不会有问题。
loop_ = &loop;
cond_.notify();
}
loop.loop();
//assert(exiting_);
}
测试代码:
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
}
int main()
{
printf("main(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
EventLoopThread loopThread;
EventLoop *loop = loopThread.startLoop();
// 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
loop->runInLoop(runInThread);
sleep(1);
// runAfter内部也调用了runInLoop,所以这里也是异步调用,让该IO线程添加一个2s定时器
loop->runAfter(2, runInThread);
sleep(3);
//~EventLoopThread()会调用loop_->quit();
printf("exit main().\n");
}
执行输出如下:
simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test06 20131108 03:29:12.749530Z 2628 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51 main(): pid = 2628, tid = 2628 20131108 03:29:12.753135Z 2629 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 03:29:12.753794Z 2629 TRACE EventLoop EventLoop created 0xB7415F44 in thread 2629 - EventLoop.cc:76 20131108 03:29:12.754266Z 2629 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104 20131108 03:29:12.754707Z 2629 TRACE loop EventLoop 0xB7415F44 start looping - EventLoop.cc:108 20131108 03:29:12.755088Z 2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:12.756033Z 2629 TRACE printActiveChannels {5: IN } - EventLoop.cc:271 runInThread(): pid = 2628, tid = 2629 20131108 03:29:13.755730Z 2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:13.756388Z 2629 TRACE printActiveChannels {5: IN } - EventLoop.cc:271 20131108 03:29:15.755858Z 2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:15.757316Z 2629 TRACE printActiveChannels {4: IN } - EventLoop.cc:271 20131108 03:29:15.757469Z 2629 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383881355.757345 - TimerQueue.cc:62 runInThread(): pid = 2628, tid = 2629 exit main(). 20131108 03:29:16.755942Z 2629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 03:29:16.755988Z 2629 TRACE printActiveChannels {5: IN } - EventLoop.cc:271 20131108 03:29:16.756003Z 2629 TRACE loop EventLoop 0xB7415F44 stop looping - EventLoop.cc:133 simba@ubuntu:~/Documents/build/debug/bin$
主线程不是IO线程,根据前面的文章,timerfd_ = 4, wakeupFd_ = 5。主线程调用 loop->runInLoop(runInThread); 由于不是IO线
程调用runInLoop, 故调用queueInLoop() 将runInThead 添加到队列,然后wakeup() IO线程,IO线程在doPendingFunctors() 中取
出队列的runInThread()执行,可以看到IO线程的tid 跟主线程不一样。同理,loop->runAfter(2, runInThread); 也是一样的流程,需
要唤醒一下,此时只是执行runAfter() 添加了一个2s的定时器, 2s超时,timerfd_ 可读,先handleRead()一下然后执行回调函数
runInThread()。那为什么exit main() 之后wakeupFd_ 还会有可读事件呢?那是因为EventLoopThead 栈上对象析构,在析构函数内
loop_ ->quit(), 由于不是在IO线程调用quit(),故也需要唤醒一下,IO线程才能从poll 返回,这样再次循环判断 while (!quit_) 就能
退出IO线程。
2、EventLoopThreadPool(IO线程池类)
IO线程池的功能是开启若干个IO线程,并让这些IO线程处于事件循环的状态
class EventLoopThreadPool : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop *)> ThreadInitCallback;
EventLoopThreadPool(EventLoop *baseLoop);
~EventLoopThreadPool();
void setThreadNum(int numThreads)
{
numThreads_ = numThreads;
}
void start(const ThreadInitCallback &cb = ThreadInitCallback());
// 如果loops_为空,则loop指向baseLoop_
// 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop
EventLoop *getNextLoop();
private:
EventLoop *baseLoop_; // 与Acceptor所属EventLoop相同
bool started_;
int numThreads_; // 线程数,除去mainReactor
int next_; // 新连接到来,所选择的EventLoop对象下标
boost::ptr_vector<EventLoopThread> threads_; // IO线程列表
std::vector<EventLoop *> loops_; // EventLoop列表
};
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
EventLoopThread *t = new EventLoopThread(cb);
threads_.push_back(t);
loops_.push_back(t->startLoop()); // 启动EventLoopThread线程,在进入事件循环之前,会调用cb
}
if (numThreads_ == 0 && cb)
{
// 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
cb(baseLoop_);
}
}
现在使用 mainReactor + ThreadPool(subReactors) 模式,则baseLoop_ 与TcpServer 和 Acceptor 中的 loop_ 成员是相同的,即mainReactor 处理监听事件,已连接套接字事件轮询给线程池中的subReactors 处理。此时需要注意,创建一个TcpConnection对象时,需要绑定其中一个subReactor, 如下:
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
loop_->assertInLoopThread();
// 按照轮叫的方式选择一个EventLoop
EventLoop *ioLoop = threadPool_->getNextLoop();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}
测试程序:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop *loop,
const InetAddress &listenAddr, int numThreads)
: loop_(loop),
server_(loop, listenAddr, "TestServer"),
numThreads_(numThreads)
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(numThreads);
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr &conn,
const char *data,
ssize_t len)
{
printf("onMessage(): received %zd bytes from connection [%s]\n",
len, conn->name().c_str());
}
EventLoop *loop_;
TcpServer server_;
int numThreads_;
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr, 4);
server.start();
loop.loop();
}
此时共有5个IO线程,1个主线程(mainReactor)和 4个线程池中的线程(subReactor),server.start() 会启动线程池中的4个线程,并且启动mainReactor 监听:
// 该函数多次调用是无害的
// 该函数可以跨线程调用
void TcpServer::start()
{
if (!started_)
{
started_ = true;
threadPool_->start(threadInitCallback_);
}
if (!acceptor_->listenning())
{
// get_pointer返回原生指针
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
开启两个telnet 客户端连接服务器,其中一个输入aaaa, 另一个输入ddddd, 服务器端输出如下:
simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test10 main(): pid = 8628 20131108 11:33:15.190620Z 8628 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104 20131108 11:33:15.191246Z 8628 TRACE EventLoop EventLoop created 0xBFB77D50 in thread 8628 - EventLoop.cc:62 20131108 11:33:15.191568Z 8628 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131108 11:33:15.192270Z 8629 TRACE updateChannel fd = 9 events = 3 - EPollPoller.cc:104 20131108 11:33:15.192625Z 8629 TRACE EventLoop EventLoop created 0xB7484F44 in thread 8629 - EventLoop.cc:62 20131108 11:33:15.192927Z 8629 TRACE updateChannel fd = 10 events = 3 - EPollPoller.cc:104
20131108 11:33:15.193356Z 8629 TRACE loop EventLoop 0xB7484F44 start looping - EventLoop.cc:94 20131108 11:33:15.193759Z 8630 TRACE updateChannel fd = 12 events = 3 - EPollPoller.cc:104 20131108 11:33:15.194100Z 8630 TRACE EventLoop EventLoop created 0xB6AFEF44 in thread 8630 - EventLoop.cc:62 20131108 11:33:15.194398Z 8630 TRACE updateChannel fd = 13 events = 3 - EPollPoller.cc:104
20131108 11:33:15.194786Z 8630 TRACE loop EventLoop 0xB6AFEF44 start looping - EventLoop.cc:94 20131108 11:33:15.195135Z 8631 TRACE updateChannel fd = 15 events = 3 - EPollPoller.cc:104 20131108 11:33:15.195534Z 8631 TRACE EventLoop EventLoop created 0xB60FEF44 in thread 8631 - EventLoop.cc:62 20131108 11:33:15.207467Z 8631 TRACE updateChannel fd = 16 events = 3 - EPollPoller.cc:104
20131108 11:33:15.208169Z 8631 TRACE loop EventLoop 0xB60FEF44 start looping - EventLoop.cc:94 20131108 11:33:15.208940Z 8632 TRACE updateChannel fd = 18 events = 3 - EPollPoller.cc:104 20131108 11:33:15.209576Z 8632 TRACE EventLoop EventLoop created 0xB58FDF44 in thread 8632 - EventLoop.cc:62 20131108 11:33:15.210087Z 8632 TRACE updateChannel fd = 19 events = 3 - EPollPoller.cc:104
20131108 11:33:15.210445Z 8628 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104 20131108 11:33:15.210750Z 8628 TRACE loop EventLoop 0xBFB77D50 start looping - EventLoop.cc:94 20131108 11:33:15.211122Z 8632 TRACE loop EventLoop 0xB58FDF44 start looping - EventLoop.cc:94 20131108 11:33:18.958878Z 8628 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:18.959167Z 8628 TRACE printActiveChannels {6: IN} - EventLoop.cc:257 20131108 11:33:18.959226Z 8628 INFO TcpServer::newConnection [TestServer] - new connection[TestServer:0.0.0.0:8888#1] from 127.0.0.1:56411 - TcpServer.cc:93 20131108 11:33:18.959262Z 8628 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x8C84F98 fd=20 - TcpConnection.cc:62 20131108 11:33:18.959277Z 8628 TRACE newConnection [1] usecount=1 - TcpServer.cc:111 20131108 11:33:18.959300Z 8628 TRACE newConnection [2] usecount=2 - TcpServer.cc:113 20131108 11:33:18.959322Z 8628 TRACE newConnection [5] usecount=3 - TcpServer.cc:122 20131108 11:33:18.959343Z 8629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:18.959365Z 8629 TRACE printActiveChannels {10: IN } - EventLoop.cc:257 20131108 11:33:18.959378Z 8629 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78 20131108 11:33:18.959409Z 8629 TRACE updateChannel fd = 20 events = 3 - EPollPoller.cc:104 onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56411 20131108 11:33:18.959433Z 8629 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83 20131108 11:33:23.111546Z 8628 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:23.111628Z 8628 TRACE printActiveChannels {6: IN } - EventLoop.cc:257 20131108 11:33:23.111662Z 8628 INFO TcpServer::newConnection [TestServer] - new connection[TestServer:0.0.0.0:8888#2] from 127.0.0.1:56412 - TcpServer.cc:93 20131108 11:33:23.111680Z 8628 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#2] at 0x8C85128 fd=21 - TcpConnection.cc:62 20131108 11:33:23.111693Z 8628 TRACE newConnection [1] usecount=1 - TcpServer.cc:111 20131108 11:33:23.111722Z 8628 TRACE newConnection [2] usecount=2 - TcpServer.cc:113 20131108 11:33:23.111746Z 8628 TRACE newConnection [5] usecount=3 - TcpServer.cc:122 20131108 11:33:23.111769Z 8630 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:23.111792Z 8630 TRACE printActiveChannels {13: IN} - EventLoop.cc:257 20131108 11:33:23.111805Z 8630 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78 20131108 11:33:23.111813Z 8630 TRACE updateChannel fd = 21 events = 3 - EPollPoller.cc:104 onConnection(): new connection [TestServer:0.0.0.0:8888#2] from 127.0.0.1:56412 20131108 11:33:23.111836Z 8630 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83 20131108 11:33:25.219778Z 8631 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:25.219829Z 8632 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:28.969971Z 8629 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:33.119151Z 8630 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:33.119202Z 8628 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:33.754975Z 8629 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:33.755031Z 8629 TRACE printActiveChannels {20: IN} - EventLoop.cc:257 20131108 11:33:33.755042Z 8629 TRACE handleEvent [6] usecount=2 - Channel.cc:67 onMessage(): received 6 bytes from connection [TestServer:0.0.0.0:8888#1] 20131108 11:33:33.755128Z 8629 TRACE handleEvent [12] usecount=2 - Channel.cc:69 20131108 11:33:35.230224Z 8631 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:35.230274Z 8632 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:36.540663Z 8630 TRACE poll 1 events happended - EPollPoller.cc:65 20131108 11:33:36.540715Z 8630 TRACE printActiveChannels {21: IN} - EventLoop.cc:257 20131108 11:33:36.540727Z 8630 TRACE handleEvent [6] usecount=2 - Channel.cc:67 onMessage(): received 7 bytes from connection [TestServer:0.0.0.0:8888#2] 20131108 11:33:36.540780Z 8630 TRACE handleEvent [12] usecount=2 - Channel.cc:69 20131108 11:33:43.129769Z 8628 TRACE poll nothing happended - EPollPoller.cc:74 20131108 11:33:43.765633Z 8629 TRACE poll nothing happended - EPollPoller.cc:74
一个进程本来被打开的文件描述符就有0,1,2;
每个Reactor 的 EventLoop 对象构造时,默认使用的是EPollPoller,即EPollPoller::epollfd_ ;
此外还有两个channel(EventLoop::timeQueue_ ::timerfd_ 和 EventLoop::wakeupFd_ )
处于被poll()关注可读事件的状态,而且是一直关注直到事件循环结束。
即每个Reactor 都分别有这3个fd;
对于mainReactor来说,还有Acceptor::acceptSocket_.sockfd_ (listenfd); Acceptor::idleFd_ ; (/dev/null)
按上述程序来说,mainReactor中:epollfd_ = 3; timerfd_ = 4; wakeupFd_ = 5; sockfd_ = 6; idleFd_ = 7;
(8,9,10),(11,12,13),(14,15,16),(17,18,19) 分别归4个IO线程所有
这样已连接套接字只能从20开始,而且均匀分配到4个subReactor 处理事件(可读事件(包括接收数据,连接关闭),可写事件(内核发送缓冲区不为满),错误事件)
当第一个客户端连接上来时,sockfd_ 可读,mainReactor 调用 TcpServer::newConnection(), 创建一个TcpConnection对象,绑定到线程池中的第一个IO线程上,函数内调用ioLoop->runInLoop(); 会唤醒第一个IO线程,即第一个IO线程的wakeupFd_ (10)可读,handleEvent() 处理后继续处理doPendingFunctors(),执行TcpConnection::connectEstablished(),接下去的流程包括接收数据(fd = 20 可读)可以参考EventLoop(三)的描述。
当然如果我们传递的numThreads_ = 0 或者不传递; 即只有一个mainReactor, 则监听套接字和已连接套接字事件都要由这个mainReactor处理。
参考:
《UNP》
muduo manual.pdf
《linux 多线程服务器编程:使用muduo c++网络库》