前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Linux多线程【线程池】

Linux多线程【线程池】

作者头像
北 海
发布2023-11-09 10:12:07
2760
发布2023-11-09 10:12:07
举报

✨个人主页: 北 海 🎉所属专栏: Linux学习之旅 🎃操作环境: CentOS 7.6 腾讯云远程服务器



🌇前言

线程池是一种管理线程的机制,它可以在需要时自动创建和销毁线程,以及分配和回收线程资源。线程池的主要优点是减少了频繁创建和销毁线程所带来的开销,提高了系统的稳定性和可扩展性。此外,线程池还可以有效地控制线程的数量,避免过多线程导致的资源竞争和系统过载

图片来源:《什么是线程池?线程池ThreadPoolExecutor使用及其原理又是什么?


🏙️正文

1.线程池的概念

1.1.池化技术

所谓的 线程池 就是 提前创建一批线程,当任务来临时,线程直接从任务队列中获取任务执行,可以提高整体效率;同时一批线程会被合理维护,避免调度时造成额外开销

像这种把未来会高频使用到,并且创建较为麻烦的资源提前申请好的技术称为 池化技术池化技术 可以极大地提高性能,最典型的就是 线程池,常用于各种涉及网络连接相关的服务中,比如 MySQL 连接池、**HTTP** 连接池、**Redis** 连接池

除了线程池外还有内存池,比如 STL 中的容器在进行空间申请时,都是直接从 空间配置器 allocator 中获取的,并非直接使用系统调用来申请空间

池化技术 的本质:空间换时间

池化技术 就好比你把钱从银行提前取出一部分放在支付宝中,可以随时使用,十分方便和高效,总不至于需要用钱时还得跑到银行排队取钱

1.2.线程池的优点

线程池 的优点在于 高效、方便

  1. 线程在使用前就已经创建好了,使用时直接将任务交给线程完成
  2. 线程会被合理调度,确保 任务与线程 间能做到负载均衡

线程池 中的线程数量不是越多越好,因为线程增多会导致调度变复杂,具体创建多少线程取决于具体业务场景,比如 处理器内核、剩余内存、网络中的 socket 数量等

线程池 还可以配合 「生产者消费者模型」 一起使用,做到 解耦与提高效率

  • 可以把 任务队列 换成 「生产者消费者模型」
1.3.线程池的应用场景

线程池 有以下几种应用场景:

  1. 存在大量且短小的任务请求,比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问
  2. 对性能要求苛刻,力求快速响应需求,比如游戏服务器,要求对玩家的操作做出快速响应
  3. 突发大量请求,但不至于使服务器产生过多的线程,短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题

2.线程池的实现

2.1.线程池_V1(朴素版)

「朴素版」:实现最基本的线程池功能,直接使用系统提供的接口

所谓朴素版就是不加任何优化设计,只实现 线程池 最基础的功能,便于理解 线程池

创建 ThreadPool_v1.hpp 头文件

线程池 实现为一个类,提供接口供外部调用

首先要明白 线程池 的两大核心:一批线程任务队列,客户端发出请求,新增任务,线程获取任务,执行任务,因此 ThreadPool_v1.hpp 的大体框架如下

  • 一批线程,通过容器管理
  • 任务队列,存储就绪的任务
  • 互斥锁
  • 条件变量

互斥锁 的作用是 保证多个线程并访问任务队列时的线程安全,而 条件变量 可以在 任务队列 为空时,让一批线程进入等待状态,也就是线程同步

注:为了方便实现,直接使用系统调用接口及容器,比如 pthread_t**、**vector**、**queue

代码语言:javascript
复制
#pragma once

#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>

namespace Yohifo
{
#define THREAD_NUM 10

    template<class T>
    class ThreadPool
    {
    public:
        ThreadPool(int num = THREAD_NUM)
            :_threads(num), _num(num)
        {
            // 初始化互斥锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }

        ~ThreadPool()
        {
            // 互斥锁、条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_cond);
        }

        void init()
        {
            // 其他信息初始化(当前不需要)
        }

        void start()
        {
            // 启动线程池
            // ...
        }

        // 提供给线程的回调函数
        static void *threadRoutine(void *args)
        {
            // 业务处理
            // ...
        }
        
    private:
        std::vector<pthread_t> _threads;
        int _num; // 线程数量
        std::queue<T> _tasks; // 利用 STL 自动扩容的特性,无需担心容量
        pthread_mutex_t _mtx;
        pthread_cond_t _cond;
    };
}

注意:

  • 需要提前给 vector 扩容,避免后面使用时发生越界访问
  • 提供给线程的回调函数需要设置为静态,否则线程调不动(参数不匹配)

填补函数体

初始化线程池 init() — 位于 ThreadPool

当前场景只需要初始化 互斥锁条件变量,在 构造函数 中完成就行了,所以这里的 init() 函数不需要补充

启动线程池 start() — 位于 ThreadPool

启动 线程池 需要先创建出一批线程,这里直接循环创建即可

代码语言:javascript
复制
void start()
{
    // 创建一批线程并启动
    for(int i = 0; i < _num; i++)
        pthread_create(&_threads[i], nullptr, threadRoutine, nullptr); // (存疑)
}

线程的回调函数 threadRoutine() — 位于 ThreadPool

这里进行简单测试,打印当前线程的线程 ID 就行了,并且直接 detach,主线程无需等待次线程运行结束

代码语言:javascript
复制
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    while (true)
    {
        std::cout << "Thread Running... " << pthread_self() << std::endl;
        sleep(1);
    }
}

创建 main.cc 源文件,测试线程池的代码

代码语言:javascript
复制
#include "ThreadPool_V1.hpp"
#include <memory>

int main()
{
    std::unique_ptr<Yohifo::ThreadPool<int>> ptr(new Yohifo::ThreadPool<int>());

    ptr->init();
    ptr->start();

    // 还有后续动作

    return 0;
}

编译并运行代码,可以看到 确实创建了一批线程,当主线程退出后,其他次线程也就跟着终止了


线程池 还需要提供一个重要的接口 pushTask(),将用户需要执行的业务装载至 任务队列 中,等待线程执行

装载任务 pushTask() — 位于 ThreadPool

代码语言:javascript
复制
// 装载任务
void pushTask(const T& task)
{
    // 本质上就是在生产商品,需要加锁保护
    pthread_mutex_lock(&_mtx);
    _tasks.push(task);

    // 唤醒消费者进行消费
    pthread_cond_signal(&_cond);
    pthread_mutex_unlock(&_mtx);
}

装载任务的本质就是在生产任务,相当于用户充当生产者,通过这个接口将任务生产至任务队列中,而线程充当消费者,从任务队列中获取任务并消费

所以线程的回调函数需要从 任务队列 中获取任务,进行消费

  1. 检测是否有任务
  2. 有 -> 消费
  3. 没有 -> 等待

线程回调函数 threadRoutine() — 位于 ThreadPool

代码语言:javascript
复制
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    while (true)
    {
        // 任务队列是临界资源,需要保护
        pthread_mutex_lock(&_mtx);

        // 等待条件满足
        while(_tasks.empty())
            pthread_cond_wait(&_cond, &_mtx);

        T task = _tasks.front();
        _tasks.pop();

        // task(); // 进行消费(存疑)

        pthread_mutex_unlock(&_mtx);
    }
}

注意: 判断任务队列是否为空需要使用 while**,确保在多线程环境中不会出现问题**

因为 任务队列、互斥锁、条件变量 是类内成员,而这里的 threadRoutine() 函数是一个静态函数,并没有 this 指针以访问类内成员,可以采取传递 this 指针的方式解决问题

启动线程池 start() — 位于 ThreadPool

代码语言:javascript
复制
void start()
{
    // 创建一批线程并启动
    for(int i = 0; i < _num; i++)
        pthread_create(&_threads[i], nullptr, threadRoutine, this); // 传递 this 指针
}

threadRoutine() 函数需要将参数 void* 转化为所在类对象的指针,并通过该指针访问类内成员

线程回调函数 threadRoutine() — 位于 ThreadPool

代码语言:javascript
复制
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    auto ptr = static_cast<ThreadPool<T>*>(args);

    while (true)
    {
        // 任务队列是临界资源,需要保护
        pthread_mutex_lock(&ptr->_mtx);

        // 等待条件满足
        while(ptr->_tasks.empty())
            pthread_cond_wait(&ptr->_cond, &ptr->_mtx);

        T task = ptr->_tasks.front();
        ptr->_tasks.pop();

        //task(); // 进行消费(存疑)

        pthread_mutex_unlock(&ptr->_mtx);
    }
}

为了使得提高代码的可阅读性及可拓展性,这里将会封装一批接口,供函数调用

加锁、解锁 — 位于 ThreadPool

代码语言:javascript
复制
void lockQueue()
{
    pthread_mutex_lock(&_mtx);
}

void unlockQueue()
{
    pthread_mutex_unlock(&_mtx);
}

等待、唤醒 — 位于 ThreadPool

代码语言:javascript
复制
void threadWait()
{
    pthread_cond_wait(&_cond, &_mtx);
}

void threadWakeUp()
{
    pthread_cond_signal(&_cond);
}

判空、获取任务 — 位于 ThreadPool

代码语言:javascript
复制
bool isEmpty()
{
    return _tasks.empty();
}

T popTask()
{
    T task = _tasks.front();
    _tasks.pop();

    return task;
}

接口封装完毕后,可以顺便修改之前的代码,比如 装载任务 pushTask()

装载任务 pushTask() — 位于 ThreadPool

代码语言:javascript
复制
// 装载任务
void pushTask(const T& task)
{
    // 本质上就是在生产商品,需要加锁保护
    lockQueue();
    _tasks.push(task);

    // 唤醒消费者进行消费
    threadWakeUp();
    unlockQueue();
}

以及 消费者 threadRountine()

线程回调函数 threadRoutine() — 位于 ThreadPool

代码语言:javascript
复制
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    auto ptr = static_cast<ThreadPool<T>*>(args);

    while (true)
    {
        // 任务队列是临界资源,需要保护
        ptr->lockQueue();

        // 等待条件满足
        while(ptr->isEmpty())
            ptr->threadWait();

        T task = ptr->popTask();
        pthread_mutex_unlock(&ptr->_mtx);

		// 消费行为可以不用加锁(一个商品只会被一个线程消费)
		task(); 
    }
}

细节: 轮到线程执行任务时,不需要加锁,这就好比你买桶泡面回家,是不必担心别人会和你争抢,可以慢慢消费;同样的,你也不应该占用锁资源,主动让出锁资源以提高整体效率

task() 表示执行任务,这里实际是一个 operator()() 的重载,详见 Linux多线程【生产者消费者模型】 中关于 Task.hpp 的设计,因为我们这里也需要使用任务,所以可以直接把之前写的代码拷贝过来

任务类 Task.hpp

代码语言:javascript
复制
#pragma once

#include <string>

namespace Yohifo
{
    // 支持泛型
    template<class T>
    class Task
    {
    public:
        Task(T x = 0, T y = 0, char op = '+')
            :_x(x), _y(y), _op(op), _res(0), _err(0)
        {}

        // 重载运算操作
        void operator()()
        {
            // 简单计算
            switch(_op)
            {
                case '+':
                    _res = _x + _y;
                break;
                case '-':
                    _res = _x - _y;
                break;
                case '*':
                    _res = _x * _y;
                break;
                case '/':
                    if(_y == 0)
                        _err = -1;
                    else
                        _res = _x / _y;    
                break;
                case '%':
                    if(_y == 0)
                        _err = -2;
                    else
                        _res = _x % _y;    
                break;
                default:
                    _err = -3;
                break;
            }
        }

        // 获取计算结果
        std::string getResult()
        {
            // 根据错误标识,返回计算结果
            std::string ret = std::to_string(_x) + " " + _op + " " + std::to_string(_y);
            
            if(_err)
            {
                ret += " error";

                // 判读是 / 错误还是 % 错误
                if(_err == -1)
                    ret += " [-1] \t / 0 引发了错误";
                else if(_err == -2)
                    ret += " [-2] \t % 0 引发了错误";
                else
                    ret += " [-3] \t 不合法的操作符,只能为 [+-*/%]";
            }
            else
            {
                ret += " = " + std::to_string(_res);
            }

            return ret;
        }

    private:
        T _x;
        T _y;
        char _op; // 运算符
        T _res; // 结果
        int _err; // 错误标识
    };
}

轮到 main.cc 进行操作了,逻辑很简单:创建线程池对象,初始化线程池,启动线程池,装载任务,等待运行结果

补充 main.cc

代码语言:javascript
复制
#include "ThreadPool_V1.hpp"
#include <memory>

typedef Yohifo::Task<int> type;
    
int main()
{
    std::unique_ptr<Yohifo::ThreadPool<type>> ptr(new Yohifo::ThreadPool<type>());

    ptr->init();
    ptr->start();

    // 还有后续动作
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入 x: ";
        std::cin >> x;
        std::cout << "输入 y: ";
        std::cin >> y;
        std::cout << "输入 op: ";
        std::cin >> op;

        // 构建任务对象
        type task(x, y, op);

        // 装载任务
        ptr->pushTask(task);
    }

    return 0;
}

现在还有最后一个问题:如何获取计算结果?可以在 线程 执行完任务后,直接显示计算结果,也可以通过传入回调函数的方式,获取计算结果,前者非常简单,只需要在 threadRoutine() 中加入这行代码即可

线程回调函数 threadRoutine() — 位于 ThreadPool

代码语言:javascript
复制
void *threadRoutine(void *args)
{
	// ...

    // 显示计算结果
    std::cout << task.getResult() << std::endl;
}

除此之外,我们也可以通过 回调函数 的方式获取计算结果

目标:给线程传入一个回调函数,线程执行完任务后,将任务传给回调函数,回调函数结合业务逻辑,灵活处理结果

单纯打印的话,很容易就可以写出这个回调函数

回调函数 callBack() — 位于 main.cc 源文件

代码语言:javascript
复制
// 回调函数
void callBack(type& task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();

    std::cout << "计算结果为: " << ret;
}

为了能让 线程 在执行任务后能回调,需要将这个函数对象作为参数,传递给 ThreadPool 对象

源文件 main.cc

代码语言:javascript
复制
// ...

int main()
{
    std::unique_ptr<Yohifo::ThreadPool<type>> ptr(new Yohifo::ThreadPool<type>(callBack));

	// ...
}

当然,这边传递了一个对象,那边就得接收此对象,为了存储该函数对象,ThreadPool 新增一个类成员:_func,函数对象类型为 void (T&)

修改 ThreadPool.hpp 头文件

代码语言:javascript
复制
// ...
#include <functional>


namespace Yohifo
{
#define THREAD_NUM 10

    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器

    public:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_threads(num), _num(num), _func(func)
        {
            // 初始化互斥锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }
		
		// ...
		
    private:
    		// ...
   	        func_t _func;
    };
}

修改完成后,创建 ThreadPool 对象时,支持传入一个类型为 void(T&) 的函数对象

获取函数对象后,需要让 线程 在执行完任务后进行回调,但又因为这玩意是一个类内成员,同样需要借助外部传入的 this 指针进行访问,这里直接封装成一个接口,顺便进行调用

回调函数对象 callBack() — 位于 ThreadPool

代码语言:javascript
复制
func_t callBack(T &task)
{
    _func(task);
}

线程回调函数 threadRoutine() — 位于 ThreadPool

代码语言:javascript
复制
// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
		// ...

        task(); // 执行任务
        ptr->callBack(task); // 回调函数
    }
}

做完上述准备工作后,可以进行测试

程序结果正常,不必在意打印问题,因为屏幕也是被多线程并发访问的资源,没加锁保护,导致出现问题


2.2.线程池_V2(封装版)

「封装版」:引入自己封装实现的线程库 Thread.hpp**,支持对线程做出更多操作**

之前写的线程池代码不够优雅,所能展现的线程相关信息太少了,为此可以选择引入之前封装实现的 Thread.hpp

自己封装的 Thread.hpp 头文件

代码语言:javascript
复制
#pragma once

#include <iostream>
#include <string>
#include <pthread.h>

enum class Status
{
    NEW = 0, // 新建
    RUNNING, // 运行中
    EXIT // 已退出
};

// 参数、返回值为 void 的函数类型
typedef void (*func_t)(void*);

class Thread
{
public:
    Thread(int num = 0, func_t func = nullptr, void* args = nullptr)
        :_tid(0), _status(Status::NEW), _func(func), _args(args)
    {
        // 根据编号写入名字
        char name[128];
        snprintf(name, sizeof name, "thread-%d", num);
        _name = name;
    }

    ~Thread()
    {}

    // 获取 ID
    pthread_t getTID() const
    {
        return _tid;
    }

    // 获取线程名
    std::string getName() const
    {
        return _name;
    }

    // 获取状态
    Status getStatus() const
    {
        return _status;
    }

    // 回调方法
    static void* runHelper(void* args)
    {
        Thread* myThis = static_cast<Thread*>(args);

        // 很简单,回调用户传进来的 func 函数即可
        myThis->_func(myThis->_args);
    }

    // 启动线程
    void run()
    {
        int ret = pthread_create(&_tid, nullptr, runHelper, this);
        if(ret != 0)
        {
            std::cerr << "create thread fail!" << std::endl;
            exit(1); // 创建线程失败,直接退出
        }
        _status =  Status::RUNNING; // 更改状态为 运行中
    }

    // 线程等待
    void join()
    {
        int ret = pthread_join(_tid, nullptr);
        if(ret != 0)
        {
            std::cerr << "thread join fail!" << std::endl;
            exit(1); // 等待失败,直接退出
        }
        _status = Status::EXIT; // 更改状态为 退出
    }

private:
    pthread_t _tid; // 线程 ID
    std::string _name; // 线程名
    Status _status; // 线程状态
    func_t _func; // 线程回调函数
    void* _args; // 传递给回调函数的参数
};

不再直接使用原生线程库,转而使用自己封装的线程库

创建 ThreadPool_V2.hpp 头文件

拷贝 ThreadPool_V1.hpp,对其中的部分内容进行修改即可

代码语言:javascript
复制
#pragma once

#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"

namespace Yohifo
{
#define THREAD_NUM 10

    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器

    public:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_num(num), _func(func)
        {
            // 初始化互斥锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }

        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();

            // 互斥锁、条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_cond);
        }

        void init()
        {
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }

        void start()
        {
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }

        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args)
        {
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());

            auto ptr = static_cast<ThreadPool<T>*>(args);

            while (true)
            {
                // 任务队列是临界资源,需要保护
                ptr->lockQueue();

                // 等待条件满足
                while(ptr->isEmpty())
                    ptr->threadWait();

                T task = ptr->popTask();
                ptr->unlockQueue();

                task();
                ptr->callBack(task); // 回调函数
            }
        }

        // 装载任务
        void pushTask(const T& task)
        {
            // 本质上就是在生产商品,需要加锁保护
            lockQueue();
            _tasks.push(task);

            // 唤醒消费者进行消费
            threadWakeUp();
            unlockQueue();
        }
    
    protected:
        void lockQueue()
        {
            pthread_mutex_lock(&_mtx);
        }

        void unlockQueue()
        {
            pthread_mutex_unlock(&_mtx);
        }

        void threadWait()
        {
            pthread_cond_wait(&_cond, &_mtx);
        }

        void threadWakeUp()
        {
            pthread_cond_signal(&_cond);
        }

        bool isEmpty()
        {
            return _tasks.empty();
        }

        T popTask()
        {
            T task = _tasks.front();
            _tasks.pop();

            return task;
        }

        func_t callBack(T &task)
        {
            _func(task);
        }

    private:
        std::vector<Thread> _threads;
        std::queue<T> _tasks; // 利用 STL 自动扩容的特性,无需担心容量
        pthread_mutex_t _mtx;
        pthread_cond_t _cond;
        func_t _func;
    };
}

涉及修改的内容:

  • _threads 类型由 vector<pthread_t> 变为 vector<Thread>
  • init() 函数用于创建线程,注册线程信息
  • start() 函数用于启动线程
  • ~ThreadPool() 中新增等待线程退出
  • 线程回调函数 threadRoutinue() 返回值改为 void
  • 新增函数对象 _func

测试结果如下


2.3.线程池_V3(优化版)

「优化版」:从任务队列入手,引入 「生产者消费者模型」,同时引入 RAII 风格的锁,实现自动化加锁与解锁

当前的 线程池 设计已经完成的差不多了,接下来重点在于完善其他地方,比如 任务队列及锁的优化

线程池 专注于 任务处理,至于如何确保任务装载及获取时的线程安全问题,交给 「生产者消费者模型」(基于阻塞队列) 就行了,线程池_V3 版的代码可以优化成下面这个样子

线程池 ThreadPool_V3.hpp

代码语言:javascript
复制
#pragma once

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型

namespace Yohifo
{
#define THREAD_NUM 10

    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器

    public:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_num(num), _func(func)
        {
        }

        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }

        void init()
        {
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }

        void start()
        {
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }

        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args)
        {
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());

            auto ptr = static_cast<ThreadPool<T>*>(args);

            while (true)
            {
                // 从CP模型中获取任务
                T task = ptr->popTask();


                task();
                ptr->callBack(task); // 回调函数
            }
        }

        // 装载任务
        void pushTask(const T& task)
        {
            _blockqueue.Push(task);
        }
    
    protected:
        func_t callBack(T &task)
        {
            _func(task);
        }

        T popTask()
        {
            T task;
            _blockqueue.Pop(&task);

            return task;
        }

    private:
        std::vector<Thread> _threads;
        int _num; // 线程数量
        BlockQueue<T> _blockqueue; // 阻塞队列
        func_t _func;
    };
}

之前的 互斥锁、条件变量 相关操作交给 「生产者消费者模型」 处理,线程池 不必关心,关于 「生产者消费者模型」 的实现详见 Linux多线程【生产者消费者模型】

手动 加锁、解锁 显得不够专业,并且容易出问题,比如忘记释放锁资源而造成死锁,因此我们可以设计一个小组件 LockGuard,实现 RAII 风格的锁:初始化创建,析构时销毁

小组件 LockGuard.hpp

代码语言:javascript
复制
#pragma once

#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *pmtx)
        :_pmtx(pmtx)
    {
        // 加锁
        pthread_mutex_lock(_pmtx);
    }

    ~LockGuard()
    {
        // 解锁
        pthread_mutex_unlock(_pmtx);
    }

private:
    pthread_mutex_t *_pmtx;
};

将这个小组件加入 BlockingQueue.hpp 中,可以得到以下代码

生产者消费者模型 BlockingQueue.hpp

代码语言:javascript
复制
#pragma once

#include <queue>
#include <mutex>
#include <pthread.h>
#include "LockGuard.hpp"

// 命名空间,避免冲突
namespace Yohifo
{
#define DEF_SIZE 10

    template<class T>
    class BlockQueue
    {
    public:
        BlockQueue(size_t cap = DEF_SIZE)
            :_cap(cap)
        {
            // 初始化锁与条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_pro_cond, nullptr);
            pthread_cond_init(&_con_cond, nullptr);
        }

        ~BlockQueue()
        {
            // 销毁锁与条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_pro_cond);
            pthread_cond_destroy(&_con_cond);
        }

        // 生产数据(入队)
        void Push(const T& inData)
        {
            // 加锁(RAII风格)
            LockGuard lock(&_mtx);

            // 循环判断条件是否满足
            while(IsFull())
            {
                pthread_cond_wait(&_pro_cond, &_mtx);
            }

            _queue.push(inData);

            // 可以加策略唤醒,比如生产一半才唤醒消费者
            pthread_cond_signal(&_con_cond);

            // 自动解锁

        }

        // 消费数据(出队)
        void Pop(T* outData)
        {
            // 加锁(RAII 风格)
            LockGuard lock(&_mtx);

            // 循环判读条件是否满足
            while(IsEmpty())
            {
                pthread_cond_wait(&_con_cond, &_mtx);
            }

            *outData = _queue.front();
            _queue.pop();

            // 可以加策略唤醒,比如消费完后才唤醒生产者
            pthread_cond_signal(&_pro_cond);

            // 自动解锁
        }

    private:
        // 判断是否为满
        bool IsFull()
        {
            return _queue.size() == _cap;
        }
        
        // 判断是否为空
        bool IsEmpty()
        {
            return _queue.empty();
        }

    private:
        std::queue<T> _queue;
        size_t _cap; // 阻塞队列的容量
        pthread_mutex_t _mtx; // 互斥锁
        pthread_cond_t _pro_cond; // 生产者条件变量
        pthread_cond_t _con_cond; // 消费者条件变量
    };
}

最后引入 main.cc,并编译运行程序,查看结果是否正确

源文件 main.cc

代码语言:javascript
复制
#include "ThreadPool_V3.hpp"
#include <memory>

typedef Yohifo::Task<int> type;

// 回调函数
void callBack(type& task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();

    std::cout << "计算结果为: " << ret << std::endl;
}

int main()
{
    std::unique_ptr<Yohifo::ThreadPool<type>> ptr(new Yohifo::ThreadPool<type>(callBack));

    ptr->init();
    ptr->start();

    // 还有后续动作
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入 x: ";
        std::cin >> x;
        std::cout << "输入 y: ";
        std::cin >> y;
        std::cout << "输入 op: ";
        std::cin >> op;

        // 构建任务对象
        type task(x, y, op);

        // 装载任务
        ptr->pushTask(task);
    }

    return 0;
}

运行结果如下

如何证明现在有一批线程在运行呢?

通过指令查看,当程序运行后,再新开一个终端,并输入以下命令

代码语言:javascript
复制
ps -aL | grep threadPool

注:threadPool 为当前程序编译后生成的可执行文件名

可以看到:除了主线程 5847 外,其他次线程都在等待任务就绪,从生产者消费者模型中获取任务并执行;当大量并发任务来临时,线程池是能大大提高效率的


3.单例模式

3.1.什么是单例模式

代码构建类,类实例化出对象,这个实例化出的对象也可以称为 实例,比如常见的 STL 容器,在使用时,都是先根据库中的类,形成一个 实例 以供使用;正常情况下,一个类可以实例化出很多很多个对象,但对于某些场景来说,是不适合创建出多个对象的

比如本文中提到的 线程池,当程序运行后,仅需一个 线程池对象 来进行高效任务计算,因为多个 线程池对象 无疑会大大增加调度成本,因此需要对 线程池类 进行特殊设计,使其只能创建一个 对象,换句话说就是不能让别人再创建对象

正如 一山不容二虎 一样,线程池 对象在一个程序中是不推荐出现多个的

在一个程序中只允许实例化出一个对象,可以通过 单例模式 来实现,单例模式 是非常 经典、常用、常考 的设计模式

什么是设计模式? 设计模式就是计算机大佬们在长时间项目实战中总结出来的解决方案,是帮助菜鸡编写高质量代码的利器,常见的设计模式有 单例模式、建造者模式、工厂模式、代理模式等

3.2.单例模式的特点

单例模式 最大的特点就是 只允许存在一个对象(实例),这就好比现在的 一夫一妻制 一样,要是在古代,单例模式 肯定不被推崇

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百 GB) 到内存中,此时往往要用一个 单例 的类来管理这些数据;在我们今天的场景中,也需要一个 单例线程池 来协同生产者与消费者

3.3.单例模式的简单实现

单例模式 有两种实现方向:饿汉懒汉,它们避免类被再次创建出对象的手段是一样的:构造函数私有化、删除拷贝构造

只要外部无法访问 构造函数,那么也就无法构建对象了,比如下面这个类 Signal

单例类 Signal

代码语言:javascript
复制
#pragma once

#include <iostream>

namespace Yohifo
{
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;
    };
}

当外界试图创建对象时

当然这只实现了一半,还有另一半是 创建一个单例对象,既然外部受权限约束无法创建对象,那么类内是肯定可以创建对象的,只需要创建一个指向该类对象的 静态指针 或者一个 静态对象,再初始化就好了;因为外部无法访问该指针,所以还需要提供一个静态函数 getInstance() 以获取单例对象句柄,至于具体怎么实现,需要分不同方向(饿汉 or 懒汉)

代码语言:javascript
复制
#pragma once

#include <iostream>

namespace Yohifo
{
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;
     
     public:
     	// 获取单例对象的句柄
        static Signal *getInstance()
        {
            return _sigptr;
        }

        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
        
    private:
        // 指向单例对象的静态指针
        static Signal *_sigptr;
    };
}

注意: 构造函数不能只声明,需要实现,即使什么都不写

为什么要删除拷贝构造? 如果不删除拷贝构造,那么外部可以借助拷贝构造函数,拷贝构造出一个与 单例对象 一致的 “对象”,此时就出现两个对象,这是不符合 单例模式 特点的 为什么要创建一个静态函数? 单例对象也需要被初始化,并且要能被外部使用 调用链逻辑:通过静态函数获取句柄(静态单例对象地址)-> 通过地址调用该对象的其他函数

3.3.1.饿汉模式

张三总是很饿,尽管饭菜还没准备好,他就已经早早的把碗洗好了,等到开饭时,直接开干

饿汉模式 也是如此,在程序加载到内存时,就已经早早的把 单例对象 创建好了(此时程序服务还没有完全启动),也就是在外部直接通过 new 实例化一个对象,具体实现如下

代码语言:javascript
复制
#pragma once

#include <iostream>

namespace Yohifo
{
    // 饿汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;

    public:
        static Signal *getInstance()
        {
            return _sigptr;
        }

        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }

    private:
        // 指向单例对象的静态指针
        static Signal *_sigptr;
    };

    Signal* Signal::_sigptr = new Signal();
}

注:在程序加载时,该对象会被创建

这里的 单例对象 本质就有点像 全局变量,在程序加载时就已经创建好了

外部可以直接通过 getInstance() 获取 单例对象 的操作句柄,来调用类中的其他函数

main.cc

代码语言:javascript
复制
#include <iostream>
#include "Signal.hpp"

int main()
{
    Yohifo::Signal::getInstance()->print();

    return 0;
}

运行结果为

这就实现了一个简单的 饿汉版单例类,除了创建 static Signal* 静态单例对象指针 外,也可以直接定义一个 静态单例对象,生命周期随进程,不过要注意的是:getInstance() 需要返回的也是该静态单例对象的地址,不能返回值,因为拷贝构造被删除了;并且需要在类的外部初始化该静态单例对象

代码语言:javascript
复制
#pragma once

#include <iostream>

namespace Yohifo
{
    // 饿汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;

    public:
        static Signal *getInstance()
        {
            return &_sig;
        }

        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }

    private:
        // 静态单例对象
        static Signal _sig;
    };

    // 初始化
    Signal Signal::_sig;
}

饿汉模式 是一个相对简单的单例实现方向,只需要在类中声明,在类外初始化就行了,但它也会带来一定的弊端:延缓服务启动速度

完全启动服务是需要时间的,创建 单例对象 也是需要时间的,饿汉模式 在服务正式启动前会先创建对象,但凡这个单例类很大,服务启动时间势必会受到影响,大型项目启动,时间就是金钱

并且由于 饿汉模式 每次都会先创建 单例对象,再启动服务,如果后续使用 单例对象 还好说,但如果后续没有使用 单例对象,那么这个对象就是白创建了,在延缓服务启动的同时造成了一定的资源浪费

综上所述,饿汉模式 不是很推荐使用,除非图实现简单,并且服务规模较小;既然 饿汉模式 有缺点,就需要改进,于是就出现了 懒汉模式

3.3.2.懒汉模式

李四也是个很饿的人,他也有一个自己的碗,吃完饭后碗会脏,但他不像张三那样极端,李四比较懒,只有等他吃饭的时候,他才会去洗碗,李四这种做法让他感到无比轻松

懒汉模式 中,单例对象 并不会在程序加载时创建,而是在第一次调用时创建,第一次调用创建后,后续无需再创建,直接使用即可

代码语言:javascript
复制
#pragma once

#include <iostream>

namespace Yohifo
{
    // 懒汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;

    public:
        static Signal *getInstance()
        {
            // 第一次调用才创建
            if(_sigptr == nullptr)
            {
                _sigptr = new Signal();
            }

            return _sigptr;
        }

        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }

    private:
        // 静态指针
        static Signal *_sigptr;
    };

    // 初始化静态指针
    Signal* Signal::_sigptr = nullptr;
}

注意: 此时的静态指针需要初始化为 nullptr**,方便第一次判断**

饿汉模式 中出现的问题这里全都避免了

  • 创建耗时 -> 只在第一次使用时创建
  • 占用资源 -> 如果不使用,就不会被创建

懒汉模式 的核心在于 延时加载,可以优化服务器的速度及资源占用

延时加载这种机制就有点像 「写时拷贝」,就du你不会使用,从而节省资源开销,类似的还有 动态库、进程地址空间 等

当然,懒汉模式 下也是可以正常使用 单例对象

这样看来,懒汉模式 确实优秀,实现起来也不麻烦,为什么会说 饿汉模式 更简单呢?

这是因为当前只是单线程场景,程序暂时没啥问题,如果当前是多线程场景,问题就大了,如果一批线程同时调用 getInstance(),同时认定 _sigptr 为空,就会创建多个 单例对象,这是不合理的

也就是说当前实现的 懒汉模式 存在严重的线程安全问题

如何证明?

简单改一下代码,每创建一个单例对象,就打印一条语句,将代码放入多线程环境中测试

获取单例对象句柄 getInstance() — 位于 Signal

代码语言:javascript
复制
static Signal *getInstance()
{
    // 第一次调用才创建
    if(_sigptr == nullptr)
    {
        std::cout << "创建了一个单例对象" << std::endl;
        _sigptr = new Signal();
    }

    return _sigptr;
}

源文件 main.cc

其中使用了 lambda 表达式来作为线程的回调函数,重点在于查看现象

代码语言:javascript
复制
#include <iostream>
#include <pthread.h>
#include "Signal.hpp"

int main()
{
    // 创建一批线程
    pthread_t arr[10];
    for(int i = 0; i < 10; i++)
    {
        pthread_create(arr + i, nullptr, [](void*)->void*
            {
                // 获取句柄
                auto ptr = Yohifo::Signal::getInstance();
                ptr->print();
                return nullptr;
            }, nullptr);
    }

    for(int i = 0; i < 10; i++)
        pthread_join(arr[i], nullptr);

    return 0;
}

运行结果如下:

当前代码在多线程环境中,同时创建了多个 单例对象,因此是存在线程安全问题的

饿汉模式没有线程安全问题吗? 没有,因为饿汉模式下,单例对象一开始就被创建了,即便是多线程场景中,也不会创建多个对象,它们也做不到

3.3.3.懒汉模式(线程安全版)

有问题就解决,解决多线程并发访问的利器是 互斥锁,那就创建 互斥锁 保护单例对象的创建

代码语言:javascript
复制
#pragma once

#include <iostream>
#include <mutex>

namespace Yohifo
{
    // 懒汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;

    public:
        static Signal *getInstance()
        {
            // 加锁保护
            pthread_mutex_lock(&_mtx);
            if(_sigptr == nullptr)
            {
                std::cout << "创建了一个单例对象" << std::endl;
                _sigptr = new Signal();
            }
            pthread_mutex_unlock(&_mtx);

            return _sigptr;
        }

        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }

    private:
        // 静态指针
        static Signal *_sigptr;
        static pthread_mutex_t _mtx;
    };

    // 初始化静态指针
    Signal* Signal::_sigptr = nullptr;

    // 初始化互斥锁
    pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;
}

注意: getInstance() 是静态函数,互斥锁也要定义为静态的,可以初始化为全局静态锁

依旧是借助之前的多线程场景,测试一下改进后的 懒汉模式 代码有没有问题

结果是没有问题,单例对象 也只会创建一个

现在还面临最后一个问题:效率问题

当前代码确实能保证只会创建一个 单例对象,但即使后续不会创建 单例对象,也需要进行 加锁、判断、解锁 这个流程,要知道 加锁 也是有资源消耗的,所以这种写法不妥

解决方案是:DoubleCheck 双检查加锁

加锁 前再增加一层判断,如此一来,N 个线程,顶多只会进行 N加锁与解锁,这是非常优雅的解决方案

获取静态对象句柄 getInstance() — 位于 Signal

代码语言:javascript
复制
static Signal *getInstance()
{
    // 双检查
    if(_sigptr == nullptr)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);
        if(_sigptr == nullptr)
        {
            std::cout << "创建了一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        pthread_mutex_unlock(&_mtx);
    }

    return _sigptr;
}

单纯的 if 判断并不会消耗很多资源,但 加锁 行为会消耗资源,延缓程序运行速度,双检查加锁 可以有效避免这个问题

这是个精妙绝伦的代码设计,值得学习


所以 懒汉模式 麻烦吗?

相比于 饿汉模式,确实挺麻烦的,不仅要判断后创建 单例对象,还需要考虑线程安全问题

值得一提的是,懒汉模式 还有一种非常简单的写法:调用 getInstance() 时创建一个静态单例对象并返回,因为静态单例对象只会初始化一次,所以是可行的,并且在 C++11 之后,可以保证静态变量初始化时的线程安全问题,也就不需要 双检查加锁 了,实现起来非常简单

代码语言:javascript
复制
#pragma once

#include <iostream>
#include <mutex>

namespace Yohifo
{
    // 懒汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}

        // 删除拷贝构造
        Signal(const Signal&) = delete;

    public:
        static Signal *getInstance()
        {
            // 静态单例对象,只会初始化一次,并且生命周期随进程
            static Signal _sig;

            return &_sig;
        }

        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    };
}

结果也是正常的

所以如果当前的生产环境所支持的 C++ 版本为 C++11 及以后,在实现 懒汉模式 时可以选择这种简便的方式,是非常不错的;如果为了兼容性,也可以选择传统写法

注意: 静态变量创建时的线程安全问题,在 C++11 之前是不被保障的

关于 单例模式 的其他问题

new 出来的单例对象不需要销毁吗? 这个单例对象生成周期随进程,进程结束了,资源也就都被销毁了,如果想手动销毁,可以设计一个垃圾回收内部类 GC**,主动去销毁单例对象**


3.4.线程池_V4(最终版)

有了 单例模式 的相关知识后,就可以开始编写最终版线程池了

「最终版」:将线程池改为 单例模式,只允许存在一个线程池对象

这里选择 懒汉模式,因为比较优秀,并且为了确保兼容性,选择 经典写法

首先是修改 ThreadPool 为单例模式

头文件 ThreadPool_V4.hpp

代码语言:javascript
复制
#pragma once

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型

namespace Yohifo
{
#define THREAD_NUM 10

    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器

	// 私有化
    private:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_num(num), _func(func)
        {
        }

        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }

        // 删除拷贝构造
        ThreadPool(const ThreadPool<T> &) = delete;
		
		// ...

    private:
    	// ...
    	
        // 创建静态单例对象指针及互斥锁
        static ThreadPool<T> *_inst;
        static pthread_mutex_t _mtx;
    };

    // 初始化指针
    template<class T>
    ThreadPool<T>* ThreadPool<T>::_inst = nullptr;

    // 初始化互斥锁
    template<class T>
    pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}

然后提供一个获取 单例对象 句柄的函数,如果是第一次创建 单例对象,就需要在创建完对象后,顺便进行 init()start()

获取句柄 getInstance() — 位于 ThreadPool

代码语言:javascript
复制
static ThreadPool<T>* getInstance()
{
    // 双检查
    if(_inst == nullptr)
    {
        // 加锁
        LockGuard lock(&_mtx);
        if(_inst == nullptr)
        {
            // 创建对象
            _inst = new ThreadPool<T>();

			// 初始化及启动服务
			_inst->init();
			_inst->start();
        }
    }

    return _inst;
}

单例模式 改完了,但现在面临一个尴尬的问题:main.cc 无法直接将回调函数 callBack() 进行传递,因为它根本无法创建对象

可以试试曲线救国:将函数对象传递给 getInstance() 函数,如果用户不传,那就使用缺省参数,也就是直接打印结果

总之,修修改改后的线程池长这样

头文件 ThreadPool_V4.hpp

代码语言:javascript
复制
#pragma once

#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型

namespace Yohifo
{
#define THREAD_NUM 10

    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器

    private:
        ThreadPool(int num = THREAD_NUM)
            :_num(num)
        {
        }

        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }

        // 删除拷贝构造
        ThreadPool(const ThreadPool<T> &) = delete;

    public:
        static ThreadPool<T>* getInstance(const func_t &func = [](T& task){ std::cout << task.getResult() << std::endl; })
        {
            // 双检查
            if(_inst == nullptr)
            {
                // 加锁
                LockGuard lock(&_mtx);
                if(_inst == nullptr)
                {
                    // 创建对象
                    _inst = new ThreadPool<T>();

                    // 初始化及启动服务
                    _inst->init();
                    _inst->start();
                }
            }

            // 支持随时更改 main.cc 传过来的回调函数
            _inst->_func = func;

            return _inst;
        }

    public:
        void init()
        {
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }

        void start()
        {
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }

        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args)
        {
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());

            auto ptr = static_cast<ThreadPool<T>*>(args);

            while (true)
            {
                // 从CP模型中获取任务
                T task = ptr->popTask();

                task();
                ptr->callBack(task); // 回调函数
            }
        }

        // 装载任务
        void pushTask(const T& task)
        {
            _blockqueue.Push(task);
        }
    
    protected:
        func_t callBack(T &task)
        {
            _func(task);
        }

        T popTask()
        {
            T task;
            _blockqueue.Pop(&task);

            return task;
        }

    private:
        std::vector<Thread> _threads;
        int _num; // 线程数量
        BlockQueue<T> _blockqueue; // 阻塞队列
        func_t _func;

        // 创建静态单例对象指针及互斥锁
        static ThreadPool<T> *_inst;
        static pthread_mutex_t _mtx;
    };

    // 初始化指针
    template<class T>
    ThreadPool<T>* ThreadPool<T>::_inst = nullptr;

    // 初始化互斥锁
    template<class T>
    pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}

此时 main.cc 想要使用线程池对象时,就得通过 getInstance() 获取句柄,然后才能进行操作

源文件 main.cc

代码语言:javascript
复制
#include "ThreadPool_V4.hpp"
#include <memory>

typedef Yohifo::Task<int> type;

// 回调函数
void callBack(type& task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();

    std::cout << "计算结果为: " << ret << std::endl;
}

int main()
{
    // 还有后续动作
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入 x: ";
        std::cin >> x;
        std::cout << "输入 y: ";
        std::cin >> y;
        std::cout << "输入 op: ";
        std::cin >> op;

        // 构建任务对象
        type task(x, y, op);

        // 装载任务
        Yohifo::ThreadPool<type>::getInstance(callBack)->pushTask(task);
    }

    return 0;
}

此时是可以获取结果的,也可以看到一批线程正在候等任务到达

如何证明当前的 单例模式 生效了?

在调用 getInstance() 之前查看正在运行中的线程数量,调用完后再次查看,如果线程数量从 1 个变成多个,就证明 单例模式 是生效的(延迟加载)

还可以通过其他方式证明,比如多行打印 单例对象句柄,查看地址是否为同一个,就可以知道 单例模式 是否生效了

至此我们的 线程池_V4 最终版 代码算是完善了,以下是一些注意事项及建议

  1. 注意加锁解锁的位置,尽可能提高效率
  2. 使用双检查加锁,避免不必要的竞争
  3. 可以使用 volatile 修饰静态单例对象指针,避免被编译器优化覆盖

4.周边问题补充

4.1.STL线程安全问题

STL 库中的容器是否是 线程安全 的?

答案是 不是

因为 STL 设计的初衷就是打造出极致性能容器,而加锁、解锁操作势必会影响效率,因此 STL 中的容器并未考虑线程安全,在之前编写的 生产者消费者模型线程池 中,使用了部分 STL 容器,如 vectorqueuestring 等,这些都是需要我们自己去加锁、解锁,以确保多线程并发访问时的线程安全问题

从另一方面来说,STL 容器种类繁多,容器间实现方式各不相同,无法以统一的方式进行加锁、解锁操作,比如哈希表中就有 锁表锁桶 两种方式

所以在多线程场景中使用 STL 库时,需要自己确保线程安全

4.2.智能指针线程安全问题

C++ 标准提供的智能指针有三种:unique_ptrshared_ptrweak_ptr

首先来说 unique_ptr,这是个功能单纯的智能指针,只具备基本的 RAII 风格,不支持拷贝,因此无法作为参数传递,也就不涉及线程安全问题

其次是 shared_ptr,得益于 引用计数,这个智能指针支持拷贝,可能被多线程并发访问,但标准库在设计时考虑到了这个问题,索性将 shared_ptr 对于引用计数的操作设计成了 原子操作 CAS,这就确保了它的 线程安全,至于 weak_ptr,这个就是 shared_ptr 的小弟,名为弱引用智能指针,具体实现与 shared_ptr 一脉相承,因此它也是线程安全的

4.3.其他常见锁概念

悲观锁:总是认为数据会被其他线程修改,于是在自己访问数据前,会先加锁,其他线程想访问时只能等待,之前使用的锁都属于悲观锁

乐观锁:并不认为其他线程会来修改数据,因此在访问数据前,并不会加锁,但是在更新数据前,会判断其他数据在更新前有没有被修改过,主要通过 版本号机制 和 CAS**操作实现**

CAS 操作:当需要更新数据时,会先判断内存中的值与之前获取的值是否相等,如果相等就用新值覆盖旧值,失败就不断重试

自旋锁:申请锁失败时,线程不会被挂起,而且不断尝试申请锁

自旋 本质上就是一个不断 轮询 的过程,即不断尝试申请锁,这种操作是十分消耗 CPU 时间的,因此推荐临界区中的操作时间较短时,使用 自旋锁 以提高效率;操作时间较长时,自旋锁 会严重占用 CPU 时间

自旋锁 的优点:可以减少线程切换的消耗

自旋锁相关接口

代码语言:javascript
复制
#include <pthread.h>

pthread_spinlock_t lock; // 自旋锁类型

int pthread_spin_init(pthread_spinlock_t *lock, int pshared); // 初始化自旋锁

int pthread_spin_destroy(pthread_spinlock_t *lock); // 销毁自旋锁

// 自旋锁加锁
int pthread_spin_lock(pthread_spinlock_t *lock); // 失败就不断重试(阻塞式)
int pthread_spin_trylock(pthread_spinlock_t *lock); // 失败就继续向后运行(非阻塞式)

// 自旋锁解锁
int pthread_spin_unlock(pthread_spinlock_t *lock);

就这接口风格,跟 mutex 互斥锁 是一脉相承,可以轻易上手,将 线程池 中的 互斥锁 轻易改为 自旋锁

公平锁:一种用于同步多线程或多进程之间访问共享资源的机制,它通过使用互斥锁和相关的调度策略来确保资源的公平分配,以提高系统的性能和稳定性

非公平锁:通常使用信号量(Semaphore)或自旋锁(Spinlock)等机制。这些锁机制没有严格的按照请求的顺序来分配锁,而是以更高的性能为目标,允许一些线程或进程在较短时间内多次获取锁资源,从而减少了竞争开销

4.4.读者写者问题

除了 生产者消费者模型 外,还有一个 读者写者模型,用来解决 读者写者 问题,核心思想是 读者共享,写者互斥

这就好比博客发布了,允许很多人同时读,但如果作者想要进行修改,那么其他人自然也就无法查看了,这就是一个很典型的 读者写者 问题

读者写者模型 也遵循 321 原则

3 种关系:

  • 读者<->读者 无关系
  • 写者<->写者 互斥
  • 读者<->写者 互斥、同步

2 种角色:读者写者

1 个交易场所:阻塞队列或其他缓冲区

为什么读者与读者间甚至不存在互斥关系? 因为读者读取数据时,并不会对数据做出修改,因此不需要维持互斥关系

pthread 库中提供了 读写锁 相关接口

代码语言:javascript
复制
#include <pthread.h>

pthread_rwlock_t; // 读写锁类型

// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *__restrict__ __rwlock, const pthread_rwlockattr_t *__restrict__ __attr); 

// 销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *__rwlock) 

 // 读者,加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *__rwlock); // 阻塞式
int pthread_rwlock_tryrdlock(pthread_rwlock_t *__rwlock); // 非阻塞式

// 写者,加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *__rwlock); // 阻塞式 
int pthread_rwlock_trywrlock(pthread_rwlock_t *__rwlock); // 非阻塞式

// 解锁(读者锁、写者锁都可以解)
int pthread_rwlock_unlock(pthread_rwlock_t *__rwlock); 

注意: 读者和写者使用的加锁接口并不是同一个

关于 读者写者模型 的实现

读者读数据时,允许其他读者一起读取数据,但不允许写者修改数据

写者写数据时,不允许读者进入

读者读取完数据后,通知写者进行写入

写者写完数据后,通知读者进行读取

以下是伪代码

代码语言:javascript
复制
int reader_cnt = 0; // 统计读者的数量
pthread_mutex_t lock; // 互斥锁
sem_t w(1); // 二元信号量

读者
	// 加锁
	{
		pthread_mutex_lock(&lock);
		if(reader_cnt == 0)
			P(w); // 第一个读者进入,申请信号量

		reader_cnt++; // 进入了一个读者
		pthread_mutex_unlock(&lock);	
	}
	
	// 读取数据

	// 解锁
	{
		pthread_mutex_lock(&lock);
		reader_cnt--; // 走了一个读者
		if(reader_cnt == 0)
			V(w); // 最后一个读者走了,归还信号量
		pthread_mmutex_unlock();
	}



写者
	// 加锁
	{
		P(w); // 申请信号量
		if(reader_cnt > 0)
		{
			V(w); // 归还信号量
			// 挂起等待
		}
	}
	
	// 写入数据

	// 解锁
	{
		V(w); // 归还信号量
	}

因为现实中,读者数量大多数情况下都是多于写者的,所以势必会存在很多很多读者不断读取,导致写者根本申请不到信号量,写者陷入 死锁 状态

这是读者写者模型的特性,也是 读者优先 策略的体现,如果想要避免死锁,可以选择 写者优先 策略,优先让写者先写,读者先等一等


🌆总结

以上就是关于 Linux多线程【线程池】的全部内容了,作为多线程篇章的收官之作,首先学习了池化技术,了解了线程池的特性,然后又分别实现了四个版本的线程池,循序渐进,最终得到了单例版的线程池,得益于模板,此线程池可以轻松应用于其他场景中,最后还学习了多线程的一些周边知识,比如线程安全、锁概念、读者写者问题。总之多线程算是正式结束了,下一篇将会打开网络的大门


相关文章推荐 Linux多线程 =====:> 【初始多线程】【线程控制】【线程互斥与同步】【生产者消费者模型】 Linux进程信号 ===== :> 【信号产生】【信号保存】【信号处理】 Linux进程间通信 ===== :> 【消息队列、信号量】【共享内存】【命名管道】【匿名管道】 Linux基础IO ===== :> 【软硬链接与动静态库】【深入理解文件系统】【模拟实现C语言文件流】【重定向及缓冲区理解】【文件理解与操作】 Linux进程控制 ===== :> 【简易版bash】【进程程序替换】【创建、终止、等待】 Linux进程学习 ===== :> 【进程地址】【环境变量】【进程状态】【基本认知】 Linux基础 ===== :> 【gdb】【git】【gcc/g++】【vim】Linux 权限理解和学习听说Linux基础指令很多?这里都帮你总结好了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-11-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 🌇前言
  • 🏙️正文
    • 1.线程池的概念
      • 1.1.池化技术
      • 1.2.线程池的优点
      • 1.3.线程池的应用场景
    • 2.线程池的实现
      • 2.1.线程池_V1(朴素版)
      • 2.2.线程池_V2(封装版)
      • 2.3.线程池_V3(优化版)
    • 3.单例模式
      • 3.1.什么是单例模式
      • 3.2.单例模式的特点
      • 3.3.单例模式的简单实现
      • 3.4.线程池_V4(最终版)
    • 4.周边问题补充
      • 4.1.STL线程安全问题
      • 4.2.智能指针线程安全问题
      • 4.3.其他常见锁概念
      • 4.4.读者写者问题
  • 🌆总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档