前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《C++并发编程实战》读书笔记(2):并发操作的同步

《C++并发编程实战》读书笔记(2):并发操作的同步

作者头像
C语言与CPP编程
发布2023-08-10 08:20:15
3480
发布2023-08-10 08:20:15
举报
文章被收录于专栏:c语言与cpp编程

第4章 并发操作的同步

4.1 等待事件或等待其他条件

如果线程甲需要等待线程乙完成任务,可以使用C++标准库的条件变量来等待事件发生。<condition_variable>中提供了condition_variable和condition_variable_any,前者只能配合mutex使用,而后者可以与任意符合互斥标准的类型使用,会产生额外开销。主要使用成员函数wait、notify_one、notify_all。

例如可以实现一个生产者消费者模型,通过队列来传递数据,一端准备数据另一端处理数据,其中条件变量的作用是消费者线程取出数据前检查队列是否非空,否则释放锁并等待生产者线程准备数据。

代码语言:javascript
复制
std::mutex mut;
std::queue<Widget> data_queue;
std::condition_variable data_cond;

void data_preparation_thread() {
    while (...) {
        const Widget data = prepare_data();
        {
          std::lock_guard<std::mutex> lk(mut);
          data_queue.push(data);
        }
        // 通知消费者线程
        data_cond.notify_one();
    }
}

void data_processing_thread() {
    while (...) {
        // 需要多次加锁解锁,所以用unique_lock
        std::unique_lock<std::mutex> lk(mut);
        // wait首先判断lambda,成立则返回,否则解锁互斥进入阻塞
        // 每次被notify后解除阻塞并获取锁,重复上述过程
        data_cond.wait(lk, [] { return !data_queue.empty(); });
        Widget data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        process(data);
    }
}

也可以实现一个简略的线程安全的队列:

代码语言:javascript
复制
template <typename T>
class threadsafe_queue {
   private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;

   public:
    threadsafe_queue() {}
    threadsafe_queue(threadsafe_queue const& other) {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }

    void push(T new_value) {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
};

4.2 使用future等待一次性事件发生

若线程需等待某一次性事件,可以以适当方式取得一个代表目标事件的future,此后线程就可以一边执行其他任务一边在future上等待。一旦目标事件发生,future就进入就绪状态,无法重置。

<future>中提供了两种类模板future和shared_future,同一事件仅可关联一个future实例,但可关联多个shared_future,并且目标事件发生后关联的所有shared_future实例都就绪。future本身不提供同步,多线程时需要用同步方式进行保护。


4.2.1 从后台任务返回值

并不急需某任务的返回值时,可以用async异步地启动任务,获得一个future对象;对后者调用get会阻塞当前线程,等待future准备完并返回该值。

代码语言:javascript
复制
int f() { ... }

std::future<int> answer = std::async(f);
...
std::cout << answer.get();

给async的任务函数传递参数类似给thread传递参数。

代码语言:javascript
复制
// 调用成员函数的情况
// p=&x; p->foo("hello")
auto f1 = std::async(&X::foo, &x, "hello");
// tmpx=x; tmpx.foo("hello")
auto f2 = std::async(&X::foo, x, "hello");

// 调用仿函数的情况
// tmpy=y; tmpy(3.14)
auto f3 = std::async(Y(), 3.14);
// y(3.14)
auto f4 = std::async(std::ref(y), 3.14);

// 函数形参为引用的情况
X baz(X&);
auto f5 = std::async(baz, std::ref(x));

可以给async传递参数指定运行方式,deferred代表直到在future上调用wait/get才执行任务函数,async代表开启专属线程来执行;默认为deferred|async。

代码语言:javascript
复制
auto f = std::async(std::launch::async, Y(), 1.2);

4.2.2 关联future实例与任务

类模板packaged_task把任务包装起来,可作为任务调度器、线程池的构建单元,其模板参数是函数签名,例如int(int,double*)。它具备函数调用操作符,参数取决于上述模板参数,调用时将参数传递给任务函数,通过get_future获取future对象,异步运行得到结果后保存到该对象。

例如图形用户界面需要接收其他线程的消息来更新界面。

代码语言:javascript
复制
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

// 图形用户界面的线程函数
void gui_thread() {
    while (...) {
        get_and_process_gui_message();
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if (tasks.empty()) continue;
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}

// 其他线程通过该函数传递消息
template <typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
    std::packaged_task<void()> task(f);
    std::future<void> res = task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

4.2.3 创建std::promise

有些任务无法以简单的函数调用表达,或者执行结果来自多个部分的代码,那么就需要使用std::promise显式地异步求值。

promise通过get_future获取关联的future对象,等待数据的线程在future上阻塞,提供数据的线程通过set_value设置数据,设置完后future即就绪。若promise销毁时仍未set_value,则传递异常。

下面是单线程处理多个连接的例子。这里假设传入的数据包含有ID与荷载数据,接收后将ID与promise对应,将相关值设为荷载数据。对于传出的数据而言,promise的相关值是代表是否成功的bool。

代码语言:javascript
复制
void process_connections(connection_set& connections) {
    while (...) {
        for (connection_iterator connection = ...) {
            if (connection->has_incoming_data()) {
                data_packet data = connection->incoming();
                std::promise<payload_type>& p = connection->get_promise(data.id);
                p.set_value(data.payload);
            }
            if (connection->has_outgoing_data()) {
                outgoing_packet data = connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true);
            }
        }
    }
}

async与packaged_task运行的函数抛出异常时会保存在future对象中,调用get时再次抛出。对于promise而言,应用set_exception保存异常

代码语言:javascript
复制
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo")));

4.2.4 多个线程一起等待

shared_future可以让多个线程等待同一个目标事件。每个线程复制一份shared_future副本,成为各线程独有的局部变量;通过该局部变量访问将由标准库自动同步,可以安全地访问。

代码语言:javascript
复制
std::promise<int> p1;
auto f1 = p1.get_future();
assert(f1.valid());
std::shared_future<int> sf1 = std::move(f1);
assert(!f1.valid());
assert(sf1.valid());

std::promise<int> p2;
auto sf2 = p2.get_future().share();

4.3 限时等待

之前介绍的所有可能阻塞的调用,其阻塞都可能漫无止境。为此可以采用一些超时机制:延迟超时表示等待一定时间,后缀为for,绝对超时表示等待到某时间点,后缀为until。

std::chrono库中时钟是时间信息的来源,每个时钟类都提供当前时刻now、时间值的类型time_point、计时单元的长度ratio<>、计时速率是否恒定is_steady。常用时钟类包括system_clock,steady_clock,high_resolution_clock。

时长类duration<>,其模板参数有两个,第一个指采用何种类型表示计时单元的数量,第二个指每个计时单元代表多少秒。例如std::chrono::duration<double,std::ratio<1,1000>>代表采用double值计数的毫秒时长类。

代码语言:javascript
复制
auto f = std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==
    std::future_status::ready){    
    process(f.get());
}

时间点类time_point<>,模板参数有两个,第一个指参考时钟,第二个指计时单元,即特化的duration。

代码语言:javascript
复制
std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop() {
    auto const timeout =
        std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
    std::unique_lock<std::mutex> lk(m);
    while (!done) {
        if (cv.wait_until(lk, timeout) == std::cv_status::timeout) break;
    }
    return done;
}

4.4 运用同步操作简化代码

在并发实战中可以使用贴近函数式编程的风格,函数调用的结果完全取决于参数而非任何外部状态。线程间不会直接共享数据,而是由各任务分别预先准备妥自己所需的数据,随后通过future将结果发送到其他有需要的线程。

例如可以实现并行的快排:

代码语言:javascript
复制
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
    if (input.empty()) {
        return input;
    }
    // 将input的开头剪切到result
    // 以此为分界值,将input分为两段
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t) { return t < pivot; });
    // 异步处理较小的一段
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
    std::future<std::list<T> > new_lower(std::async(&parallel_quick_sort<T>, std::move(lower_part)));
    // 本线程处理较大的一段
    auto new_higher(parallel_quick_sort(std::move(input)));
    // 汇合所有结果
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}

除了函数式编程,CSP(通信式串行线程)也有同样特性,其中线程完全隔离,没有共享数据,通过管道传递消息。具体代码这里不再演示。

C++20中还提出两个新特性:latch和barrier。latch是一个同步对象,内含计数器,减到0时就绪。

代码语言:javascript
复制
void foo() {
    unsigned const thread_count = ...;
    latch done(thread_count);
    my_data data[thread_count];
    std::vector<std::future<void> > threads;
    for (unsigned i = 0; i < thread_count; ++i)
        threads.push_back(std::async(std::launch::async, [&, i] {
            data[i] = make_data(i);
            done.count_down();
            ...
        }));
    done.wait();
    process_data(data, thread_count);
}

而barrier针对一组给定的线程,每个线程运行到barrier处就阻塞,直到同组的所有线程都抵达才释放。

代码语言:javascript
复制
void process_data(data_source &source, data_sink &sink) {
    unsigned const num_threads = ...
    barrier sync(num_threads);
    std::vector<joining_thread> threads(num_threads);
    
    std::vector<data_chunk> chunks;
    result_block result;

    for (unsigned i = 0; i < num_threads; ++i) {
        threads[i] = joining_thread([&, i] {
            while (...) {
                if (!i) {
                    data_block current_block = source.get_next_data_block();
                    chunks = divide_into_chunks(current_block, num_threads);
                }
                sync.arrive_and_wait();
                result.set_chunk(i, num_threads, process(chunks[i]));
                sync.arrive_and_wait();
                if (!i) {
                    sink.write_data(std::move(result));
                }
            }
        });
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-06-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 C语言与CPP编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档