首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >C++17线程池

C++17线程池
EN

Code Review用户
提问于 2019-06-04 05:31:56
回答 1查看 9.4K关注 0票数 11

我在C++17中实现了一个线程池,这与C++14并不是向后兼容的,因为使用了std::invoke_result,这是C++17的新特性。

这个问题的重点是最佳实践,我很想知道这其中是否有一个看起来很有趣的™(也就是说,任何看起来不应该出现的奇怪的动作或东西)。

您可以找到这个这里的当前实现(使用审查后的修复和诸如此类的)。

实现分为两个文件:

线程池.h

代码语言:javascript
运行
复制
#pragma once

#include <vector>
#include <thread>
#include <future> //packaged_task
#include <queue>
#include <functional> //bind
#include <mutex>
#include <condition_variable>
#include <type_traits> //invoke_result

class thread_pool {
public:
    thread_pool(size_t thread_count);
    ~thread_pool();
    
    //since std::thread objects are not copiable, it doesn't make sense for a thread_pool
    //  to be copiable.
    thread_pool(const thread_pool &) = delete;
    thread_pool &operator=(const thread_pool &) = delete;
    
    //F must be Callable, and invoking F with ...Args must be well-formed.
    template <typename F, typename ...Args>
    auto execute(F, Args&&...);
    
private:
    //_task_container_base and _task_container exist simply as a wrapper around a 
    //  MoveConstructible - but not CopyConstructible - Callable object. Since an
    //  std::function requires a given Callable to be CopyConstructible, we cannot
    //  construct one from a lambda function that captures a non-CopyConstructible
    //  object (such as the packaged_task declared in execute) - because a lambda
    //  capturing a non-CopyConstructible object is not CopyConstructible.
    
    //_task_container_base exists only to serve as an abstract base for _task_container.
    class _task_container_base {
    public:
        virtual ~_task_container_base() {};
        
        virtual void operator()() = 0;
    };
        
    //_task_container takes a typename F, which must be Callable and MoveConstructible.
    //  Furthermore, F must be callable with no arguments; it can, for example, be a
    //  bind object with no placeholders.
    //  F may or may not be CopyConstructible.
    template <typename F>
    class _task_container : public _task_container_base {
    public:
        //here, std::forward is needed because we need the construction of _f *not* to
        //  bind an lvalue reference - it is not a guarantee that an object of type F is
        //  CopyConstructible, only that it is MoveConstructible.
        _task_container(F &&func) : _f(std::forward<F>(func)) {}
        
        void operator()() override {
            _f();
        }

    private:
        F _f;
    };
    
    //returns a unique_ptr to a _task_container that wraps around a given function
    //  for details on _task_container_base and _task_container, see above
    //  This exists so that _Func may be inferred from f.
    template <typename _Func>
    static std::unique_ptr<_task_container_base> allocate_task_container(_Func &&f) {
        //in the construction of the _task_container, f must be std::forward'ed because
        //  it may not be CopyConstructible - the only requirement for an instantiation
        //  of a _task_container is that the parameter is of a MoveConstructible type.
        return std::unique_ptr<_task_container_base>(
            new _task_container<_Func>(std::forward<_Func>(f))
        );
    }
    
    std::vector<std::thread> _threads;
    std::queue<std::unique_ptr<_task_container_base>> _tasks;
    std::mutex _task_mutex;
    std::condition_variable _task_cv;
    bool _stop_threads = false;
};

template <typename F, typename ...Args>
auto thread_pool::execute(F function, Args &&...args) {
    std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);
    std::packaged_task<std::invoke_result_t<F, Args...>()> task_pkg(
        std::bind(function, args...)
    );
    std::future<std::invoke_result_t<F, Args...>> future = task_pkg.get_future();

    queue_lock.lock();
    //this lambda move-captures the packaged_task declared above. Since the packaged_task
    //  type is not CopyConstructible, the function is not CopyConstructible either -
    //  hence the need for a _task_container to wrap around it.
    _tasks.emplace(
        allocate_task_container([task(std::move(task_pkg))]() mutable { task(); })
    );
    queue_lock.unlock();

    _task_cv.notify_one();

    return std::move(future);
}

threadpool.cpp

代码语言:javascript
运行
复制
#include "threadpool.h"

thread_pool::thread_pool(size_t thread_count) {
    for (size_t i = 0; i < thread_count; ++i) {
        
        //start waiting threads. Workers listen for changes through
        //  the thread_pool member condition_variable
        _threads.emplace_back(
            std::thread(
                [&]() {
                std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);

                    while (true) {
                        queue_lock.lock();
                        _task_cv.wait(
                            queue_lock, 
                            [&]() -> bool { return !_tasks.empty() || _stop_threads; }
                        );

                        //used by dtor to stop all threads without having to
                        //  unceremoniously stop tasks. The tasks must all be finished,
                        //  lest we break a promise and risk a future object throwing
                        //  an exception.
                        if (_stop_threads && _tasks.empty()) return;

                        //to initialize temp_task, we must move the unique_ptr from the
                        //  queue to the local stack. Since a unique_ptr cannot be copied
                        //  (obviously), it must be explicitly moved. This transfers
                        //  ownership of the pointed-to object to *this, as specified in
                        //  20.11.1.2.1 [unique.ptr.single.ctor].
                        auto temp_task = std::move(_tasks.front());
                        
                        _tasks.pop();
                        queue_lock.unlock();

                        (*temp_task)();
                    }
                }
            )
        );
    }
}

thread_pool::~thread_pool() {
    _stop_threads = true;
    _task_cv.notify_all();

    for (std::thread &thread : _threads) {
        thread.join();
    }
}

driver.cpp (演示用法的简单文件)。没有测试,不需要审查)

代码语言:javascript
运行
复制
#include <iostream>
#include <vector>
#include "threadpool.h"

int multiply(int x, int y) {
    return x * y;
}

int main() {
    thread_pool pool(4);
    std::vector<std::future<int>> futures;
    
    for (const int &x : { 2, 4, 7, 13 }) {
        futures.push_back(pool.execute(multiply, x, 2));
    }
    
    for (auto &fut : futures) {
        std::cout << fut.get() << std::endl;
    }
    
    return 0;
}
EN

回答 1

Code Review用户

发布于 2019-06-07 04:09:20

在我看来,你的代码看起来非常好,结构也很好。它展示了现代C++编码成语。您还在代码中包含了对标准的引用。所有这些都是非常感谢的。

以下是一些建议:

  1. 我喜欢按字母顺序对#includes排序。像这样:#include #include //bind #include //packaged_Task# #include #include #include > //invoke_result #include
  2. 不要将类放在命名空间中。我建议这样做。
  3. std::thread的构造函数通过rvalue引用传递可调用对象。为什么不和它保持一致呢?
  4. 不是说//F必须是可调用的,使用...Args调用F必须是格式良好的。在评论中,为什么不用代码来表达你的意图呢?模板,int> = 0>自动执行(F&&,Args&…);
  5. 在所有私有类型和数据成员之前使用下划线。这可能是一个造型问题,但这并不是必要的,因为私人成员无论如何都不能引入名称冲突。
  6. std::unique_ptr<_task_container_base>重复几种类型。考虑给它取个名字。此外,您的allocate_task_container函数重复返回类型。而不是返回_task_container<_Func>(std::forward<_Func>(f)) (新_task_container<_Func>(std::forward<_Func>(f)););您可以只使用返回新_task_container<_Func>(std::forward<_Func>(f));
票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/221626

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档