前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《C++并发编程实战》读书笔记(6):高级线程管理、并行算法函数、测试与除错

《C++并发编程实战》读书笔记(6):高级线程管理、并行算法函数、测试与除错

作者头像
C语言与CPP编程
发布2023-09-19 20:40:50
2330
发布2023-09-19 20:40:50
举报
文章被收录于专栏:c语言与cpp编程c语言与cpp编程

第9章 高级线程管理

9.1 线程池

大多数程序中并不方便给每个任务分配单独的线程,但仍可通过线程池来充分利用可调配的并发算力:将可同时执行的任务提交到线程池,放入任务队列中等待,工作线程循环地领取并执行任务。

以下是一种实现,提交任务后返回future,提交者可通过future获取任务结果,任务先被包装成packaged_task再被包装成function,由工作线程来处理。

代码语言:javascript
复制
class ThreadPool {
private:
    std::vector<std::thread> threads;
    ThreadsafeQueue<std::function<void()>> taskQueue;
    std::atomic<bool> stop;
    join_threads joiner;
public:
    ThreadPool(size_t numThreads = std::thread::hardware_concurrency()) 
        : stop(false),joiner(threads) {
        for (size_t i = 0; i < numThreads; ++i) {
            threads.emplace_back([this]() {
                while (!stop) {
                    run_pending_task();
                }
            });
        }
    }
    
    // 避免所有线程都在等待其他线程完成任务
    void run_pending_task(){
        std::function<void()> task;               
        if (taskQueue.try_pop(task))
            task();
        else
            std::this_thread::yield();
    }
    
    ~ThreadPool() {
        stop = true;
    }

    template<class F, class... Args>
    auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using ReturnType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<ReturnType> result = task->get_future();
        taskQueue.push([task]() { (*task)(); });
        return result;
    }
};

例如可以实现基于线程池的快排:

代码语言:javascript
复制
template <typename T>
struct sorter {
    ThreadPool pool;

    std::list<T> do_sort(std::list<T>& chunk_data) {
        if (chunk_data.empty()) return chunk_data;
        // 将原list分为大小两段
        std::list<T> result;
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        T const& partition_val = *result.begin();
        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),[&](T const& val) { return val < partition_val; });
        // 两段分别处理
        std::list<T> new_lower_chunk;
        new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);
        auto new_lower = pool.submit(std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
        std::list<T> new_higher(do_sort(chunk_data));
        result.splice(result.end(), new_higher);
        // 避免所有线程彼此等待
        while (!new_lower.is_ready()) {
            pool.run_pending_task();
        }
        result.splice(result.begin(), new_lower.get());
        return result;
    }
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
    if (input.empty()) return input;
    sorter<T> s;
    return s.do_sort(input);
}

上述线程池仅具备一个全局的任务队列,即使使用无锁队列来优化仍然会有严重的缓存乒乓,导致性能浪费。可以为每个线程配备thread_local任务队列,仅当线程自身线程没有任务时才从全局队列领取任务。

此外,倘若某线程自身队列为空,而另一线程的队列为满,需支持窃取任务。首先实现支持这样操作的队列,仅用锁简单实现,一端用于push/pop,另一端用于steal。

代码语言:javascript
复制
class work_stealing_queue {
private:
    typedef std::function<void()> data_type;
    std::deque<data_type> the_queue;
    mutable std::mutex the_mutex;

public:
    work_stealing_queue() {}

    work_stealing_queue(const work_stealing_queue& other) = delete;
    work_stealing_queue& operator=(const work_stealing_queue& other) = delete;

    void push(data_type data) {
        std::lock_guard<std::mutex> lock(the_mutex);
        the_queue.push_front(std::move(data));
    }

    bool try_pop(data_type& res) {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return false;
        res = std::move(the_queue.front());
        the_queue.pop_front();
        return true;
    }

    bool try_steal(data_type& res) {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return false;
        res = std::move(the_queue.back());
        the_queue.pop_back();
        return true;
    }
};

基于上面的结构,可以实现支持任务窃取的线程池:

代码语言:javascript
复制
class thread_pool {
private:
    typedef std::function<void()> task_type;
    std::vector<std::thread> threads;
    join_threads joiner;
    std::atomic_bool done;
    // 全局任务队列
    thread_safe_queue<task_type> pool_work_queue;
    std::vector<std::unique_ptr<work_stealing_queue>> queues;
    // 指向线程独有的任务队列
    static thread_local work_stealing_queue* local_work_queue;
    // 线程编号
    static thread_local unsigned my_index;

    void worker_thread(unsigned my_index_) {
        my_index = my_index_;
        local_work_queue = queues[my_index].get();
        while (!done) {
            run_pending_task();
        }
    }

    bool pop_task_from_local_queue(task_type& task) {
        return local_work_queue && local_work_queue->try_pop(task);
    }

    bool pop_task_from_pool_queue(task_type& task) {
        return pool_work_queue.try_pop(task);
    }
    // 遍历,偷取任务
    bool pop_task_from_other_thread_queue(task_type& task) {
        for (unsigned i = 0; i < queues.size(); ++i) {
            unsigned const index = (my_index + i + 1) % queues.size();
            if (queues[index]->try_steal(task)) {
                return true;
            }
        }
        return false;
    }

public:
    thread_pool() : joiner(threads), done(false) {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; ++i) {
                queues.push_back(std::unique_ptr<work_stealing_queue>(
                    new work_stealing_queue));
                threads.push_back(
                    std::thread(&thread_pool::worker_thread, this, i));
            }
        } catch (...) {
            done = true;
            throw;
        }
    }

    ~thread_pool() { done = true; }

    template <class F, class... Args>
    auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using ReturnType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<ReturnType> result = task->get_future();
        if (local_work_queue) {
            local_work_queue->push([task]() { (*task)(); });
        } else {
            pool_work_queue.push([task]() { (*task)(); });
        }
        return result;
    }

    void run_pending_task() {
        task_type task;
        if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) ||
            pop_task_from_other_thread_queue(task)) {
            task();
        } else {
            std::this_thread::yield();
        }
    }
};

9.2 中断线程

C++20中引入了能接收中断、自动join的jthread。但自己实现也不复杂。借助thread_local的interrupt_flag来辅助实现,通过interrupt成员函数来设置中断,并借此实现可中断的条件变量/future上的等待。

代码语言:javascript
复制
thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread {
    std::thread internal_thread;
    interrupt_flag* flag;

public:
    template <typename FunctionType>
    interruptible_thread(FunctionType f) {
        std::promise<interrupt_flag*> p;
        internal_thread = std::thread([f, &p] {
            p.set_value(&this_thread_interrupt_flag);
            try{
                f();
            }catch(...){}
        });
        flag = p.get_future().get();
    }
    // 设置中断
    void interrupt() {
        if (flag) {
            flag->set();
        }
    }
};
// 如果已设置中断则抛出异常
void interruption_point() {
    if (this_thread_interrupt_flag.is_set()) {
        throw std::exception();
    }
}
// 可中断的条件变量等待
template <typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& lk) {
    this_thread_interrupt_flag.wait(cv, lk);
}
// 可中断的future等待
template <typename T, typename Lockable>
void interruptible_wait(std::future<T>& uf, Lockable& lk) {
    while (!this_thread_interrupt_flag.is_set()) {
        if (uf.wait_for(lk, 1ms) == std::future_status::ready) break;
    }
}

其中,interrupt_flag的实现如下,基于condition_variable_any而非普通条件变量,set时(即设置中断时)唤醒条件变量,wait时多次检查是否设置中断。

代码语言:javascript
复制
class interrupt_flag {
    std::atomic<bool> flag;
    std::condition_variable_any* thread_cond_any;
    std::mutex set_clear_mutex;

public:
    interrupt_flag() : thread_cond_any(nullptr) {}
    void set() {
        flag.store(true, std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if (thread_cond_any) {
            thread_cond_any->notify_all();
        }
    }

    bool is_set() const { return flag.load(std::memory_order_relaxed); }

    template <typename Lockable>
    void wait(std::condition_variable_any& cv, Lockable& lk) {
        struct custom_lock {
            interrupt_flag* self;
            Lockable& lk;
            custom_lock(interrupt_flag* self_,
                        std::condition_variable_any& cond, Lockable& lk_)
                : self(self_), lk(lk_) {
                self->set_clear_mutex.lock();
                self->thread_cond_any = &cond;
            }
            void unlock() {
                lk.unlock();
                self->set_clear_mutex.unlock();
            }
            void lock() { std::lock(self->set_clear_mutex, lk); }
            ~custom_lock() { self->thread_cond_any = nullptr; }
        };
        custom_lock cl(this, cv, lk);
        interruption_point();
        cv.wait(cl);
        interruption_point();
    }
};

可以用try/catch来捕获中断,按某种方式处理然后继续执行。中断线程在实际应用中的常见场景是运行程序前开启后台任务,程序运行完退出时中断后台任务。


第10章 并行算法函数

C++17向标准库加入了并行算法函数,在原有函数的参数列表前新增了执行策略参数。<execution>中定义了三种执行策略sequenced_policy、parallel_policy、parallel_unsequenced_policy,以及对应的传给并行算法函数的对象seq、par、par_unseq。

不同策略会影响算法函数的复杂度、抛出异常时的行为、何时何地何种方式执行。其中seq代表顺序策略,令算法函数在发起调用的线程上执行全部操作,没有内存次序限制;par代表并行策略,内部操作可能在发起调用的线程上也可能另外创建线程执行,涉及的变量绝不能引发数据竞争;par_unseq代表非顺序并行策略,并行化最高,涉及的变量不得以任何形式同步。

例如某网站有庞大的日志,需要逐行处理日志提炼各项信息,最后聚合结果,类似mapreduce。由于每行日志的处理都独立,只需最后总数正确,所以可以用transfrom_reduce来处理:

代码语言:javascript
复制
struct log_info {
    std::string page;
    time_t visit_time;
    std::string browser;
};

extern log_info parse_log_line(std::string const &line);

using visit_map_type = std::unordered_map<std::string, unsigned long long>;

visit_map_type count_visits_per_page(
    std::vector<std::string> const &log_lines) {
    struct combine_visits {
        visit_map_type operator()(visit_map_type lhs,
                                  visit_map_type rhs) const {
            if (lhs.size() < rhs.size()) std::swap(lhs, rhs);
            for (auto const &entry : rhs) {
                lhs[entry.first] += entry.second;
            }
            return lhs;
        }

        visit_map_type operator()(log_info log, visit_map_type map) const {
            ++map[log.page];
            return map;
        }
        visit_map_type operator()(visit_map_type map, log_info log) const {
            ++map[log.page];
            return map;
        }
        visit_map_type operator()(log_info log1, log_info log2) const {
            visit_map_type map;
            ++map[log1.page];
            ++map[log2.page];
            return map;
        }
    };

    return std::transform_reduce(std::execution::par, log_lines.begin(),
                                 log_lines.end(), visit_map_type(),
                                 combine_visits(), parse_log_line);
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-09-13 08:30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 C语言与CPP编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云服务器利旧
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档