前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《C++并发编程实战》读书笔记(1):并发、线程管控

《C++并发编程实战》读书笔记(1):并发、线程管控

作者头像
C语言与CPP编程
发布2023-08-10 08:19:27
3690
发布2023-08-10 08:19:27
举报
文章被收录于专栏:c语言与cpp编程

第1章 你好,C++并发世界

计算机系统中的并发包括任务切换与硬件并发,往往同时存在,关键因素是硬件支持的线程数。不论何种,本书谈论的技术都适用。

采用并发的理由主要是分离关注点与提升性能。但并发使得代码复杂、难懂、易错,不值得时无需采用并发。

并发的方式包括多进程与多线程。前者采用多个进程,每个进程只含一个线程,开销更大,通过昂贵的进程间通信来传递信息,但更安全并且可利用网络连接在不同计算机上并发。后者采用单一进程,内含多个线程,额外开销更低,但难以驾驭,往往暗含隐患。本书专攻多线程并发。

并发与并行都指可调配的硬件资源同时运行多个任务,但并行更强调性能,而并发更强调分离关注点或相应能力。

以一个简单的例子开启本书:

代码语言:javascript
复制
#include <iostream>
#include <thread>

void hello() { std::cout << "Hello Concurrent World\n"; }

int main() {
    std::thread t(hello);
    // join令主线程等待子线程
    t.join();
}

第2章 线程管控


2.1 线程的基本管控

每个C++程序都含有至少一个线程,即main函数所在线程。随后,程序可通过std::thread启动更多线程;它需要<thread>头文件,可以通过任何可调用类型(函数、伪函数、lambda等)发起线程。

代码语言:javascript
复制
void do_some_work();
std::thread my_thread(do_some_work);

启动线程后需要决定是与之汇合(join)还是与之分离(detach)。如果线程销毁时还没决定,那么线程会调用std::terminate终止整个程序。只有存在关联的执行线程时,即t.joinable()返回true,才能调用join/detach。

detach成员函数表示程序不等待线程结束,令线程在后台运行,归属权与控制权转交给C++运行时库。使用detach需确保所访问的外部数据始终正确有效,避免持有主线程的局部变量的指针/引用,否则主线程退出后该线程可能持有空悬指针/空悬引用。解决办法是将数据复制到新线程内部而非共享,或者使用join而非detach。

join成员函数的作用是等待线程的执行结束并回收线程资源;只能调用一次,之后就不再joinable。为了防止抛出异常时跳过join,导致程序崩溃有,可以实现一个RAII类,在析构函数中保证已经汇合过。

代码语言:javascript
复制
class thread_guard {
    std::thread& t_;

   public:
    explicit thread_guard(std::thread& t) : t_(t) {}
    ~thread_guard() {
        if (t_.joinable()) {
            t_.join();
        }
    }
    thread_guard(thread_guard const&) = delete;
    thread_guard& operator=(thread_guard const&) = delete;
};

2、向线程函数传递参数

直接向std::thread的构造函数添加更多参数即可给线程函数传递参数。不过参数是先按默认方式复制到线程内部存储空间,再被当成临时变量以右值形式传给线程函数。

例如下面的字符串字面量hello,先以const char*形式传入,再转化为std::string类型。

代码语言:javascript
复制
void f(const std::string &);
std::thread t(f,"hello");

但如果实参是指针,那么传入指针后构造string时,指针可能已经空悬。解决办法是传参时直接转换为string。

代码语言:javascript
复制
std::thread t(f,std::string(buffer));

如果线程函数的形参是左值引用,直接传入实参会被转化为右值再传入,导致错误。解决办法是用std::ref加以包装。

代码语言:javascript
复制
void f(int &i) { std::cout << i; }

int main() {
    int i = 3;
    std::thread t(f, std::ref(i));
}

想要使用成员函数作为线程函数的话,还需传入对象指针。例如下面的线程函数实际上调用w.f(i)。

代码语言:javascript
复制
class Widget {
   public:
    void f(int i) { cout << i; }
};

int main() {
    Widget w;
    int i = 4;
    std::thread t(&Widget::f, &w, i);
    t.join();
}

对于只能移动不能拷贝的参数,例如unique_ptr,若实参是临时变量则自动移动,若实参是具名变量则需使用move。

代码语言:javascript
复制
void f(std::unique_ptr<Widget>);

auto p = make_unique<Widget>();
std::thread t(f,std::move(p));

2.3 移交线程归属权

thread掌握资源,像unique_ptr一样只能移动不能拷贝;此外当thread关联一个线程时向其移动赋值会导致程序终止。支持移动操作的容器,例如vector,可以装载std::thread对象。

可以改进前文的thread_guard,使其支持构建并掌管线程,确保离开所在作用域前线程已完结。

代码语言:javascript
复制
class scoped_thread {
    std::thread t;

   public:
    explicit scoped_thread(std::thread t_) : t(std::move(t_)) {
        if (!t.joinable()) throw std::logic_error("No thread");
    }
    ~scoped_thread() { t.join(); }
    scoped_thread(scoped_thread const&) = delete;
    scoped_thread& operator=(scoped_thread const&) = delete;
};

// 使用统一初始化避免被解析为函数声明
scoped_thread t{std::thread(f)}; 

2.4 在运行时选择线程数量、线程ID

可以通过std::thread::hardware_concurrency()来获取可真正并发的线程数量,硬件信息无法获取时返回0。当用多线程分解任务时,该值是有用的指标。

以下是并行版accumulate的简易实现,根据硬件线程数计算实际需要运算的线程数,随后将任务分解到各个线程处理,最后汇总得到结果。

代码语言:javascript
复制
// 每个线程运行的子任务
template <typename Iterator, typename T>
struct accumulate_block {
    void operator()(Iterator first, Iterator last, T& result) {
        result = std::accumulate(first, last, result);
    }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    unsigned long const length = std::distance(first, last);
    if (!length) return init;
    // 每个线程至少处理25个元素
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    // 无法获取硬件线程数时设置为2
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    
    std::vector<T> results(num_threads);
    // 创建n-1个线程,因为本线程也进行运算任务
    std::vector<std::thread> threads(num_threads - 1);

    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        threads[i] = std::thread(accumulate_block<Iterator, T>(), block_start,
                                 block_end, std::ref(results[i]));
        block_start = block_end;
    }
    accumulate_block<Iterator, T>()(block_start, last,
                                    results[num_threads - 1]);

    std::for_each(threads.begin(), threads.end(),
                  std::mem_fn(&std::thread::join));

    return std::accumulate(results.begin(), results.end(), init);
}

线程ID的类型是std::thread::id,可随意复制或比较。可以通过thread的get_id()成员函数获取,也可以通过std::this_thread::get_id()获取当前线程ID。


第3章 在线程间共享数据

3.1 线程间共享数据的问题

并发编程中操作由多个线程负责,争先让线程执行各自的操作,结果取决于它们执行的相对顺序,这就是条件竞争。恶性条件竞争会导致未定义行为。很经典的两个线程各自递增一个全局变量十万次的例子,理想情况下最后变量变为二十万,然而实际情况是这样:


3.2 用互斥保护共享数据

可以利用名为互斥的同步原语。C++线程库保证了一旦由线程锁住某个互斥,其他线程试图加锁时必须等待,直到原先加锁的线程将其解锁。注意应以合适的粒度加锁,仅在访问共享数据期间加锁,处理数据时尽可能解锁。

C++中通过构造std::mutex的实例来创建互斥,通过lock/unlock成员函数来加锁解锁。并不推荐直接调用成员函数,应使用其RAII类lock_guard,构造时加锁、析构时解锁。

代码语言:javascript
复制
// 使用互斥锁来保护some_list
std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(new_value);
}

//C++17支持类模板参数推导与scoped_lock
void add_to_list(int new_value)
{
    std::scoped_lock guard(some_mutex);
    some_list.push_back(new_value);
}

然而仍可能出现未被保护的指针/引用,或者成员函数调用了不受掌控的其他函数,因此不能向锁所在的作用域之外传递受保护数据的指针/引用。然而即使用互斥保护,有些接口仍存在固有的条件竞争。例如对于栈来说:线程1判断栈非空,随后线程2取出元素,栈空,随后线程1取出元素时出错。下面是一个解决办法的示例:‍

代码语言:javascript
复制
template <typename T>
class threadsafe_stack {
   public:
    std::shared_ptr<T> pop() {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
    }
    void pop(T& value) {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = data.top();
        data.pop();
    }
    ...
};

最后,死锁是指两个线程都需要锁住两个互斥锁才能继续运行,而目前都只锁住一个,并苦苦等待对方解锁。以下是一些防范死锁的准则:1、如果已经持有锁,就不要获取第二个锁;确实需要获取多个锁时使用std::lock来一次性获取所有锁。2、一旦持锁,避免调用用户提供的程序接口避免嵌套锁。3、依从固定顺序获取锁。4、按层级加锁。5、事实上任何同步机制的循环等待都会导致死锁。

例如swap函数需要同时获取双方的锁时:

代码语言:javascript
复制
class X {   
public:
    friend void swap(X& lhs, X& rhs) {
        if (&lhs == &rhs) return;
        std::lock(lhs.m, rhs.m);
    // adopt_lock表示lhs.m已经上锁
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
        swap(lhs.some_detail, rhs.some_detail);
    }
    
    // C++17中
    friend void swap(X& lhs, X& rhs){
        if (&lhs == &rhs) return;
        std::scoped_lock guard(lhs.m,rhs.m);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

unique_lock比lock_guard更灵活,不占有与之关联的互斥锁,但占用更多空间并且更慢。它提供了lock/try_lock/unlock成员函数;构造函数第二个参数传入adopt_lock表示互斥锁已上锁,传入defer_lock表示构造时无需上锁。unique_lock可移动不可复制,可以在不同作用域间转移互斥所有权,用途是让程序在同一个锁的保护下执行其他操作。


3.3 保护共享数据的其他工具

可以通过once_flag类和call_once函数来在初始化过程中保护共享数据。

代码语言:javascript
复制
std::once_flag resource_flag;
void init_resource(){ .. }

void run(){
  std::call_once(resource_flag, init_resource);
  ...
}

C++11还规定了静态数据只会初始化一次。那么单例模板类可以这样实现:

代码语言:javascript
复制
template<class T>
class Singleton {
public:
    static T& Instance() {
        static T instance;
        return instance;
    }

protected:
    Singleton() = default;
    ~Singleton() = default;

private:
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;
};
// 使用方法:
class MyClass : public Singleton<MyClass> {
public:
    ...
private:
    MyClass();
    friend class Singleton<MyClass>;
};

对于读多写少的数据结构,C++14提供了shared_timed_mutex,C++17提供了功能更多的shared_mutex,那么写锁即lock_guard<shared_mutex>或unique_lock<shared_mutex>,读锁即shared_lock<shared_mutex>。

递归锁recursive_mutex允许同一线程对它多次加锁,释放所有锁后其他线程才可获取该锁。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 C语言与CPP编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档