前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【C++】开源:ThreadPoll线程池实现与使用

【C++】开源:ThreadPoll线程池实现与使用

作者头像
DevFrank
发布2024-07-24 15:32:42
1490
发布2024-07-24 15:32:42
举报
文章被收录于专栏:C++开发学习交流

😏1. 线程池介绍

线程池是一种线程管理的抽象概念,它主要用于优化多线程应用程序的性能和资源利用。在多线程编程中,创建和销毁线程是一个开销较大的操作。线程池通过预先创建一组线程,并将任务提交给这些线程来执行,从而避免了重复创建和销毁线程的开销。

线程池通常由以下几个组件组成:

1.任务队列(Task Queue):用于存储待执行的任务。当任务提交到线程池时,它们被放置在任务队列中等待执行。 2.线程池管理器(Thread Pool Manager):负责创建、管理和调度线程池中的线程。它控制着线程的数量,可以动态地增加或减少线程的数量,以适应不同的工作负载。 3.工作线程(Worker Threads):线程池中的实际执行单元。它们不断地从任务队列中获取任务并执行。 4.任务接口(Task Interface):定义了要执行的任务的接口。通常,任务是以函数或可运行对象的形式表示。

使用线程池的好处包括:

提高性能:线程池可以减少线程的创建和销毁次数,避免了频繁的上下文切换,提高了多线程程序的性能和响应速度。 资源管理:线程池可以限制并发线程的数量,避免资源过度占用,从而更好地管理系统资源。 提高可扩展性:通过调整线程池的大小,可以适应不同的并发需求,提高系统的可扩展性。

综上,线程池是一种重要的多线程编程技术,它能够有效地管理和利用线程,提高程序的性能和资源利用率。

😊2. 线程池实现1-单头文件

Github项目:https://github.com/progschj/ThreadPool

threadpoll.h

代码语言:javascript
复制
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    // template enqueue: F & Args
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) // parameter list
        -> std::future<typename std::result_of<F(Args...)>::type>;  // return type
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization: wait and wakeup
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
 
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    // function template define
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // wait
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        // add and pop
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
    );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    // wake up
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    // wake up
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

main.cpp

代码语言:javascript
复制
#include <iostream>
#include <vector>
#include <chrono>

#include "threadpoll.h"

int m_sum(int x, int y) {
    return x + y;
}

int main()
{
    // 创建容量为5的线程池
    ThreadPool pool(5);

    // 简单使用
    auto result_simple = pool.enqueue(m_sum, 3, 5);
    std::cout << "result_simple: " << result_simple.get() << std::endl;

    // 复杂使用
    std::vector< std::future<int> > results;

    for(int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::cout << "hello " << i << std::endl;
                // std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "world " << i << std::endl;
                
                return i*i;
            })
        );
    }

    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "result: ";
    for(auto && result: results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
    
    return 0;
}

编译运行:

代码语言:javascript
复制
g++ -o main main.cpp -lpthread && ./main

推荐使用这一种。使用上在原项目基础上进行了扩充,通过使用线程池,可以很方便地对线程进行操作,且不用考虑多任务的冲突等。

😆3. 线程池实现2-较复杂

Github项目:https://github.com/volute24/ThreadPoll

代码语言:javascript
复制
// main.cpp
#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>

#define THREADPOOL_MAX_NUM 5

void* mytask(void *arg)
{
    printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);
    sleep(1);
    free(arg);
    return NULL;
}

int main(int argc, char* argv[])
{
    threadpool_t pool;
    // 初始化线程池,最多5个线程
    threadpool_init(&pool, THREADPOOL_MAX_NUM);
	// 创建10个任务
    for(int i=0; i < 10; i++)
    {
        int *arg =(int *)malloc(sizeof(int));
        *arg = i;
        threadpool_add_task(&pool, mytask, arg);
        //printf("arg address:%p,arg:[%d],i:[%d]\n",arg,*arg,i);
    }
    threadpool_destroy(&pool);
    return 0;
}
代码语言:javascript
复制
// threadpool.h
#ifndef _THREAD_POLL_H_
#define _THREAD_POLL_H_

#include "condition.h"

// 封装线程池中的对象需要执行的任务对象
typedef struct task
{
	void *(*run)(void *args);//函数指针,需要执行的任务
	void *arg;			//参数
	struct task *next;	//任务队列中下一个任务
}task_t;

//定义线程池结构体
typedef struct threadpool
{
	condition_t ready;   //状态量
	task_t *first;	     //任务队列中第一个任务
	task_t *last;		//任务队列中最后一个任务
	int counter;		//线程池中已有线程数
	int idle;			//线程池中空闲线程数
	int max_threads;	//线程池最大线程数
	int quit;			//是否退出标志
}threadpool_t;

//线程池初始化
void threadpool_init(threadpool_t *pool,int threads);
//线程池中加入任务
void threadpool_add_task(threadpool_t *pool,void *(*run)(void *args),void *arg);
//销毁线程池
void threadpool_destroy(threadpool_t *pool);

#endif
代码语言:javascript
复制
// threadpoll.cpp
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>

using namespace std;

//线程执行
void *thread_routine(void *arg)
{
	struct timespec abstime;
	int timeout;
	printf("thread %d is starting\n",(int)pthread_self());
	threadpool_t *pool = (threadpool_t *)arg;

	while(1)
	{
		timeout = 0;
		//访问池前加锁
		condition_lock(&pool->ready);
		//空闲
		pool->idle++;
		//等待队列任务|| 收到线程池销毁通知
		while(pool->first == NULL && !pool->quit)
		{
			//否则线程阻塞等待
			printf("thread %d is waiting\n",(int)pthread_self());
			//获取从当前时间加上等待时间,设置超时睡眠时间
			//clock_gettime 在编译链接时需加上 -lrt ,librt中实现了clock_gettime函数
			clock_gettime(CLOCK_REALTIME,&abstime);  //CLOCK_REALTIME 系统实时时间
			abstime.tv_sec += 2;
			int status;
			status = condition_timedwait(&pool->ready,&abstime);
			if(status == ETIMEDOUT)
			{
				printf("thread %d wait timed out\n",(int)pthread_self());
				timeout = -1;
				break;
			}
		}

		pool->idle--;
		
		if(pool->first != NULL)
		{
		 	//取出等待队列最前任务,移除任务,并执行任务
		 	task_t *t = pool->first;
			pool->first = t->next;
			//由于任务执行需要消耗时间,先解锁让其他线程访问线程池
			condition_unlock(&pool->ready);
			//执行任务
			t->run(t->arg);
			//执行完任务释放内存
            free(t);
           	//重新加锁
            condition_lock(&pool->ready);	
		}
		//退出线程池
	        if(pool->quit && pool->first == NULL)
	        {
	            pool->counter--;
	            //若线程池中没有线程,通知等待线程(主线程)全部任务已经完成
	            if(pool->counter == 0)
	            {
	                condition_signal(&pool->ready);
	            }
	            condition_unlock(&pool->ready);
	            break;
	        }
		 	//超时,跳出销毁线程
	        if(timeout == 1)
	        {
	            pool->counter--;
	            condition_unlock(&pool->ready);
	            break;
	        }
			condition_unlock(&pool->ready);
	}
    printf("thread %d is exiting\n", (int)pthread_self());
    return NULL;
}

//线程池初始化
void threadpool_init(threadpool_t *pool, int threads)
{
    
    int nstatu = condition_init(&pool-ready>);
    printf("Init return values:%d\n",nstatu);
    pool->first = NULL;
    pool->last =NULL;
    pool->counter =0;
    pool->idle =0;
    pool->max_threads = threads;
    pool->quit =0;
    
}

//增加一个任务到线程池
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
    //产生一个新的任务
    task_t *newtask = (task_t *)malloc(sizeof(task_t));
    newtask->run = run;
    newtask->arg = arg;
    newtask->next=NULL;//新加的任务放在队列尾端
    
    //线程池的状态被多个线程共享,操作前需要加锁
    condition_lock(&pool->ready);
    
    if(pool->first == NULL)
    {
        pool->first = newtask;
    }        
    else    
    {
        pool->last->next = newtask;
    }
    pool->last = newtask;  //队列尾指向新加入的线程
    
    //线程池中有线程空闲,唤醒
    if(pool->idle > 0)
    {
        condition_signal(&pool->ready);
    }
    //当前线程池中线程个数没有达到设定的最大值,创建一个新的线程
    else if(pool->counter < pool->max_threads)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, thread_routine,pool);
        pool->counter++;
    }
    //结束,访问解锁
    condition_unlock(&pool->ready);
}

//线程池销毁
void threadpool_destroy(threadpool_t *pool)
{
    if(pool->quit)
    {
    return;
    }
    condition_lock(&pool->ready);
    pool->quit = 1;
    //线程池中线程个数大于0
    if(pool->counter > 0)
    {
        //对于等待的线程,发送信号唤醒
        if(pool->idle > 0)
        {
            condition_broadcast(&pool->ready);
        }
        //正在执行任务的线程,等待他们结束任务
        while(pool->counter)
        {
            condition_wait(&pool->ready);
        }
    }
    condition_unlock(&pool->ready);
    condition_destroy(&pool->ready);
}

。。。

编译:g++ main.cpp condition.cpp threadpool.cpp -lpthread

运行如下:

代码语言:javascript
复制
Init return values:0
thread 1696954112 is starting
thread 1705346816 is starting
thread 1688561408 is starting
thread 1688561408 is working on task 0
thread 1671776000 is starting
thread 1671776000 is working on task 2
thread 1705346816 is working on task 1
thread 1696954112 is working on task 3
thread 1680168704 is starting
thread 1680168704 is working on task 4
thread 1688561408 is working on task 5
thread 1671776000 is working on task 6
thread 1705346816 is working on task 7
thread 1680168704 is working on task 8
thread 1696954112 is working on task 9
thread 1680168704 is exiting
thread 1705346816 is exiting
thread 1696954112 is exiting
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 😏1. 线程池介绍
  • 😊2. 线程池实现1-单头文件
  • 😆3. 线程池实现2-较复杂
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档