前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池:线程池的实现 | 日志

线程池:线程池的实现 | 日志

作者头像
南桥
发布2024-10-02 08:04:33
40
发布2024-10-02 08:04:33
举报
文章被收录于专栏:南桥谈编程

原理

在一个可执行程序内部存在多个线程和一个任务队列。如果任务队列里长时间没有任务,这些线程就会休眠,如果此时来了一个任务,那么线程就会被唤醒。像这种,提前创建好线程,需要的时候直接使用,我们称之为线程池。这种本质上就是一个生产消费模型。

线程池实现

代码语言:javascript
复制
//ThreadPool.hpp
#pragma once

#include<iostream>
#include<unistd.h>
#include<string>
#include<vector>
#include<queue>
#include<functional>
#include"Thread.hpp"

using namespace threadModel;

static const int gdefaultnum=5;

void test()
{
    while(true)
    {
        std::cout<<"hello world"<<std::endl;
        sleep(1);
    }
}

template<typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void Sleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    bool IsEmpty()
    {
        return _task_queue.empty();
    }

    void HandlerTask(const std::string& name)  // this
    {
        while (true)
        {
            LockQueue();
            //如果队列为空(有任务)
            while(IsEmpty()&&_isrunning) //线程没有任务,但是在工作,继续休眠
            {
                _sleep_thread_num++;
                Sleep();
                _sleep_thread_num--;

            }

            if(IsEmpty()&&!_isrunning) // 任务是空的,并且线程退出工作
            {
                std::cout<<name<<" quit..."<<std::endl;
                UnlockQueue();
                break;
            }

            // 队列不为空,有任务 或者 队列被唤醒
            // 取任务
            T t=_task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 此处任务已经不在任务队列中,任务已经被拿走,处理任务和临界资源是两码事

            t(); // 处理任务,不能不用也不能在临界区中处理
            std::cout<<name<<": "<<t.result()<<std::endl;
        }
        
    }

public:
    ThreadPool(int thread_num=gdefaultnum)
        :_thread_num(thread_num)
        ,_isrunning(false) //刚开始线程没有使用
        ,_sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }

    void Init()
    {
        func_t func=std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1);
        for(int i=0;i<_thread_num;i++)
        {
            std::string threadname="thread-"+std::to_string(i+1);
            _threads.emplace_back(threadname,func);
        }
    }

    void Start()
    {
        _isrunning=true;
        for(auto& thread:_threads)
        {
            thread.Start();
        }
    }

    void Stop()
    {
        LockQueue();
        _isrunning=false;
        WakeupAll();
        UnlockQueue();
    }

    void Equeue(const T &in)
    {
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(in);
            // 如果当前有线程在等待,需要唤醒
            if(_sleep_thread_num>0)
            {
                Wakeup();
            }
        }

        UnlockQueue();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _thread_num;
    std::vector<Thread> _threads;  // 管理多个线程
    std::queue<T> _task_queue; // 任务队列
    bool _isrunning; //当前线程是否在工作
    int _sleep_thread_num;   //计数器:休眠的线程个数

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

日志

日志是软件运行的记录信息,可以向显示器打印,也可以向文件中打印,日志必须有特定的格式:

  • [日志等级] [pid] [filename] [filenumber] [time] 日志内容(支持可变参数)

日志等级:DEBUG、INFO、WARNING、ERROR、FATAL(致命的)

代码语言:javascript
复制
// 日志等级
enum
{
    DEBUG=1,
    INFO,
    WARING,
    ERROR,
    FATAL
};
  • 日志消息:日志等级、id、文件名、行号、当前时间等
代码语言:javascript
复制
// 日志消息
class logmessage
{

public:
    std::string _level; // 日志等级
    pid_t _id; 
    std::string _filename; // 文件名
    int _filenumber;  // 行号
    std::string _cur_time;
    std::string _message_info;

};

获取当前时间函数接口

代码语言:javascript
复制
std::string GetCurTime()
{
    time_t now=time(nullptr);

    struct tm* cur_time=localtime(&now);
    char buffer[128];
    snprintf(buffer,sizeof(buffer),"%d-%02d-%02d %02d:%02d:%02d",
            cur_time->tm_year+1900,
            cur_time->tm_mon+1,
            cur_time->tm_mday,
            cur_time->tm_hour,
            cur_time->tm_min,
            cur_time->tm_sec);
    return std::string(buffer);
}
  • time(nullptr) 返回当前的时间(从 1970 年 1 月 1 日到现在的秒数),并将其赋值给 now 变量。time_t 是表示时间点的类型。
  • localtime(&now) now 转换为当地时间,并返回一个指向 tm 结构的指针。tm 结构包含了年、月、日、时、分、秒等信息。

启用类型设置

代码语言:javascript
复制
void Enable(int type)
{
    _type=type;
}

Enable 函数用于设置日志输出类型,可以选择输出到屏幕或文件。

输出到屏幕

代码语言:javascript
复制
void FlushLogToSCreen(const logmessage& lg)
{
    printf("[%s][%d][%s][%d][%s] %s",
        lg._level.c_str(),
        lg._id,
        lg._filename.c_str(),
        lg._filenumber,
        lg._cur_time.c_str(),
        lg._message_info.c_str()
    );
}

FlushLogToSCreen 函数将日志信息格式化并输出到控制台。使用 printf 格式化字符串。

输出到文件

代码语言:javascript
复制
void FlushLogToFile(const logmessage& lg)
{
    std::ofstream out(_logfile,std::ios::app); // 追加打印
    if(!out.is_open())
        return;

    char logtxt[2048];
    snprintf(logtxt,sizeof(logtxt),"[%s][%d][%s][%d][%s] %s",
        lg._level.c_str(),
        lg._id,
        lg._filename.c_str(),
        lg._filenumber,
        lg._cur_time.c_str(),
        lg._message_info.c_str()
    );

    out.write(logtxt,strlen(logtxt));
    out.close();
}

FlushLogToFile 函数将日志信息写入指定的文件。以追加模式打开文件,并在打开失败时返回。 使用snprintf 格式化日志信息,然后将其写入文件。

选择输出方式

代码语言:javascript
复制
void FlushLog(const logmessage& lg)
{
    switch(_type)
    {
        case SCREEN_TYPE:
            FlushLogToSCreen(lg);
            break;
        case FILE_TYPE:
            FlushLogToFile(lg);
            break;
    }
}

创建日志消息

代码语言:javascript
复制
void logMessage(std::string filename,int filenumber,int level,const char* format,...)
{
    logmessage lg;
    lg._level=LevelToString(level);
    lg._id=getpid();
    lg._filename=filename;
    lg._filenumber=filenumber;
    lg._cur_time=GetCurTime();

    va_list ap;
    va_start(ap,format);
    char log_info[1024];
    vsnprintf(log_info,sizeof(log_info),format,ap);
    va_end(ap);
    
    lg._message_info=log_info;

    FlushLog(lg);
}
  • logMessage 函数用于创建一条新的日志消息。它接受文件名、文件编号、日志级别和格式化字符串作为参数。
  • 使用可变参数处理(va_list)来处理格式化字符串。
  • 将生成的日志信息存储在 lg 对象中,并调用 FlushLog 函数进行输出。
  • va_list ap;声明了一个 va_list 类型的变量 ap,它用于存储可变参数列表。在 C 语言中,va_list 是一个用于遍历不定数量参数的类型。
  • va_start(ap, format);va_start 宏初始化 ap 以指向函数参数列表中的第一个可变参数。这里的 format 是最后一个固定参数,va_start 会从它的下一个参数开始读取可变参数。因此,ap 现在可以用来访问 format 之后的所有参数。
  • va_end(ap)用于清理va_list 变量 ap。在读取完可变参数后,调用 va_end 是良好的实践,它可以释放与 ap 相关的资源(如果有的话)。

完整代码

代码语言:javascript
复制
#pragma once

#include<iostream>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<unistd.h>
#include<stdarg.h>
#include<ctime>
#include<fstream>
#include<pthread.h>
#include<cstdio>
#include"LockGuard.hpp"

using std::cin;
using std::cout;
using std::endl;

namespace log_ns
{

    // 日志等级
    enum
    {
        DEBUG=1,
        INFO,
        WARING,
        ERROR,
        FATAL
    };

    // 日志消息
    class logmessage
    {

    public:
        std::string _level; // 日志等级
        pid_t _id; 
        std::string _filename; // 文件名
        int _filenumber;  // 行号
        std::string _cur_time;
        std::string _message_info;

    };

    pthread_mutex_t glock=PTHREAD_MUTEX_INITIALIZER; // 定义一个全局的锁

    std::string LevelToString(int level)
    {
        LockGuard lockguard(&glock);
        switch(level)
        {
            case DEBUG:
                return "DEBUG";
            case INFO:
                return "INFO";
            case WARING:
                return "WARING";
            case ERROR:
                return "ERROR";
            case FATAL:
                return "FATAL";
            default:
                return "UNKNOWN";
        }
    }

    std::string GetCurTime()
    {
        time_t now=time(nullptr);

        struct tm* cur_time=localtime(&now);
        char buffer[128];
        snprintf(buffer,sizeof(buffer),"%d-%02d-%02d %02d:%02d:%02d",
                cur_time->tm_year+1900,
                cur_time->tm_mon+1,
                cur_time->tm_mday,
                cur_time->tm_hour,
                cur_time->tm_min,
                cur_time->tm_sec);
        return std::string(buffer);
    }

    #define SCREEN_TYPE 1
    #define FILE_TYPE 2

    const std::string glogfile="./log.txt";
    // 日志
    class Log
    {
    public:
        Log(const std::string& logfeile=glogfile):_logfile(logfeile),_type(SCREEN_TYPE)
        {

        }

        void Enable(int type)
        {
            _type=type;
        }

        void FlushLogToSCreen(const logmessage& lg)
        {
            printf("[%s][%d][%s][%d][%s] %s",
                lg._level.c_str(),
                lg._id,
                lg._filename.c_str(),
                lg._filenumber,
                lg._cur_time.c_str(),
                lg._message_info.c_str()
            );
        }

        void FlushLogToFile(const logmessage& lg)
        {
            std::ofstream out(_logfile,std::ios::app); // 追加打印
            if(!out.is_open())
                return;

            char logtxt[2048];
            snprintf(logtxt,sizeof(logtxt),"[%s][%d][%s][%d][%s] %s",
                lg._level.c_str(),
                lg._id,
                lg._filename.c_str(),
                lg._filenumber,
                lg._cur_time.c_str(),
                lg._message_info.c_str()
            );

            out.write(logtxt,strlen(logtxt));
            out.close();
        }

        void FlushLog(const logmessage& lg)
        {

            switch(_type)
            {
                case SCREEN_TYPE:
                    FlushLogToSCreen(lg);
                    break;
                case FILE_TYPE:
                    FlushLogToFile(lg);
                    break;
            }
        }

        void logMessage(std::string filename,int filenumber,int level,const char* format,...)
        {
            logmessage lg;
            lg._level=LevelToString(level);
            lg._id=getpid();
            lg._filename=filename;
            lg._filenumber=filenumber;
            lg._cur_time=GetCurTime();

            va_list ap;
            va_start(ap,format);
            char log_info[1024];
            vsnprintf(log_info,sizeof(log_info),format,ap);
            va_end(ap);
            
            lg._message_info=log_info;

            //cout<<lg._message_info<<endl;
            // 打印日志
            FlushLog(lg);

        }

        ~Log()
        {

        }

    private:
        int _type;
        std::string _logfile;
    };

    Log lg;

    #define LOG(Level,Format,...) do{lg.logMessage(__FILE__,__LINE__,Level,Format,##__VA_ARGS__);}while(0)
    #define EnableScreen() do{lg.Enable(SCREEN_TYPE);}while(0)
    #define EnableFile() do{lg.Enable(FILE_TYPE);}while(0)

};

携带日志的线程池设计

代码语言:javascript
复制
//ThreadPool.hpp
#pragma once

#include<iostream>
#include<unistd.h>
#include<string>
#include<vector>
#include<queue>
#include<functional>
#include"Thread.hpp"
#include"Log.hpp"

using namespace threadModel;
using namespace log_ns;

static const int gdefaultnum=5;

void test()
{
    while(true)
    {
        std::cout<<"hello world"<<std::endl;
        sleep(1);
    }
}

template<typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void Sleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    bool IsEmpty()
    {
        return _task_queue.empty();
    }

    void HandlerTask(const std::string& name)  // this
    {
        while (true)
        {
            LockQueue();
            //如果队列为空(有任务)
            while(IsEmpty()&&_isrunning) //线程没有任务,但是在工作,继续休眠
            {
                _sleep_thread_num++;
                LOG(INFO,"%s thread sleep begin!\n",name.c_str());

                Sleep();

                LOG(INFO,"%s thread wakeup!\n",name.c_str());

                _sleep_thread_num--;

            }

            if(IsEmpty()&&!_isrunning) // 任务是空的,并且线程退出工作
            {
                UnlockQueue();
                LOG(INFO,"%s quit\n",name.c_str());
                break;
            }

            // 队列不为空,有任务 或者 队列被唤醒
            // 取任务
            T t=_task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 此处任务已经不在任务队列中,任务已经被拿走,处理任务和临界资源是两码事

            t(); // 处理任务,不能不用也不能在临界区中处理
            LOG(DEBUG,"hander task done, task is: \n%s",t.result().c_str());
        }
        
    }

public:
    ThreadPool(int thread_num=gdefaultnum)
        :_thread_num(thread_num)
        ,_isrunning(false) //刚开始线程没有使用
        ,_sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }

    void Init()
    {
        func_t func=std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1);
        for(int i=0;i<_thread_num;i++)
        {
            std::string threadname="thread-"+std::to_string(i+1);
            _threads.emplace_back(threadname,func);
            LOG(DEBUG,"construct thread %s done, init success.\n",threadname.c_str());
        }
    }

    void Start()
    {
        _isrunning=true;
        for(auto& thread:_threads)
        {
            LOG(DEBUG,"Start thread %s done.\n",thread.Name().c_str());
            thread.Start();
        }
    }

    void Stop()
    {
        LockQueue();
        _isrunning=false;
        WakeupAll();
        UnlockQueue();
        LOG(INFO,"Thread Pool Stop Success!\n");
    }

    void Equeue(const T &in)
    {
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(in);
            // 如果当前有线程在等待,需要唤醒
            if(_sleep_thread_num>0)
            {
                Wakeup();
            }
        }

        UnlockQueue();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _thread_num;
    std::vector<Thread> _threads;  // 管理多个线程
    std::queue<T> _task_queue; // 任务队列
    bool _isrunning; //当前线程是否在工作
    int _sleep_thread_num;   //计数器:休眠的线程个数

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};
代码语言:javascript
复制
//Main.cc
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"
#include<memory>

using std::cin;
using std::cout;
using std::endl;
using namespace log_ns;

int main()
{

    EnableScreen();
    //std::unique_ptr<ThreadPool> tp=std::make_unique<>();  //构建一个ThreadPool对象
    ThreadPool<Task> *tp=new ThreadPool<Task>();
    
    tp->Init();

    tp->Start();

    int cnt=10;
    while (cnt)
    {
        // 不断地向线程池推送任务
        sleep(1);
        Task t(1,1);
        tp->Equeue(t);
        LOG(INFO,"equeue a task, %s\n",t.debug().c_str());
        sleep(1);
    }
    tp->Stop();
    LOG(INFO,"thread pool stop!\n");
    sleep(10);

    return 0;
}
代码语言:javascript
复制
// Thread.hpp
#pragma once
#include <pthread.h>
#include <iostream>
#include <string>
#include<functional>

namespace threadModel
{
    // 线程执行的方法
    //typedef void (*func_t)(ThreadData* td);
    using func_t=std::function<void(const std::string&)>;

    class Thread
    {
    public:
        void Excute()
        {
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }

    public:
        Thread(const std::string &name, func_t func) : _name(name), _func(func)
        {
        }
        // void *ThreadRoutine(void* args)实际上参数里面还有一个Thread* this
        static void *ThreadRoutine(void *args) // 加上static后,参数里面就没有Thread* this
        {
            Thread *self = static_cast<Thread *>(args); // 获得当前对象
            self->Excute();
            return nullptr;
        }

        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if (n != 0)
                return false;
            return true;
        }

        std::string Status()
        {
            if (_isrunning)
                return "running";
            else
                return "sleep";
        }

        void Stop()
        {
            if (_isrunning)
            {
                pthread_cancel(_tid);
                _isrunning = false;
            }
        }

        void Join()
        {
            pthread_join(_tid, nullptr);
        }
        
        std::string Name()
        {
            return _name;
        }

        ~Thread()
        {
            Stop();
        }

    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func; // 线程执行的回调函数
    };
}
代码语言:javascript
复制
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>

class Task
{

public:
    Task()
    {}
    Task(int x,int y):_x(x),_y(y)
    {}

    void Excute()
    {
        _result=_x+_y;
    }

    void operator()()
    {
        Excute();
    }

    std::string debug()
    {
        std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"=?";
        return msg;
    }
    std::string result()
    {
        std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"="+std::to_string(_result);
        return msg;
    }

private:
    int _x;
    int _y;
    int _result;
};
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-10-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 原理
  • 线程池实现
  • 日志
    • 获取当前时间函数接口
      • 启用类型设置
        • 输出到屏幕
          • 输出到文件
            • 选择输出方式
              • 创建日志消息
                • 完整代码
                • 携带日志的线程池设计
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档