http线程池的主要用途是异步处理使用无状态短连接的http请求,在传输层通信基于tcp协议和应用层基于http协议的基础上,达到c++服务器与web服务器通信的目的。
设计上:
(1)服务器启动时,初始化配置数量的线程(形成被动连接线程池)。每个线程会生成epoll描述符。
(2)主线程生成监听socket,绑定端口。生成epoll描述符,注册监听socket,非阻塞接收(限定最大时间,如2s)新连接到连接队列。
(2)投放主线程连接队列中的新连接到被动连接线程池。根据硬哈希选择需求的线程来投放。加入后需要注册连接socket(注册时连接对象作为epoll事件的携带数据)到线程的epoll描述符。
(3)在每个线程的例程里会非阻塞监听epoll描述符上发生的读事件,并解析和处理获取的http请求。
这样每个业务线程可以相对独立的处理无状态的http请求。跟单业务线程的场景不同的是,http线程池的线程之间尽量减少数据共享(实在需要缓存在内存则加锁),每个线程又可以作为客户端短时间阻塞向其他服务器请求数据。
http线程池代码如下:(大致上http线程池的思路可以看得出来。主线程接收连接对象和连接对象接收数据并没有在这里展现实现过程。注意接收时需要忽略EINTR和SIGPIPE信号,如果接收返回-1且错误号为EAGAIN或EWOULDBLOCK,说明接收缓冲区满了,需要继续尝试接收,直到超时。接收返回0则表示对方断开连接,则接收失败。接收成功、失败、超时都需要移除连接对象(epoll描述符注销连接socket、关闭socket、移出和销毁连接对象),因为是短连接)
线程池头文件
/**
* \brief 定义实现轻量级(lightweight)的http服务框架类
*/
class zHttpTaskPool : private zNoncopyable
{
public:
/**
* \brief 构造函数
*/
zHttpTaskPool()
{
}
/**
* \brief 析构函数,销毁一个线程池对象
*
*/
~zHttpTaskPool()
{
final();
}
bool addHttp(zHttpTask *task);
bool init();
void final();
private:
static const int maxHttpThreads = 16; /**< 最大验证线程数量 */
zThreadGroup httpThreads; /**< http服务处理线程组 */
};
线程池源文件
/**
* \brief 轻量级http服务的主处理线程
*/
class zHttpThread : public zThread
{
private:
/**
* \brief http连接任务链表类型
*/
typedef std::list<zHttpTask * > zHttpTaskContainer;
/**
* \brief epoll事件结构向量类型
*/
typedef std::vector<struct epoll_event> epollfdContainer;
zHttpTaskPool *pool; /**< 所属的池 */
zRTime currentTime; /**< 当前时间 */
zMutex mutex; /**< 互斥变量 */
zHttpTaskContainer tasks; /**< 任务列表 */
int kdpfd;
epollfdContainer epfds;
epollfdContainer::size_type fds_count;
public:
/**
* \brief 构造函数
* \param pool 所属的连接池
* \param name 线程名称
*/
zHttpThread(
zHttpTaskPool *pool,
const std::string &name = std::string("zHttpThread"))
: zThread(name), pool(pool), currentTime()
{
kdpfd = epoll_create(256);
assert(-1 != kdpfd);
epfds.resize(256);
fds_count = 0;
}
/**
* \brief 析构函数
*/
~zHttpThread()
{
TEMP_FAILURE_RETRY(::close(kdpfd));
}
void run();
/**
* \brief 添加一个连接任务
* \param task 连接任务
*/
void add(zHttpTask *task)
{
mutex.lock();
task->addEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI, (void *)task);
tasks.push_back(task);
++fds_count;
if (fds_count > epfds.size())
{
epfds.resize(fds_count + 16);
}
mutex.unlock();
}
typedef zHttpTask* zHttpTaskP;
void remove(zHttpTaskP &task)
{
task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI);
tasks.remove(task);
SAFE_DELETE(task);
fds_count--;
}
void remove(zHttpTaskContainer::iterator &it)
{
zHttpTask *task = *it;
task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI);
tasks.erase(it);
SAFE_DELETE(task);
fds_count--;
}
};
/**
* \brief http线程例程
*/
void zHttpThread::run()
{
zHttpTaskContainer::iterator it, next;
while(!isFinal())
{
mutex.lock();
if (!tasks.empty())
{
int retcode = epoll_wait(kdpfd, &epfds[0], fds_count, 0);
if (retcode > 0)
{
for(int i = 0; i < retcode; ++i)
{
zHttpTask *task = (zHttpTask *)epfds[i].data.ptr;//获取epoll事件的数据,即连接对象
if (epfds[i].events & (EPOLLERR | EPOLLPRI))//检查epoll事件是否有错
{
//套接口出现错误
remove(task);
}
else if (epfds[i].events & EPOLLIN)//检查epoll事件是否是读事件
{
switch(task->httpCore())//阻塞recv连接对象缓冲区数据
{
case 1: //接收成功
case -1: //接收失败
remove(task);
break;
case 0: //接收超时,
break;
}
}
}
}
currentTime.now();
for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)//检查短连接的连接对象,移除超时的连接对象
{
zHttpTask *task = *it;
if (task->checkHttpTimeout(currentTime))
{
//超过指定时间验证还没有通过,需要回收连接
remove(it);
}
}
}
mutex.unlock();
zThread::msleep(50);
}
//把所有等待验证队列中的连接加入到回收队列中,回收这些连接
for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)
{
remove(it);
}
}
/**
* \brief 把一个TCP连接添加到验证队列中,因为存在多个验证队列,需要按照一定的算法添加到不同的验证处理队列中
* \param task 一个连接任务
*/
bool zHttpTaskPool::addHttp(zHttpTask *task)
{
//因为存在多个验证队列,需要按照一定的算法添加到不同的验证处理队列中
static unsigned int hashcode = 0;
zHttpThread *pHttpThread = (zHttpThread *)httpThreads.getByIndex(hashcode++ % maxHttpThreads);
if (pHttpThread)
pHttpThread->add(task);
return true;
}
/**
* \brief 初始化线程池,预先创建各种线程
* \return 初始化是否成功
*/
bool zHttpTaskPool::init()
{
//创建初始化验证线程
for(int i = 0; i < maxHttpThreads; ++i)
{
std::ostringstream name;
name << "zHttpThread[" << i << "]";
zHttpThread *pHttpThread = new zHttpThread(this, name.str());
if (NULL == pHttpThread)
return false;
if (!pHttpThread->start())
return false;
httpThreads.add(pHttpThread);
}
return true;
}
/**
* \brief 释放线程池,释放各种资源,等待各种线程退出
*/
void zHttpTaskPool::final()
{
httpThreads.joinAll();
}