前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《C++并发编程实战》读书笔记(5):设计并发代码

《C++并发编程实战》读书笔记(5):设计并发代码

作者头像
C语言与CPP编程
发布2023-09-06 17:32:31
1430
发布2023-09-06 17:32:31
举报
文章被收录于专栏:c语言与cpp编程c语言与cpp编程

今天继续更新《Effective C++》和《C++并发编程实战》的读书笔记,下面是已经更新过的内容:

《C++并发编程实战》读书笔记(1):并发、线程管控

《C++并发编程实战》读书笔记(2):并发操作的同步

《C++并发编程实战》读书笔记(3):内存模型和原子操作

《C++并发编程实战》读书笔记(4):设计并发数据结构

《Effective C++》读书笔记(1):让自己习惯C++

《Effective C++》读书笔记(2):构造/析构/赋值运算

《Effective C++》读书笔记(3):资源管理

《Effective C++》读书笔记(4):设计与声明

《Effective C++》读书笔记(5):实现

第8章 设计并发代码

在线程间切分任务的方法包括:1、先在线程间切分数据再处理。2、以递归的方式划分数据,例如快排中每次以某元素为界将数据集划分为大小两部分然后再递归处理。3、依据工作类别划分任务,每部分代码只承担单一的功能职责。

例如第四章中演示了使用std::async实现并行版快排,也可以自己实现:

代码语言:javascript
复制
template <typename T>
struct sorter {
    struct chunk_to_sort {
        std::list<T> data;
        std::promise<std::list<T> > promise;
    };

    thread_safe_stack<chunk_to_sort> chunks;
    std::vector<std::thread> threads;
    unsigned const max_thread_count;
    std::atomic<bool> end_of_data;

    sorter()
        : max_thread_count(std::thread::hardware_concurrency() - 1),
          end_of_data(false) {}

    ~sorter() {
        end_of_data = true;
        for (unsigned i = 0; i < threads.size(); ++i) {
            threads[i].join();
        }
    }

    std::list<T> do_sort(std::list<T>& chunk_data) {
        if (chunk_data.empty()) {
            return chunk_data;
        }

        std::list<T> result;
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        T const& partition_val = *result.begin();

        auto divide_point =
            std::partition(chunk_data.begin(), chunk_data.end(),
                           [&](T const& val) { return val < partition_val; });
        chunk_to_sort new_lower_chunk;
        new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data,
                                    chunk_data.begin(), divide_point);

        std::future<std::list<T> > new_lower =
            new_lower_chunk.promise.get_future();
        chunks.push(std::move(new_lower_chunk));
        if (threads.size() < max_thread_count) {
            threads.push_back(std::thread(&sorter<T>::sort_thread, this));
        }

        std::list<T> new_higher(do_sort(chunk_data));

        result.splice(result.end(), new_higher);
        while (new_lower.wait_for(std::chrono::seconds(0)) !=
               std::future_status::ready) {
            try_sort_chunk();
        }

        result.splice(result.begin(), new_lower.get());
        return result;
    }

    void sort_thread() {
        while (!end_of_data) {
            try_sort_chunk();
            std::this_thread::yield();
        }
    }
    
    void try_sort_chunk() {
        std::shared_ptr<chunk_to_sort> chunk = chunks.pop();
        if (chunk) {
            chunk->promise.set_value(do_sort(chunk->data));
        }
    }
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
    if (input.empty()) {
        return input;
    }
    sorter<T> s;
    return s.do_sort(input);
}

影响并发代码性能的因素包括:1、处理器的数量,C++11可以通过std::thread::hardware_concurrency获取硬件线程数量。2、数据竞争和缓存乒乓,例如循环处理原子变量或加锁时变量所含的数据在不同缓存之间多次来回传递,严重影响程序性能。3、不经意共享,多个线程访问同一个数据的不同元素时可能访问同一个缓存块,造成数据乒乓;C++17定义了std::hardware_destructive_interference_size用于表示一个字节数的限度,数据分布范围超出该值就不会有不经意共享。4、数据的紧凑程度,单个线程访问的数据可能属于不同内存块,C++17定义了std::hardward_constructive_interference_size表示同一缓存块保证容纳的最大连续字节数。5、过度任务切换和线程过饱和。

还有一些要额外考虑的因素:1、异常安全,某线程上有函数因异常而退出会导致整个程序被终结。2、可伸缩性,尽可能令并发程度达到最大并确保实际工作量足以让多个处理器有效运行。3、利用多线程掩藏等待行为,提高响应能力。

下面是标准库部分并行化实现的例子。

首先是for_each,使用RAII类join_threads,用future<void>存储工作线程的返回值,用futures[i].get()传递异常。

代码语言:javascript
复制
class join_threads {
    std::vector<std::thread>& threads_;
public:
    explicit join_threads(std::vector<std::thread>& threads)
        : threads_(threads) {}
    ~join_threads() {
        for (std::size_t i = 0; i < threads_.size(); ++i) {
            if (threads_[i].joinable()) {
                threads_[i].join();
            }
        }
    }
};

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
    unsigned long const length = std::distance(first, last);
    if (!length) return;

    unsigned long const min_per_thread = 25;
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;

    std::vector<std::future<void> > futures(num_threads - 1);
    std::vector<std::thread> threads(num_threads - 1);
    join_threads joiner(threads);

    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        std::packaged_task<void(void)> task(
            [=]() { std::for_each(block_start, block_end, f); });
        futures[i] = task.get_future();
        threads[i] = std::thread(std::move(task));
        block_start = block_end;
    }
    std::for_each(block_start, last, f);
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        futures[i].get();
    }
}

// 也可以用std::async简化
template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
    unsigned long const length = std::distance(first, last);
    if (!length) return;

    unsigned long const min_per_thread = 25;

    if (length < (2 * min_per_thread)) {
        std::for_each(first, last, f);
    } else {
        Iterator const mid_point = first + length / 2;
        std::future<void> first_half =
            std::async(&parallel_for_each<Iterator, Func>, first, mid_point, f);
        parallel_for_each(mid_point, last, f);
        first_half.get();
    }
}

接着是find,使用std::promise在工作线程中设定最终结果,查找到后设置原子变量done_flag终止其他线程。

代码语言:javascript
复制
template <typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match) {
    struct find_element {
        void operator()(Iterator begin, Iterator end, MatchType match,
                        std::promise<Iterator>* result,
                        std::atomic<bool>* done_flag) {
            try {
                for (; (begin != end) && !done_flag->load(); ++begin) {
                    if (*begin == match) {
                        result->set_value(begin);
                        done_flag->store(true);
                        return;
                    }
                }
            } catch (...) {
                try {
                    result->set_exception(std::current_exception());
                    done_flag->store(true);
                } catch (...) {
                }
            }
        }
    };

    unsigned long const length = std::distance(first, last);
    if (!length) return last;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;

    std::promise<Iterator> result;
    std::atomic<bool> done_flag(false);
    std::vector<std::thread> threads(num_threads - 1);
    
    {
        join_threads joiner(threads);
        
        Iterator block_start = first;
        for (unsigned long i = 0; i < (num_threads - 1); ++i) {
            Iterator block_end = block_start;
            std::advance(block_end, block_size);
            threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
            block_start = block_end;
        }
        find_element()(block_start, last, match, &result, &done_flag);
    }
    if (!done_flag.load()) {
        return last;
    }
    return result.get_future().get();
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-08-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 C语言与CPP编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档