任务池可以用来异步处理任务,比如清理过期日志、HTTP请求,本文介绍的任务池还支持定时触发任务,在SetTimer得注意的两个坑 一文中介绍了工作线程如果想使用定时器需要有消息循环,有了本文介绍的任务池,工作线程可以将定时器的实现交给它。
任务池实现机制如下图所示:
第一,一个任务池会启动一个线程,线程启动时调用RegisterClass注册窗口类,再调用CreateWindowEx创建一个窗口,然后进入消息循环GetMessage。
第二,当调用PostTask投递任务时指定任务执行回调和上下文参数,任务池为该任务分配一个任务ID,将任务ID、任务执行回调、上下文参数打包作为自定义消息WM_TASK_POOL的LPARAM参数,调用PostMessage投递到消息队列。
第三,当调用PostTimerTask投递定时任务时指定任务执行回调、上下文参数和定时周期,调用SetTimer设定定时器,定时触发WM_TIMER消息到消息队列,同时将定时器ID与定时任务绑定。
第四,消息循环GetMessage获取WM_TASK_POOL消息时执行LPARAM中的任务执行回调,获取WM_TIMER消息时根据消息ID查询绑定的定时任务并执行。
任务取消实现机制:投递任务时,任务池会为每个任务分配一个任务ID并将任务ID放到任务集合对象中。业务方通过任务ID取消任务执行时,将任务ID从任务集合对象中移除。任务池执行任务回调前判断如果任务ID不在任务集合对象就不执行。注意,如果当前任务正在执行,取消任务将返失败。由于任务是被异步执行,在释放任务执行回调过程中访问的资源时,务必等任务回调执行完成或取消任务。
任务池的类图如下图:
类CThreadBase,在 一个简单实用的线程基类 文章中介绍的线程基类。
类CTaskPool,派生于CThreadBase,提供四个接口:
接口WaitUntilCanPostTask(), 等待任务池可以接受任务,业务方投递任务前需调用,因为任务池在线程启动后需要创建窗口,如果窗口尚未创建,投递任务调用PostMessage()时将失败。
接口PostTask(),投递任务,指定任务执行回调和上下文参数,返回任务ID。
接口PostTimerTask(),投递定时任务,指定任务执行回调、上下文参数、定时周期,返回任务ID。
接口CancelTask(),取消任务,指定任务ID。
类CTaskItem,实体类封装任务,包括任务ID、任务回调、任务上下文参数。
类CCriticalSection,封装临界区,在 自动解锁与提前解锁 文章中已介绍。
源码包括:TaskPool.h、TaskPool.cpp、CriticalSection.h、CriticalSection.cpp
TaskPool.h
#pragma once
#include <set>
#include <map>
#include "ThreadBase.h"
#include "CriticalSection.h"
#define INVALID_TASK_ID 0
// task entry
typedef void (__stdcall *TaskProc)(unsigned int nTaskId, void* pContext);
class CTaskPool : public CThreadBase
{
public:
class CTaskItem
{
public:
unsigned int m_nTaskId = 0;
TaskProc m_pTaskProcFun = nullptr;
void* m_pContext = nullptr;
};
public:
CTaskPool();
~CTaskPool();
public:
// set task pool name
void SetName(const std::string& strName) { m_strName = strName; }
// can not post task before inner window is created
bool WaitUntilCanPostTask();
// return INVALID_TASK_ID if failed
unsigned int PostTask(TaskProc pFunc, void* pContext);
// call pFunc once for every nElapse time
// return INVALID_TASK_ID if failed
unsigned int PostTimerTask(TaskProc pFunc, void* pContext, unsigned int nElapse);
// TaskFunc will not be called after cancel
// return false if the task is processing, otherwise return true
bool CancelTask(unsigned int nTaskId);
protected: // override CThreadBase
virtual bool OnStart(const std::string& strParam) override;
virtual void OnRun(const std::string& strParam) override;
virtual void OnStop() override;
private:
unsigned int AllocateTaskId();
static LRESULT CALLBACK MsgWndWindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam);
void ProcessTask(const CTaskItem* pTaskItem);
void ProcessTimer(unsigned int nTimerId);
private:
std::string m_strName;
DWORD m_nThreadId = 0;
HANDLE m_hSyncInit = NULL;
HWND m_hTaskPoolWnd = NULL;
CCSWrap m_csTaskPool; // for mutex visit
unsigned int m_nProcessingTaskId = 0;
std::map<unsigned int, CTaskItem*> m_mapTimerId2Task;
std::set<unsigned int> m_setWillProcessTaskIds;
static std::map<HWND, CTaskPool*>* s_mapHwnd2TaskPool;
static CCSWrap s_csHwnd2TaskPool;
};
TaskPool.cpp
#include "TaskPool.h"
// self implement log module
#define LOG_DEBUG(TAG, ...)
#define LOG_INFO(TAG, ...)
#define LOG_WARN(TAG, ...)
#define LOG_ERROR(TAG, ...)
#define LOG_TAG "taskpool"
#define WINDOW_CLASS_NAME L"task_pool"
#define WM_TASK_POOL 1000
std::map<HWND, CTaskPool*>* CTaskPool::s_mapHwnd2TaskPool = new std::map<HWND, CTaskPool*>;
CCSWrap CTaskPool::s_csHwnd2TaskPool;
CTaskPool::CTaskPool()
{
}
CTaskPool::~CTaskPool()
{
}
bool CTaskPool::WaitUntilCanPostTask()
{
if (m_hTaskPoolWnd != NULL)
{
return true;
}
if (m_hSyncInit != NULL)
{
WaitForSingleObject(m_hSyncInit, INFINITE);
}
return m_hTaskPoolWnd != NULL;
}
unsigned int CTaskPool::PostTask(TaskProc pFunc, void* pContext)
{
if (m_hTaskPoolWnd == NULL)
{
THREAD_BASE_ASSERT(false);
return INVALID_TASK_ID;
}
unsigned int nTaskId = AllocateTaskId();
CCriticalSection cs(m_csTaskPool.GetCS());
m_setWillProcessTaskIds.insert(m_setWillProcessTaskIds.end(), nTaskId);
cs.Leave();
CTaskItem* pTaskItem = new CTaskItem();
pTaskItem->m_nTaskId = nTaskId;
pTaskItem->m_pTaskProcFun = pFunc;
pTaskItem->m_pContext = pContext;
if (PostMessage(m_hTaskPoolWnd, WM_TASK_POOL, 0, (LPARAM)pTaskItem))
{
return nTaskId;
}
else
{
CCriticalSection cs(m_csTaskPool.GetCS());
m_setWillProcessTaskIds.erase(nTaskId);
cs.Leave();
delete pTaskItem;
THREAD_BASE_ASSERT(false);
LOG_ERROR(LOG_TAG, "failed to post WM_TASK_POOL message, error is %d", GetLastError());
return INVALID_TASK_ID;
}
}
unsigned int CTaskPool::PostTimerTask(TaskProc pFunc, void* pContext, unsigned int nElapse)
{
if (m_hTaskPoolWnd == NULL)
{
THREAD_BASE_ASSERT(false);
return INVALID_TASK_ID;
}
unsigned int nTaskId = AllocateTaskId();
if (SetTimer(m_hTaskPoolWnd, nTaskId, nElapse, nullptr) == 0)
{
THREAD_BASE_ASSERT(false);
return INVALID_TASK_ID;
}
CTaskItem* pTaskItem = new CTaskItem();
pTaskItem->m_nTaskId = nTaskId;
pTaskItem->m_pTaskProcFun = pFunc;
pTaskItem->m_pContext = pContext;
CCriticalSection cs(m_csTaskPool.GetCS());
m_mapTimerId2Task[nTaskId] = pTaskItem;
return nTaskId;
}
bool CTaskPool::CancelTask(unsigned int nTaskId)
{
CCriticalSection cs(m_csTaskPool.GetCS());
// first judge wheather it is a timer task
auto it = m_mapTimerId2Task.find(nTaskId);
if (it != m_mapTimerId2Task.end())
{
KillTimer(m_hTaskPoolWnd, nTaskId);
delete it->second;
it->second = nullptr;
m_mapTimerId2Task.erase(nTaskId);
return true;
}
// can not cancel processing task
if (nTaskId == m_nProcessingTaskId)
{
return false;
}
m_setWillProcessTaskIds.erase(nTaskId);
return true;
}
bool CTaskPool::OnStart(const std::string& strParam)
{
LOG_DEBUG(LOG_TAG, "the task pool of %s starts", m_strName.c_str());
m_hSyncInit = CreateEvent(nullptr, TRUE, FALSE, nullptr);
return true;
}
void CTaskPool::OnRun(const std::string& strParam)
{
LOG_DEBUG(LOG_TAG, "the task pool of %s is running", m_strName.c_str());
m_nThreadId = GetCurrentThreadId();
WNDCLASS wc = { 0 };
wc.lpszClassName = WINDOW_CLASS_NAME;
wc.hbrBackground = CreateSolidBrush(RGB(255, 255, 255));
wc.hIcon = LoadIcon(NULL, IDI_APPLICATION);
wc.hCursor = LoadCursor(NULL, IDC_ARROW);
wc.lpfnWndProc = MsgWndWindowProc;
wc.hInstance = NULL;
if (RegisterClass(&wc) == 0 && GetLastError() != ERROR_CLASS_ALREADY_EXISTS)
{
LOG_ERROR(LOG_TAG, "failed to register window class, error is %d", GetLastError());
SetEvent(m_hSyncInit);
return;
}
m_hTaskPoolWnd = CreateWindowEx(WS_EX_LAYERED, WINDOW_CLASS_NAME, L"123145", WS_POPUP, CW_USEDEFAULT, CW_USEDEFAULT, 1, 1, GetDesktopWindow(), 0, NULL, 0);
if (m_hTaskPoolWnd == NULL)
{
LOG_ERROR(LOG_TAG, "failed to create window for task pool, error is %d", GetLastError());
SetEvent(m_hSyncInit);
return;
}
HWND hTaskPoolWnd = m_hTaskPoolWnd;
CCriticalSection csInsert(s_csHwnd2TaskPool.GetCS());
(*s_mapHwnd2TaskPool)[hTaskPoolWnd] = this;
csInsert.Leave();
SetEvent(m_hSyncInit);
MSG msg;
while (GetMessage(&msg, 0, 0, 0))
{
DispatchMessage(&msg);
}
CCriticalSection csDelete(s_csHwnd2TaskPool.GetCS());
(*s_mapHwnd2TaskPool).erase(hTaskPoolWnd); // not used m_hTaskPoolWnd, it may be reset
csDelete.Leave();
LOG_DEBUG(LOG_TAG, "the task pool of %s is over", m_strName.c_str());
}
void CTaskPool::OnStop()
{
LOG_DEBUG(LOG_TAG, "the task pool of %s is stopped", m_strName.c_str());
if (m_nThreadId > 0)
{
PostThreadMessage(m_nThreadId, WM_QUIT, 0, 0);
}
CCriticalSection cs(m_csTaskPool.GetCS());
for (auto item : m_mapTimerId2Task)
{
KillTimer(m_hTaskPoolWnd, item.first);
delete item.second;
item.second = nullptr;
}
m_mapTimerId2Task.clear();
m_nThreadId = 0;
m_hTaskPoolWnd = NULL;
cs.Leave();
if (m_hSyncInit)
{
CloseHandle(m_hSyncInit);
m_hSyncInit = NULL;
}
}
unsigned int CTaskPool::AllocateTaskId()
{
CCriticalSection cs(m_csTaskPool.GetCS());
static unsigned int s_nTaskId = 0;
s_nTaskId++;
return s_nTaskId;
}
LRESULT CALLBACK CTaskPool::MsgWndWindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
CTaskPool* pTaskPool = nullptr;
if (hWnd != NULL)
{
CCriticalSection cs(s_csHwnd2TaskPool.GetCS());
auto it = (*s_mapHwnd2TaskPool).find(hWnd);
if (it != (*s_mapHwnd2TaskPool).end())
{
pTaskPool = it->second;
}
cs.Leave();
}
if (message == WM_TASK_POOL)
{
CTaskItem* pTaskItem = (CTaskItem*)lParam;
THREAD_BASE_ASSERT(pTaskPool);
if (pTaskPool)
{
pTaskPool->ProcessTask(pTaskItem);
}
delete pTaskItem;
return 0L;
}
else if (message == WM_TIMER)
{
if (pTaskPool)
{
pTaskPool->ProcessTimer((unsigned int)wParam);
}
return 0L;
}
return DefWindowProc(hWnd, message, wParam, lParam);
}
void CTaskPool::ProcessTask(const CTaskItem* pTaskItem)
{
// first judge wheather the task is canceled
CCriticalSection csBefore(m_csTaskPool.GetCS());
m_nProcessingTaskId = pTaskItem->m_nTaskId;
auto it = m_setWillProcessTaskIds.find(pTaskItem->m_nTaskId);
if (it == m_setWillProcessTaskIds.end())
{
LOG_WARN(LOG_TAG, "the task %d is already cancel", pTaskItem->m_nTaskId);
return;
}
else
{
m_nProcessingTaskId = pTaskItem->m_nTaskId;
}
csBefore.Leave();
// process task
if (pTaskItem->m_pTaskProcFun)
{
pTaskItem->m_pTaskProcFun(pTaskItem->m_nTaskId, pTaskItem->m_pContext);
}
CCriticalSection csAfter(m_csTaskPool.GetCS());
m_nProcessingTaskId = INVALID_TASK_ID;
m_setWillProcessTaskIds.erase(pTaskItem->m_nTaskId);
csAfter.Leave();
}
void CTaskPool::ProcessTimer(unsigned int nTimerId)
{
CCriticalSection cs(m_csTaskPool.GetCS());
auto it = m_mapTimerId2Task.find(nTimerId);
if (it == m_mapTimerId2Task.end())
{
THREAD_BASE_ASSERT(false);
LOG_ERROR(LOG_TAG, "failed to find task for timer %d", nTimerId);
return;
}
CTaskItem* pTaskItem = it->second;
cs.Leave();
if (pTaskItem->m_pTaskProcFun)
{
pTaskItem->m_pTaskProcFun(pTaskItem->m_nTaskId, pTaskItem->m_pContext);
}
}
CriticalSection.h
#pragma once
#include <windows.h>
/***
临界区对象,构造函数自动进入临界区,析构函数自动释放临界区,析构之前可手动调Leave提前离开临界区
*/
class CCriticalSection
{
public:
CCriticalSection(CRITICAL_SECTION* pCS);
~CCriticalSection();
public:
/**
@name 离开临界区
*/
void Leave();
private:
CRITICAL_SECTION* m_pCS = nullptr;
bool m_bAlreadyLeave = false;
};
/**
将临界区变量封装成对象,利用构造函数初始化,析构函数释放资源
*/
class CCSWrap
{
public:
CCSWrap() { InitializeCriticalSection(&m_cs); }
~CCSWrap() { DeleteCriticalSection(&m_cs); }
public:
CRITICAL_SECTION* GetCS() { return &m_cs; }
private:
CRITICAL_SECTION m_cs;
};
CriticalSection.cpp
#include "CriticalSection.h"
CCriticalSection::CCriticalSection(CRITICAL_SECTION* pCS)
{
m_pCS = pCS;
if (m_pCS)
{
EnterCriticalSection(m_pCS);
}
}
CCriticalSection::~CCriticalSection()
{
if (!m_bAlreadyLeave)
{
if (m_pCS)
{
LeaveCriticalSection(m_pCS);
}
}
}
void CCriticalSection::Leave()
{
if (!m_bAlreadyLeave)
{
m_bAlreadyLeave = true;
if (m_pCS)
{
LeaveCriticalSection(m_pCS);
}
}
}