Today I'm continuing my reading notes on Effective C++ and C++ Concurrency in Action. Notes published so far:
Effective C++ Reading Notes (1): Accustoming Yourself to C++
Effective C++ Reading Notes (2): Constructors, Destructors, and Assignment Operators
Chapter 8: Designing Concurrent Code
There are several ways to divide work between threads: 1) partition the data among the threads before processing begins; 2) divide the data recursively, as in quicksort, where each pass partitions the elements around a pivot into a lower and an upper part and then recurses into each; 3) divide the work by task type, so that each piece of code has a single functional responsibility.
For example, Chapter 4 showed a parallel quicksort built on std::async; we can also implement one by hand:
```cpp
template <typename T>
struct sorter {
    struct chunk_to_sort {
        std::list<T> data;
        std::promise<std::list<T>> promise;
    };

    thread_safe_stack<chunk_to_sort> chunks;  // chunks waiting to be sorted
    std::vector<std::thread> threads;
    unsigned const max_thread_count;
    std::atomic<bool> end_of_data;

    sorter()
        : max_thread_count(std::thread::hardware_concurrency() - 1),
          end_of_data(false) {}

    ~sorter() {
        end_of_data = true;  // tell the workers to finish
        for (unsigned i = 0; i < threads.size(); ++i) {
            threads[i].join();
        }
    }

    std::list<T> do_sort(std::list<T>& chunk_data) {
        if (chunk_data.empty()) {
            return chunk_data;
        }
        std::list<T> result;
        // Move the first element out to serve as the pivot.
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        T const& partition_val = *result.begin();
        auto divide_point =
            std::partition(chunk_data.begin(), chunk_data.end(),
                           [&](T const& val) { return val < partition_val; });
        // Hand the lower half to the stack so another thread can sort it.
        chunk_to_sort new_lower_chunk;
        new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data,
                                    chunk_data.begin(), divide_point);
        std::future<std::list<T>> new_lower =
            new_lower_chunk.promise.get_future();
        chunks.push(std::move(new_lower_chunk));
        // Spawn another worker while hardware concurrency remains.
        if (threads.size() < max_thread_count) {
            threads.push_back(std::thread(&sorter<T>::sort_thread, this));
        }
        // Sort the upper half on this thread.
        std::list<T> new_higher(do_sort(chunk_data));
        result.splice(result.end(), new_higher);
        // Instead of blocking on the lower half, help sort pending chunks.
        while (new_lower.wait_for(std::chrono::seconds(0)) !=
               std::future_status::ready) {
            try_sort_chunk();
        }
        result.splice(result.begin(), new_lower.get());
        return result;
    }

    void sort_thread() {
        while (!end_of_data) {
            try_sort_chunk();
            std::this_thread::yield();
        }
    }

    void try_sort_chunk() {
        std::shared_ptr<chunk_to_sort> chunk = chunks.pop();
        if (chunk) {
            chunk->promise.set_value(do_sort(chunk->data));
        }
    }
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
    if (input.empty()) {
        return input;
    }
    sorter<T> s;
    return s.do_sort(input);
}
```
Factors that affect the performance of concurrent code include: 1) the number of processors; since C++11, std::thread::hardware_concurrency() reports the number of hardware threads. 2) Data contention and cache ping-pong: when threads repeatedly operate on an atomic variable or take a mutex in a loop, the cache line holding that data bounces back and forth between processor caches, which severely hurts performance. 3) False sharing: when threads access different elements that happen to share a cache line, the same ping-pong occurs even though no data is logically shared; C++17 defines std::hardware_destructive_interference_size as the number of bytes by which data must be separated to be free of false sharing. 4) Data proximity: the data a single thread accesses may be scattered across memory blocks; C++17 defines std::hardware_constructive_interference_size as the maximum number of contiguous bytes guaranteed to fit in a single cache line. 5) Excessive task switching and thread oversubscription.
A few additional concerns: 1) exception safety: an exception escaping from a function running on a thread terminates the whole program; 2) scalability: make the degree of concurrency as high as possible, and ensure the actual workload is large enough to keep multiple processors usefully busy; 3) using extra threads to hide the latency of blocking waits and improve responsiveness.
Below are parallelized implementations of some standard library algorithms.
First, for_each: it uses the RAII class join_threads, stores each worker's result in a std::future<void>, and propagates worker exceptions through futures[i].get().
```cpp
// RAII guard: joins every joinable thread on destruction,
// even when an exception is propagating.
class join_threads {
    std::vector<std::thread>& threads_;
public:
    explicit join_threads(std::vector<std::thread>& threads)
        : threads_(threads) {}
    ~join_threads() {
        for (std::size_t i = 0; i < threads_.size(); ++i) {
            if (threads_[i].joinable()) {
                threads_[i].join();
            }
        }
    }
};

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
    unsigned long const length = std::distance(first, last);
    if (!length) return;
    // Pick a thread count: at least 25 elements per thread, capped
    // by hardware concurrency (2 if it cannot be determined).
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::vector<std::future<void>> futures(num_threads - 1);
    std::vector<std::thread> threads(num_threads - 1);
    join_threads joiner(threads);
    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        std::packaged_task<void(void)> task(
            [=]() { std::for_each(block_start, block_end, f); });
        futures[i] = task.get_future();
        threads[i] = std::thread(std::move(task));
        block_start = block_end;
    }
    // Process the final block on this thread.
    std::for_each(block_start, last, f);
    // get() rethrows any exception raised on a worker thread.
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        futures[i].get();
    }
}
```
This can also be simplified with std::async:

```cpp
template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
    unsigned long const length = std::distance(first, last);
    if (!length) return;
    unsigned long const min_per_thread = 25;
    if (length < (2 * min_per_thread)) {
        std::for_each(first, last, f);  // small range: run serially
    } else {
        // Recursively split the range in half; std::async decides
        // whether each half runs on a new thread.
        Iterator const mid_point = first + length / 2;
        std::future<void> first_half = std::async(
            &parallel_for_each<Iterator, Func>, first, mid_point, f);
        parallel_for_each(mid_point, last, f);
        first_half.get();  // rethrows exceptions from the async half
    }
}
```
Next, find: a std::promise lets whichever worker thread finds the match set the final result; once it is found, the atomic done_flag is set so the other threads stop searching.
```cpp
template <typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match) {
    struct find_element {
        void operator()(Iterator begin, Iterator end, MatchType match,
                        std::promise<Iterator>* result,
                        std::atomic<bool>* done_flag) {
            try {
                // Stop early once some thread has found the match.
                for (; (begin != end) && !done_flag->load(); ++begin) {
                    if (*begin == match) {
                        result->set_value(begin);
                        done_flag->store(true);
                        return;
                    }
                }
            } catch (...) {
                // set_exception throws if the promise was already
                // satisfied; in that case just let this thread finish.
                try {
                    result->set_exception(std::current_exception());
                    done_flag->store(true);
                } catch (...) {
                }
            }
        }
    };
    unsigned long const length = std::distance(first, last);
    if (!length) return last;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::promise<Iterator> result;
    std::atomic<bool> done_flag(false);
    std::vector<std::thread> threads(num_threads - 1);
    {
        // Scope the joiner so every worker has joined before we
        // inspect done_flag and read the promise.
        join_threads joiner(threads);
        Iterator block_start = first;
        for (unsigned long i = 0; i < (num_threads - 1); ++i) {
            Iterator block_end = block_start;
            std::advance(block_end, block_size);
            threads[i] = std::thread(find_element(), block_start, block_end,
                                     match, &result, &done_flag);
            block_start = block_end;
        }
        find_element()(block_start, last, match, &result, &done_flag);
    }
    if (!done_flag.load()) {
        return last;  // no thread found a match
    }
    return result.get_future().get();
}
```