前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >支持定时任务的任务池

支持定时任务的任务池

作者头像
gaigai
发布2019-08-29 17:34:10
7320
发布2019-08-29 17:34:10
举报
文章被收录于专栏:Windows开发Windows开发

任务池可以用来异步处理任务,比如清理过期日志、HTTP请求,本文介绍的任务池还支持定时触发任务,在SetTimer得注意的两个坑 一文中介绍了工作线程如果想使用定时器需要有消息循环,有了本文介绍的任务池,工作线程可以将定时器的实现交给它。

任务池实现机制如下图所示:

第一,一个任务池会启动一个线程,线程启动时调用RegisterClass注册窗口类,再调用CreateWindowEx创建一个窗口,然后进入消息循环GetMessage。

第二,当调用PostTask投递任务时指定任务执行回调和上下文参数,任务池为该任务分配一个任务ID,将任务ID、任务执行回调、上下文参数打包作为自定义消息WM_TASK_POOL的LPARAM参数,调用PostMessage投递到消息队列。

第三,当调用PostTimerTask投递定时任务时指定任务执行回调、上下文参数和定时周期,调用SetTimer设定定时器,定时触发WM_TIMER消息到消息队列,同时将定时器ID与定时任务绑定。

第四,消息循环GetMessage获取WM_TASK_POOL消息时执行LPARAM中的任务执行回调,获取WM_TIMER消息时根据消息ID查询绑定的定时任务并执行。

任务取消实现机制:投递任务时,任务池会为每个任务分配一个任务ID并将任务ID放到任务集合对象中。业务方通过任务ID取消任务执行时,将任务ID从任务集合对象中移除。任务池执行任务回调前判断如果任务ID不在任务集合对象就不执行。注意,如果当前任务正在执行,取消任务将返失败。由于任务是被异步执行,在释放任务执行回调过程中访问的资源时,务必等任务回调执行完成或取消任务。

任务池的类图如下图:

类CThreadBase,在 一个简单实用的线程基类 文章中介绍的线程基类。

类CTaskPool,派生于CThreadBase,提供四个接口:

接口WaitUntilCanPostTask(), 等待任务池可以接受任务,业务方投递任务前需调用,因为任务池在线程启动后需要创建窗口,如果窗口尚未创建,投递任务调用PostMessage()时将失败。

接口PostTask(),投递任务,指定任务执行回调和上下文参数,返回任务ID。

接口PostTimerTask(),投递定时任务,指定任务执行回调、上下文参数、定时周期,返回任务ID。

接口CancelTask(),取消任务,指定任务ID。

类CTaskItem,实体类封装任务,包括任务ID、任务回调、任务上下文参数。

类CCriticalSection,封装临界区,在 自动解锁与提前解锁 文章中已介绍。

源码包括:TaskPool.h、TaskPool.cpp、CriticalSection.h、CriticalSection.cpp

TaskPool.h

代码语言:javascript
复制
#pragma once

#include <set>
#include <map>
#include "ThreadBase.h"
#include "CriticalSection.h"

#define INVALID_TASK_ID  0

// task entry
typedef void (__stdcall *TaskProc)(unsigned int nTaskId, void* pContext);

class CTaskPool : public CThreadBase
{
public:
    class CTaskItem
    {
    public:
        unsigned int m_nTaskId = 0;
        TaskProc m_pTaskProcFun = nullptr;
        void* m_pContext = nullptr;
    };

public:
    CTaskPool();
    ~CTaskPool();

public:    
    // set task pool name
    void SetName(const std::string& strName) { m_strName = strName; }

    // can not post task before inner window is created
    bool WaitUntilCanPostTask();

    // return INVALID_TASK_ID if failed
    unsigned int PostTask(TaskProc pFunc, void* pContext);

    // call pFunc once for every nElapse time
    // return INVALID_TASK_ID if failed
    unsigned int PostTimerTask(TaskProc pFunc, void* pContext, unsigned int nElapse);

    // TaskFunc will not be called after cancel
    // return false if the task is processing, otherwise return true
    bool CancelTask(unsigned int nTaskId);

protected:  // override CThreadBase
    virtual bool OnStart(const std::string& strParam) override;
    virtual void OnRun(const std::string& strParam) override;
    virtual void OnStop() override;

private:
    unsigned int AllocateTaskId();
    static LRESULT CALLBACK MsgWndWindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam);
    void ProcessTask(const CTaskItem* pTaskItem);
    void ProcessTimer(unsigned int nTimerId);

private:
    std::string m_strName;
    DWORD m_nThreadId = 0;
    HANDLE m_hSyncInit = NULL;
    HWND m_hTaskPoolWnd = NULL;

    CCSWrap m_csTaskPool;  // for mutex visit
    unsigned int m_nProcessingTaskId = 0;
    std::map<unsigned int, CTaskItem*> m_mapTimerId2Task;    
    std::set<unsigned int> m_setWillProcessTaskIds;

    static std::map<HWND, CTaskPool*>* s_mapHwnd2TaskPool;
    static CCSWrap s_csHwnd2TaskPool;
};

TaskPool.cpp

代码语言:javascript
复制
#include "TaskPool.h"

// self implement log module
#define LOG_DEBUG(TAG, ...)
#define LOG_INFO(TAG, ...)
#define LOG_WARN(TAG, ...)
#define LOG_ERROR(TAG, ...)

#define LOG_TAG  "taskpool"
#define WINDOW_CLASS_NAME  L"task_pool"
#define WM_TASK_POOL        1000

std::map<HWND, CTaskPool*>* CTaskPool::s_mapHwnd2TaskPool = new std::map<HWND, CTaskPool*>;
CCSWrap CTaskPool::s_csHwnd2TaskPool;

CTaskPool::CTaskPool()
{
}

CTaskPool::~CTaskPool()
{
}

bool CTaskPool::WaitUntilCanPostTask()
{
    if (m_hTaskPoolWnd != NULL)
    {
        return true;
    }

    if (m_hSyncInit != NULL)
    {
        WaitForSingleObject(m_hSyncInit, INFINITE);
    }

    return m_hTaskPoolWnd != NULL;
}

unsigned int CTaskPool::PostTask(TaskProc pFunc, void* pContext)
{
    if (m_hTaskPoolWnd == NULL)
    {
        THREAD_BASE_ASSERT(false);
        return INVALID_TASK_ID;
    }
    
    unsigned int nTaskId = AllocateTaskId();
    CCriticalSection cs(m_csTaskPool.GetCS());
    m_setWillProcessTaskIds.insert(m_setWillProcessTaskIds.end(), nTaskId);
    cs.Leave();

    CTaskItem* pTaskItem = new CTaskItem();
    pTaskItem->m_nTaskId = nTaskId;
    pTaskItem->m_pTaskProcFun = pFunc;
    pTaskItem->m_pContext = pContext;
    if (PostMessage(m_hTaskPoolWnd, WM_TASK_POOL, 0, (LPARAM)pTaskItem))
    {
        return nTaskId;
    }
    else
    {
        CCriticalSection cs(m_csTaskPool.GetCS());
        m_setWillProcessTaskIds.erase(nTaskId);
        cs.Leave();

        delete pTaskItem;
        THREAD_BASE_ASSERT(false);
        LOG_ERROR(LOG_TAG, "failed to post WM_TASK_POOL message, error is %d", GetLastError());
        return INVALID_TASK_ID;
    }
}

unsigned int CTaskPool::PostTimerTask(TaskProc pFunc, void* pContext, unsigned int nElapse)
{
    if (m_hTaskPoolWnd == NULL)
    {
        THREAD_BASE_ASSERT(false);
        return INVALID_TASK_ID;
    }
    
    unsigned int nTaskId = AllocateTaskId();    
    if (SetTimer(m_hTaskPoolWnd, nTaskId, nElapse, nullptr) == 0)
    {
        THREAD_BASE_ASSERT(false);
        return INVALID_TASK_ID;
    }

    CTaskItem* pTaskItem = new CTaskItem();
    pTaskItem->m_nTaskId = nTaskId;
    pTaskItem->m_pTaskProcFun = pFunc;
    pTaskItem->m_pContext = pContext;

    CCriticalSection cs(m_csTaskPool.GetCS());
    m_mapTimerId2Task[nTaskId] = pTaskItem;
    return nTaskId;
}

bool CTaskPool::CancelTask(unsigned int nTaskId)
{
    CCriticalSection cs(m_csTaskPool.GetCS());

    // first judge wheather it is a timer task
    auto it = m_mapTimerId2Task.find(nTaskId);
    if (it != m_mapTimerId2Task.end())
    {
        KillTimer(m_hTaskPoolWnd, nTaskId);
        delete it->second;
        it->second = nullptr;
        m_mapTimerId2Task.erase(nTaskId);
        return true;
    }    
        
    // can not cancel processing task
    if (nTaskId == m_nProcessingTaskId)
    {
        return false;
    }

    m_setWillProcessTaskIds.erase(nTaskId);
    return true;
}

bool CTaskPool::OnStart(const std::string& strParam)
{
    LOG_DEBUG(LOG_TAG, "the task pool of %s starts", m_strName.c_str());
    m_hSyncInit = CreateEvent(nullptr, TRUE, FALSE, nullptr);
    return true;
}

void CTaskPool::OnRun(const std::string& strParam)
{
    LOG_DEBUG(LOG_TAG, "the task pool of %s is running", m_strName.c_str());

    m_nThreadId = GetCurrentThreadId();

    WNDCLASS wc = { 0 };
    wc.lpszClassName = WINDOW_CLASS_NAME;
    wc.hbrBackground = CreateSolidBrush(RGB(255, 255, 255));
    wc.hIcon = LoadIcon(NULL, IDI_APPLICATION);
    wc.hCursor = LoadCursor(NULL, IDC_ARROW);
    wc.lpfnWndProc = MsgWndWindowProc;
    wc.hInstance = NULL;
    if (RegisterClass(&wc) == 0 && GetLastError() != ERROR_CLASS_ALREADY_EXISTS)
    {
        LOG_ERROR(LOG_TAG, "failed to register window class, error is %d", GetLastError());
        SetEvent(m_hSyncInit);
        return;
    }

    m_hTaskPoolWnd = CreateWindowEx(WS_EX_LAYERED, WINDOW_CLASS_NAME, L"123145", WS_POPUP, CW_USEDEFAULT, CW_USEDEFAULT, 1, 1, GetDesktopWindow(), 0, NULL, 0);
    if (m_hTaskPoolWnd == NULL)
    {        
        LOG_ERROR(LOG_TAG, "failed to create window for task pool, error is %d", GetLastError());
        SetEvent(m_hSyncInit);
        return;
    }    

    HWND hTaskPoolWnd = m_hTaskPoolWnd;
    CCriticalSection csInsert(s_csHwnd2TaskPool.GetCS());
    (*s_mapHwnd2TaskPool)[hTaskPoolWnd] = this;
    csInsert.Leave();

    SetEvent(m_hSyncInit);

    MSG msg;
    while (GetMessage(&msg, 0, 0, 0))
    {
        DispatchMessage(&msg);
    }

    CCriticalSection csDelete(s_csHwnd2TaskPool.GetCS());
    (*s_mapHwnd2TaskPool).erase(hTaskPoolWnd);  // not used m_hTaskPoolWnd, it may be reset
    csDelete.Leave();

    LOG_DEBUG(LOG_TAG, "the task pool of %s is over", m_strName.c_str());
}

void CTaskPool::OnStop()
{
    LOG_DEBUG(LOG_TAG, "the task pool of %s is stopped", m_strName.c_str());

    if (m_nThreadId > 0)
    {
        PostThreadMessage(m_nThreadId, WM_QUIT, 0, 0);        
    }

    CCriticalSection cs(m_csTaskPool.GetCS());
    for (auto item : m_mapTimerId2Task)
    {
        KillTimer(m_hTaskPoolWnd, item.first);
        delete item.second;
        item.second = nullptr;
    }
    m_mapTimerId2Task.clear();
    m_nThreadId = 0;
    m_hTaskPoolWnd = NULL;
    cs.Leave();

    if (m_hSyncInit)
    {
        CloseHandle(m_hSyncInit);
        m_hSyncInit = NULL;
    }
}

unsigned int CTaskPool::AllocateTaskId()
{
    CCriticalSection cs(m_csTaskPool.GetCS());
    static unsigned int s_nTaskId = 0;
    s_nTaskId++;
    return s_nTaskId;
}

LRESULT CALLBACK CTaskPool::MsgWndWindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
    CTaskPool* pTaskPool = nullptr;
    if (hWnd != NULL)
    {
        CCriticalSection cs(s_csHwnd2TaskPool.GetCS());        
        auto it = (*s_mapHwnd2TaskPool).find(hWnd);
        if (it != (*s_mapHwnd2TaskPool).end())
        {
            pTaskPool = it->second;
        }
        cs.Leave();
    }

    if (message == WM_TASK_POOL)
    {
        CTaskItem* pTaskItem = (CTaskItem*)lParam;
        THREAD_BASE_ASSERT(pTaskPool);
        if (pTaskPool)
        {
            pTaskPool->ProcessTask(pTaskItem);
        }        
        delete pTaskItem;
        return 0L;
    }
    else if (message == WM_TIMER)
    {
        if (pTaskPool)
        {
            pTaskPool->ProcessTimer((unsigned int)wParam);
        }
        return 0L;
    }

    return DefWindowProc(hWnd, message, wParam, lParam);
}

void CTaskPool::ProcessTask(const CTaskItem* pTaskItem)
{
    // first judge wheather the task is canceled
    CCriticalSection csBefore(m_csTaskPool.GetCS());
    m_nProcessingTaskId = pTaskItem->m_nTaskId;
    auto it = m_setWillProcessTaskIds.find(pTaskItem->m_nTaskId);
    if (it == m_setWillProcessTaskIds.end())
    {
        LOG_WARN(LOG_TAG, "the task %d is already cancel", pTaskItem->m_nTaskId);
        return;
    }
    else
    {
        m_nProcessingTaskId = pTaskItem->m_nTaskId;
    }
    csBefore.Leave();

    // process task
    if (pTaskItem->m_pTaskProcFun)
    {
        pTaskItem->m_pTaskProcFun(pTaskItem->m_nTaskId, pTaskItem->m_pContext);
    }

    CCriticalSection csAfter(m_csTaskPool.GetCS());
    m_nProcessingTaskId = INVALID_TASK_ID;
    m_setWillProcessTaskIds.erase(pTaskItem->m_nTaskId);
    csAfter.Leave();
}

void CTaskPool::ProcessTimer(unsigned int nTimerId)
{    
    CCriticalSection cs(m_csTaskPool.GetCS());
    auto it = m_mapTimerId2Task.find(nTimerId);
    if (it == m_mapTimerId2Task.end())
    {        
        THREAD_BASE_ASSERT(false);
        LOG_ERROR(LOG_TAG, "failed to find task for timer %d", nTimerId);
        return;
    }
    CTaskItem* pTaskItem = it->second;
    cs.Leave();
    
    if (pTaskItem->m_pTaskProcFun)
    {
        pTaskItem->m_pTaskProcFun(pTaskItem->m_nTaskId, pTaskItem->m_pContext);
    }
}

CriticalSection.h

代码语言:javascript
复制
#pragma once

#include <windows.h>

/***
临界区对象,构造函数自动进入临界区,析构函数自动释放临界区,析构之前可手动调Leave提前离开临界区
*/
class CCriticalSection
{
public:
    CCriticalSection(CRITICAL_SECTION* pCS);
    ~CCriticalSection();

public:
    /**
    @name 离开临界区
    */
    void Leave();

private:
    CRITICAL_SECTION* m_pCS = nullptr;
    bool m_bAlreadyLeave = false;
};

/**
将临界区变量封装成对象,利用构造函数初始化,析构函数释放资源
*/
class CCSWrap
{
public:
    CCSWrap()  { InitializeCriticalSection(&m_cs); }
    ~CCSWrap() { DeleteCriticalSection(&m_cs); }

public:
    CRITICAL_SECTION* GetCS() { return &m_cs; }

private:
    CRITICAL_SECTION m_cs;
};

CriticalSection.cpp

代码语言:javascript
复制
#include "CriticalSection.h"

CCriticalSection::CCriticalSection(CRITICAL_SECTION* pCS)
{
    m_pCS = pCS;
    if (m_pCS)
    {
        EnterCriticalSection(m_pCS);
    }    
}

CCriticalSection::~CCriticalSection()
{
    if (!m_bAlreadyLeave)
    {
        if (m_pCS)
        {
            LeaveCriticalSection(m_pCS);
        }        
    }    
}

void CCriticalSection::Leave()
{
    if (!m_bAlreadyLeave)
    {
        m_bAlreadyLeave = true;
        if (m_pCS)
        {
            LeaveCriticalSection(m_pCS);
        }
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Windows开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档