我在C++17中实现了一个线程池,这与C++14并不是向后兼容的,因为使用了std::invoke_result
,这是C++17的新特性。
这个问题的重点是最佳实践,我很想知道这其中是否有一个看起来很有趣的™(也就是说,任何看起来不应该出现的奇怪的动作或东西)。
您可以找到这个这里的当前实现(使用审查后的修复和诸如此类的)。
实现分为两个文件:
#pragma once
#include <vector>
#include <thread>
#include <future> //packaged_task
#include <queue>
#include <functional> //bind
#include <mutex>
#include <condition_variable>
#include <type_traits> //invoke_result
class thread_pool {
public:
thread_pool(size_t thread_count);
~thread_pool();
//since std::thread objects are not copiable, it doesn't make sense for a thread_pool
// to be copiable.
thread_pool(const thread_pool &) = delete;
thread_pool &operator=(const thread_pool &) = delete;
//F must be Callable, and invoking F with ...Args must be well-formed.
template <typename F, typename ...Args>
auto execute(F, Args&&...);
private:
//_task_container_base and _task_container exist simply as a wrapper around a
// MoveConstructible - but not CopyConstructible - Callable object. Since an
// std::function requires a given Callable to be CopyConstructible, we cannot
// construct one from a lambda function that captures a non-CopyConstructible
// object (such as the packaged_task declared in execute) - because a lambda
// capturing a non-CopyConstructible object is not CopyConstructible.
//_task_container_base exists only to serve as an abstract base for _task_container.
class _task_container_base {
public:
virtual ~_task_container_base() {};
virtual void operator()() = 0;
};
//_task_container takes a typename F, which must be Callable and MoveConstructible.
// Furthermore, F must be callable with no arguments; it can, for example, be a
// bind object with no placeholders.
// F may or may not be CopyConstructible.
template <typename F>
class _task_container : public _task_container_base {
public:
//here, std::forward is needed because we need the construction of _f *not* to
// bind an lvalue reference - it is not a guarantee that an object of type F is
// CopyConstructible, only that it is MoveConstructible.
_task_container(F &&func) : _f(std::forward<F>(func)) {}
void operator()() override {
_f();
}
private:
F _f;
};
//returns a unique_ptr to a _task_container that wraps around a given function
// for details on _task_container_base and _task_container, see above
// This exists so that _Func may be inferred from f.
template <typename _Func>
static std::unique_ptr<_task_container_base> allocate_task_container(_Func &&f) {
//in the construction of the _task_container, f must be std::forward'ed because
// it may not be CopyConstructible - the only requirement for an instantiation
// of a _task_container is that the parameter is of a MoveConstructible type.
return std::unique_ptr<_task_container_base>(
new _task_container<_Func>(std::forward<_Func>(f))
);
}
std::vector<std::thread> _threads;
std::queue<std::unique_ptr<_task_container_base>> _tasks;
std::mutex _task_mutex;
std::condition_variable _task_cv;
bool _stop_threads = false;
};
template <typename F, typename ...Args>
auto thread_pool::execute(F function, Args &&...args) {
std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);
std::packaged_task<std::invoke_result_t<F, Args...>()> task_pkg(
std::bind(function, args...)
);
std::future<std::invoke_result_t<F, Args...>> future = task_pkg.get_future();
queue_lock.lock();
//this lambda move-captures the packaged_task declared above. Since the packaged_task
// type is not CopyConstructible, the function is not CopyConstructible either -
// hence the need for a _task_container to wrap around it.
_tasks.emplace(
allocate_task_container([task(std::move(task_pkg))]() mutable { task(); })
);
queue_lock.unlock();
_task_cv.notify_one();
return std::move(future);
}
#include "threadpool.h"
thread_pool::thread_pool(size_t thread_count) {
for (size_t i = 0; i < thread_count; ++i) {
//start waiting threads. Workers listen for changes through
// the thread_pool member condition_variable
_threads.emplace_back(
std::thread(
[&]() {
std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);
while (true) {
queue_lock.lock();
_task_cv.wait(
queue_lock,
[&]() -> bool { return !_tasks.empty() || _stop_threads; }
);
//used by dtor to stop all threads without having to
// unceremoniously stop tasks. The tasks must all be finished,
// lest we break a promise and risk a future object throwing
// an exception.
if (_stop_threads && _tasks.empty()) return;
//to initialize temp_task, we must move the unique_ptr from the
// queue to the local stack. Since a unique_ptr cannot be copied
// (obviously), it must be explicitly moved. This transfers
// ownership of the pointed-to object to *this, as specified in
// 20.11.1.2.1 [unique.ptr.single.ctor].
auto temp_task = std::move(_tasks.front());
_tasks.pop();
queue_lock.unlock();
(*temp_task)();
}
}
)
);
}
}
thread_pool::~thread_pool() {
_stop_threads = true;
_task_cv.notify_all();
for (std::thread &thread : _threads) {
thread.join();
}
}
driver.cpp (演示用法的简单文件)。没有测试,不需要审查)
#include <iostream>
#include <vector>
#include "threadpool.h"
int multiply(int x, int y) {
return x * y;
}
int main() {
thread_pool pool(4);
std::vector<std::future<int>> futures;
for (const int &x : { 2, 4, 7, 13 }) {
futures.push_back(pool.execute(multiply, x, 2));
}
for (auto &fut : futures) {
std::cout << fut.get() << std::endl;
}
return 0;
}
发布于 2019-06-07 04:09:20
在我看来,你的代码看起来非常好,结构也很好。它展示了现代C++编码成语。您还在代码中包含了对标准的引用。所有这些都是非常感谢的。
以下是一些建议:
#include
s排序。像这样:#include #include //bind #include //packaged_Task# #include #include #include > //invoke_result #include std::thread
的构造函数通过rvalue引用传递可调用对象。为什么不和它保持一致呢?std::unique_ptr<_task_container_base>
重复几种类型。考虑给它取个名字。此外,您的allocate_task_container
函数重复返回类型。而不是返回_task_container<_Func>(std::forward<_Func>(f)) (新_task_container<_Func>(std::forward<_Func>(f)););您可以只使用返回新_task_container<_Func>(std::forward<_Func>(f));https://codereview.stackexchange.com/questions/221626
复制相似问题