前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >C语言实现线程池

C语言实现线程池

作者头像
叶茂林
发布2024-01-03 10:20:41
2240
发布2024-01-03 10:20:41
举报

C语言标准库中并没有提供线程池的实现,线程池需要手搓

实现线程池的基本思路是:先创建几个固定的线程,让每个线程运行起来,然后通过互斥锁和条件变量使得每个线程进入等待状态,当需要分派线程时,改变条件变量,使得某个线程退出等待状态开始执行传入的函数参数,执行完后重新进入等待状态。

同时实现了一个队列来存储需要执行的任务。

Task结构体用于表示线程池需要执行的任务,包括属性函数指针和函数参数。

代码语言:javascript
复制
typedef struct {
    void (*function)(void *); // 函数指针,表示任务的函数
    void *argument;          // 函数参数
} Task;

ThreadPool结构体用于表示线程池,包括内嵌实现的队列,用的是循环索引数组模拟实现的队列,互斥锁和条件变量,固定大小的线程组,还有一个是否销毁线程池的标记。

代码语言:javascript
复制
typedef struct {
    Task *tasks;            // 任务数组
    int size;                 // 当前任务数量
    int front;                // 队头索引
    int rear;                 // 队尾索引
    pthread_mutex_t mutex;    // 互斥锁
    pthread_cond_t condition; // 条件变量
    pthread_t *threads;       // 线程数组
    int shutdown;          // 是否销毁线程池
} ThreadPool;

初始化线程池,创建POOLSIZE个线程,创建日志文件,初始化互斥锁和条件变量。

代码语言:javascript
复制
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define POOLSIZE 4
代码语言:javascript
复制
void init_thread_pool(ThreadPool *threadPool) {
    threadPool->tasks = (Task *) malloc(sizeof(Task) * POOLSIZE);
    threadPool->size = 0;
    threadPool->front = 0;
    threadPool->rear = 0;
    threadPool->shutdown=-1;
    pthread_mutex_init(&threadPool->mutex, NULL);
    pthread_cond_init(&threadPool->condition, NULL);
    threadPool->threads = (pthread_t *) malloc(sizeof(pthread_t) * POOLSIZE);
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_create(&threadPool->threads[i], NULL, execute, threadPool);
    }
}

提交任务到线程池,如果当前线程池的任务数量等于拥有的线程数,说明没有可以用的线程,进入等待,直到有空闲的线程,那么将任务添加到任务队列中,通知线程执行新任务,并写日志记录线程被分派事件。

代码语言:javascript
复制
void submit_task(ThreadPool *threadPool, void (*function)(void *), void *argument) {
    pthread_mutex_lock(&threadPool->mutex);
    while (threadPool->size == POOLSIZE) {    // 等待直到有空闲位置
        pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
    }
    // 添加任务到队列
    threadPool->tasks[threadPool->rear].function = function;
    threadPool->tasks[threadPool->rear].argument = argument;
    threadPool->rear = (threadPool->rear + 1) % POOLSIZE;
    threadPool->size++;
    // 通知线程有新任务
    pthread_cond_signal(&threadPool->condition);
    pthread_mutex_unlock(&threadPool->mutex);
}

最重要的是这个一直工作的工作线程,当线程池中没有任务时一直处于等待状态,当有任务时,就从任务队列中取出一个任务,释放互斥锁,执行任务后回收该线程,并写日志记录线程被回收事件,如果线程池没有被销毁,就继续等待任务。

代码语言:javascript
复制
void *execute(void *arg) {
    ThreadPool *threadPool = (ThreadPool *) arg;
    while(threadPool->shutdown){
        pthread_mutex_lock(&threadPool->mutex);
        while (threadPool->size == 0) {    // 等待直到有任务
            pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
        }
        // 取出任务
        Task task = threadPool->tasks[threadPool->front];
        threadPool->front = (threadPool->front + 1) % POOLSIZE;
        --threadPool->size;
        // 执行任务
        pthread_cond_signal(&threadPool->condition);
        pthread_mutex_unlock(&threadPool->mutex);
        task.function(task.argument);
        free(task.argument);
    }
    return NULL;
}

线程池还有一个线程的销毁功能,设置线程池销毁标记,等待所有线程结束后释放线程内存,并销毁互斥锁和条件变量。

代码语言:javascript
复制
void shutdown_thread_pool(ThreadPool *threadPool) {
    threadPool->shutdown=0;
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_join(threadPool->threads[i], NULL);
    }
    free(threadPool->threads);
    free(threadPool->tasks);
    pthread_mutex_destroy(&threadPool->mutex);
    pthread_cond_destroy(&threadPool->condition);
}

使用例子

代码语言:javascript
复制
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define POOLSIZE 4

typedef struct {
    void (*function)(void *); // 函数指针,表示任务的函数
    void *argument;          // 函数参数
} Task;

typedef struct {
    Task *tasks;            // 任务数组
    int size;                 // 当前任务数量
    int front;                // 队头索引
    int rear;                 // 队尾索引
    pthread_mutex_t mutex;    // 互斥锁
    pthread_cond_t condition; // 条件变量
    pthread_t *threads;       // 线程数组
    int shutdown;          // 是否销毁线程池
} ThreadPool;

// 执行任务的线程函数
void *execute(void *arg) {
    ThreadPool *threadPool = (ThreadPool *) arg;
    while (threadPool->shutdown) {
        pthread_mutex_lock(&threadPool->mutex);
        while (threadPool->size == 0) {    // 等待直到有任务
            pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
        }
        // 取出任务
        Task task = threadPool->tasks[threadPool->front];
        threadPool->front = (threadPool->front + 1) % POOLSIZE;
        --threadPool->size;
        // 执行任务
        pthread_cond_signal(&threadPool->condition);
        pthread_mutex_unlock(&threadPool->mutex);
        task.function(task.argument);
        free(task.argument);
    }
    return NULL;
}

// 初始化线程池
void init_thread_pool(ThreadPool *threadPool) {
    threadPool->tasks = (Task *) malloc(sizeof(Task) * POOLSIZE);
    threadPool->size = 0;
    threadPool->front = 0;
    threadPool->rear = 0;
    threadPool->shutdown = -1;
    pthread_mutex_init(&threadPool->mutex, NULL);
    pthread_cond_init(&threadPool->condition, NULL);
    threadPool->threads = (pthread_t *) malloc(sizeof(pthread_t) * POOLSIZE);
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_create(&threadPool->threads[i], NULL, execute, threadPool);
    }
}

// 销毁线程池
void shutdown_thread_pool(ThreadPool *threadPool) {
    threadPool->shutdown = 0;
    for (int i = 0; i < POOLSIZE; ++i) {
        pthread_join(threadPool->threads[i], NULL);
    }
    free(threadPool->threads);
    free(threadPool->tasks);
    pthread_mutex_destroy(&threadPool->mutex);
    pthread_cond_destroy(&threadPool->condition);
}

// 向线程池中添加任务
void submit_task(ThreadPool *threadPool, void (*function)(void *), void *argument) {
    pthread_mutex_lock(&threadPool->mutex);
    while (threadPool->size == POOLSIZE) {    // 等待直到有空闲位置
        pthread_cond_wait(&threadPool->condition, &threadPool->mutex);
    }
    // 添加任务到队列
    threadPool->tasks[threadPool->rear].function = function;
    threadPool->tasks[threadPool->rear].argument = argument;
    threadPool->rear = (threadPool->rear + 1) % POOLSIZE;
    threadPool->size++;
    // 通知线程有新任务
    pthread_cond_signal(&threadPool->condition);
    pthread_mutex_unlock(&threadPool->mutex);
}

void *f(void*arg) {
    printf("1\n");
    return NULL;
}

int main() {
    ThreadPool threadPool;
    init_thread_pool(&threadPool);
    // 添加一些任务到线程池
    for (int i = 0; i < 10; ++i) {
        submit_task(&threadPool, (void (*)(void *)) &f, NULL);
    }
    // 等待任务执行完毕
    shutdown_thread_pool(&threadPool);
    return 0;
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-01-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档