前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >C++线程池看这篇就够了,支持不同优先级,支持带返回值

C++线程池看这篇就够了,支持不同优先级,支持带返回值

作者头像
程序员的园
发布2024-07-18 13:11:35
1760
发布2024-07-18 13:11:35
举报
文章被收录于专栏:程序员的园——原创文章

如果知道我会死在哪里,那我将永远不去那个地方 -查理 芒格

引言

随着多核处理器的普及,并发编程在提高应用程序性能方面变得越来越重要。C++标准库提供了多线程支持,但直接使用std::thread进行大规模并发编程无疑增加了线程创建、销毁的开销。

线程池作为一种高效管理线程的机制,具有如下的有点(1)通过重用已存在的线程,减少对象的创建、销毁的开销,提升性能;(2)通过重复利用已创建的线程降低线程创建和销毁造成的消耗,防止消耗过多的内存或系统资源;(3)当任务到达时,任务可以不需要等待线程创建就能立即执行,消除了线程创建所带来的延迟,使应用程序响应更快;(4)线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。线程池可以进行统一的分配、调优和监控,提高线程的可管理性。

本文将深入探讨C++线程池的原理、实现以及最佳实践。

原理

简言之,线程池的原理为多个线程从一个任务队列中取任务,如果取到任务便执行任务,未取到任务则等待新的任务到来,直到将所有任务取完。

由以上可知,线程池需要一个任务队列、一个线程队列,同时,为了保证取任务、添加任务的原子性,需要配套的控制变量(互斥锁、条件变量),具体详述如下:

(1)线程池初始化:线程池在创建时,会预先创建一组线程并保存在池中。这些线程通常处于休眠状态,等待任务的到来;

(2)任务队列:当有新任务到达时,它会被放入一个任务队列中。线程池中的线程会等待新任务到来的通知;

(3)线程复用:一旦线程执行完一个任务,它不会立即被销毁,而是一直在池内等待新任务的到来

(4)线程管理:线程池还负责管理线程的生命周期。例如,如果所有线程都在忙碌状态,并且队列中还有新的任务等待处理,线程池可能会选择创建新的线程来处理这些任务。

依据场景的不同,存在但不限于如下两种场景

1. 任务有不同的优先级,优先级高的任务希望能够先被执行,优先级低的任务可以延后执行;

2. 针对需要执行的任务,有的任务需要结果,有的任务不需要结果。

结合如上的场景,本文实现了可以支持如上需求的线程池。

实现

如下源码可直接拿走运行,也可以后台回复“线程池”获取源码下载链接。

代码语言:javascript
复制
//header 
#pragma once
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>

enum class TaskPriority:int
{
  kTP_Highest,
  kTP_Normal
};


class TaskEvent final
{
public:
  TaskEvent(std::function<void()>f, TaskPriority p=TaskPriority::kTP_Highest):
  m_event(f),
  m_priority(p)
  {}

  const TaskPriority priority()const noexcept
{
    return m_priority;
  }

  void operator()()
{
    m_event();
  }

private:
    std::function<void()> m_event{nullptr};
    TaskPriority m_priority;
};

struct Compare {
  bool operator()(const std::shared_ptr <TaskEvent>& left, const std::shared_ptr <TaskEvent>& right) 
{   
    return left->priority() > right->priority(); 
  }
};

class ThreadPool
{
public:
  ThreadPool(int thread_num=std::thread::hardware_concurrency(), int max_thread_num= 4* std::thread::hardware_concurrency());
  ~ThreadPool();

  void AddTask(std::function<void()> t);
  void AddTask(std::shared_ptr<TaskEvent> te);

  template<typename F, typename... Args>
  auto AddHasResultTask(F&& f, Args&&... args) -> std::shared_future<decltype(f(args...))>//shared_futrue for saved into vector or queue
{
    using returnType = decltype(f(args...));
    auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::shared_future<returnType> result = task->get_future().share();

    auto  t = std::make_shared<TaskEvent>([task]() {          (*task)();       });
    AddTask(t);
    return result;
  }

private:
  static void threadWorkFunction(void* p);

private:
  int m_current_thread_num{0};
  int m_max_thread_num{0};
  std::atomic_bool m_thread_pool_runing{true};

  //about threads
  std::vector<std::thread> m_threads;
  //about threads end

  //about tasks
  std::priority_queue< std::shared_ptr<TaskEvent>,std::vector<std::shared_ptr<TaskEvent>>,Compare>m_tasks_queue;//priority queue
  std::mutex m_tasks_mtx;
  std::condition_variable m_tasks_cv;
  //about tasks end
};


//source file
#include "ThreadPool.h"

ThreadPool::ThreadPool(int thread_num, int max_thread_num):
m_current_thread_num(thread_num),
m_max_thread_num(max_thread_num)
{
  for (int i =0; i< m_current_thread_num; i++)
  {
    m_threads.emplace_back(&ThreadPool::threadWorkFunction, this);
  }
}

ThreadPool::~ThreadPool()
{
  m_thread_pool_runing.store(false);
  m_tasks_cv.notify_all();
  for (auto& t : m_threads)
  {
    if (t.joinable())
    {

      t.join();
    }
  }
}

void ThreadPool::AddTask(std::function<void()> t)
{
  AddTask(std::make_shared<TaskEvent>(t));
}

void ThreadPool::AddTask(std::shared_ptr<TaskEvent> te)
{
  
  if (!m_thread_pool_runing)
  {
    return;//do not add new task
  }

  std::unique_lock<std::mutex> lk(m_tasks_mtx);
  m_tasks_queue.push(te);
  int size = m_tasks_queue.size();
  lk.unlock();

  if (size> m_current_thread_num && size<m_max_thread_num)
  {
    m_threads.emplace_back(&ThreadPool::threadWorkFunction, this);
  }
  m_tasks_cv.notify_all();
}

void ThreadPool::threadWorkFunction(void* p)
{
  if (!p)
  {
    return;
  }

  auto pool = static_cast<ThreadPool*>(p);
  if (!pool)
  {
    return;
  }

  //stop tasks and tasks queue is empty, will exit thread
  while (pool->m_thread_pool_runing.load()
    || pool->m_tasks_queue.size()>0)
  {
    std::unique_lock<std::mutex> lk(pool->m_tasks_mtx);
    pool->m_tasks_cv.wait(lk, [p_lambda = pool ]()->bool{
      return !p_lambda->m_tasks_queue.empty()||!p_lambda->m_thread_pool_runing.load();
    });

    if (!pool->m_tasks_queue.empty())
    {
      auto task = pool->m_tasks_queue.top();
      pool->m_tasks_queue.pop();

      (*task)();
    }
  }
}



//test file

int a = 100;
std::mutex g_mtx;

void use_thread_pool()
{
  std::shared_ptr<ThreadPool> pool=std::make_shared<ThreadPool>();

  for (int i = 0; i<10; i++)
  {
    pool->AddTask([]() {
      std::unique_lock<std::mutex> lk(g_mtx);
      a += 100;
      std::cout<<"thread id="<< std::this_thread::get_id()<<" a="<<a<<"\n";
      //printf("thread id=%s, a=%d \n",ss.str(),a);
      });
  }
  std::this_thread::sleep_for(std::chrono::seconds(5));
  pool.reset();
}

const int data_size = 3;
const int calculate_time= 10;
void use_thread_pool_with_result()
{
  float* result_data = (float*)malloc(data_size* sizeof(float));
  memset(result_data,0, data_size * sizeof(float));

  std::vector<std::shared_future<float*>> sf_vec;
  sf_vec.reserve(calculate_time);
  std::shared_ptr<ThreadPool> pool = std::make_shared<ThreadPool>();

  for (int i = 0; i < calculate_time; i++)
  {
    auto sf = pool->AddHasResultTask([]() ->float*{
      float* t = (float*)malloc(data_size * sizeof(float));
      t[0]=1;
      t[1]=2;
      t[2]=3;
      return t;
      });
    sf_vec.push_back(sf);
  }

  for (auto sf: sf_vec)
  {
    auto t = sf.get();
    if (!t)
    {
      continue;
    }

    for (int i =0;i<data_size;i++)
    {
      result_data[i]+=t[i];
    }
    free(t);
  }
  for (int i = 0; i < data_size; i++)
  {
    std::cout<<result_data[i] <<"\t";
  }
  std::cout<<"\n";
  free(result_data);
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-01-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员的园 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档