前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《C++并发编程实战》读书笔记(4):设计并发数据结构

《C++并发编程实战》读书笔记(4):设计并发数据结构

作者头像
C语言与CPP编程
发布2023-08-10 08:39:28
4080
发布2023-08-10 08:39:28
举报
文章被收录于专栏:c语言与cpp编程

本文包括第6章设计基于锁的并发数据结构与第7章设计无锁数据结构,后者实在有些烧脑了。此外,发现吴天明版的中译本有太多太离谱的翻译错误了,还得是中英对照才行:)

第6章 设计基于锁的并发数据结构

设计支持并发访问的数据结构时,一方面需要确保访问安全,通常需要限定其提供的接口,另一方面需要是按真正的并发操作,仅利用互斥保护并发实际上是串行化。

设计基于锁的并发数据结构的奥义就是确保先锁定合适的互斥,再访问数据,并尽可能缩短持锁时间。

可以采用锁实现线程安全的栈容器。

代码语言:javascript
复制
struct empty_stack : std::exception {
    const char* what() const throw() { return "empty stack"; }
};

template <typename T>
class threadsafe_stack {
private:
    std::stack<T> data;
    mutable std::mutex m;

public:
    threadsafe_stack() {}
    threadsafe_stack(const threadsafe_stack& other) {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    void push(T new_value) {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
    }
    std::shared_ptr<T> pop() {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(
            std::make_shared<T>(std::move(data.top())));
        data.pop();
        return res;
    }
};

可以采用锁和条件变量实现线程安全的队列容器。data_queue中存储shared_ptr而非原始值,是为了把shared_ptr的初始化从wait_and_pop移动到push处,使得wait_and_pop中不会抛出异常。否则假如push操作通知条件变量时有多个消费者线程在等待,被notify_one通知到的消费者线程初始化shared_ptr时抛出异常,那么其他消费者线程不会被唤醒。

代码语言:javascript
复制
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

template <typename T>
class threadsafe_queue {
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T> > data_queue;
    std::condition_variable data_cond;

public:
    threadsafe_queue() {}

    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        std::shared_ptr<T> res = data_queue.front();
        data_queue.pop();
        return res;
    }

    void push(T new_value) {
        std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
};

上面的容器直接保护整个data_queue,只用了一个互斥,可以采取更精细粒度的锁操作。为此使用基于单向链表实现的队列,单向链表包含一个不含数据的头节点,后续的每个节点存储指向数据的指针与指向下一个节点的指针。这样的话,就可以对头尾节点分别加锁,减小锁的粒度。

代码语言:javascript
复制
template <typename T>
class threadsafe_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex;
    std::mutex tail_mutex;
    std::unique_ptr<node> head;
    node* tail;
    std::condition_variable data_cond;
    
    std::unique_ptr<node> wait_pop_head() {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }
    
    std::unique_lock<std::mutex> wait_for_data() {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [&] { return head != get_tail(); });
        return std::move(head_lock);
    }
    
    node* get_tail() {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    
    std::unique_ptr<node> pop_head() {
        std::unique_ptr<node> const old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

public:
    threadsafe_queue() : head(new node), tail(head.get()) {}
    threadsafe_queue(const threadsafe_queue& other) = delete;
    threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

    std::shared_ptr<T> wait_and_pop() {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }
    
    void push(T new_value) {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        data_cond.notify_one();
    }
};

与栈和队列相比,大多数数据结构支持多种多样的操作,要考虑更多访问模式。例如对于字典来说,首先只考虑基本操作增删改查,其次实现上来说散列表比二叉树和有序数组更支持细粒度的锁,因此必须放弃std::map支持的多余接口、迭代器操作、默认的底层实现。

代码语言:javascript
复制
template <typename Key, typename Value, typename Hash = std::hash<Key> >
class threadsafe_lookup_table {
private:
    class bucket_type {
    private:
        typedef std::pair<Key, Value> bucket_value;
        typedef std::list<bucket_value> bucket_data;
        typedef typename bucket_data::iterator bucket_iterator;

        bucket_data data;
        mutable std::shared_mutex mutex;

        bucket_iterator find_entry_for(Key const& key) const {
            return std::find_if(
                data.begin(), data.end(),
                [&](bucket_value const& item) { return item.first == key; });
        }

    public:
        Value value_for(Key const& key, Value const& default_value) const {
            std::shared_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            return (found_entry == data.end()) ? default_value
                                               : found_entry->second;
        }

        void add_or_update_mapping(Key const& key, Value const& value) {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            if (found_entry == data.end()) {
                data.push_back(bucket_value(key, value));
            } else {
                found_entry->second = value;
            }
        }

        void remove_mapping(Key const& key) {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            if (found_entry != data.end()) {
                data.erase(found_entry);
            }
        }
    };

    std::vector<std::unique_ptr<bucket_type> > buckets;
    Hash hasher;

    bucket_type& get_bucket(Key const& key) const {
        std::size_t const bucket_index = hasher(key) % buckets.size();
        return *buckets[bucket_index];
    }

public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;

    threadsafe_lookup_table(unsigned num_buckets = 19,
                            Hash const& hasher_ = Hash())
        : buckets(num_buckets), hasher(hasher_) {
        for (unsigned i = 0; i < num_buckets; ++i) {
            buckets[i].reset(new bucket_type);
        }
    }

    threadsafe_lookup_table(threadsafe_lookup_table const& other) = delete;
    threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other) =
        delete;

    Value value_for(Key const& key,
                    Value const& default_value = Value()) const {
        return get_bucket(key).value_for(key, default_value);
    }

    void add_or_update_mapping(Key const& key, Value const& value) {
        get_bucket(key).add_or_update_mapping(key, value);
    }

    void remove_mapping(Key const& key) { get_bucket(key).remove_mapping(key); }
};

再例如对于链表来说,倘若想让链表支持迭代,而STL风格的迭代器的生命周期完全不受容器控制,可以以成员函数的形式提供迭代功能。为了减小锁的粒度,干脆让每个节点都有自己的互斥。

代码语言:javascript
复制
template <typename T>
class threadsafe_list {
    struct node {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;

        node() : next() {}
        node(T const& value) : data(std::make_shared<T>(value)) {}
    };

    node head;

public:
    threadsafe_list() {}

    ~threadsafe_list() {
        remove_if([](T const&) { return true; });
    }

    threadsafe_list(threadsafe_list const& other) = delete;
    threadsafe_list& operator=(threadsafe_list const& other) = delete;

    void push_front(T const& value) {
        std::unique_ptr<node> new_node(new node(value));
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next = std::move(head.next);
        head.next = std::move(new_node);
    }

    template <typename Function>
    void for_each(Function f) {
        node* current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        while (node* const next = current->next.get()) {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            f(*next->data);
            current = next;
            lk = std::move(next_lk);
        }
    }
};

第7章 设计无锁数据结构

非阻塞是指没有使用互斥、条件变量、future进行同步。仅仅非阻塞往往并不足够,例如第5章使用atomic_flag实现自旋锁,非阻塞但效率并不高。

无锁代表如果多个线程共同操作同一份数据,那么有限步骤内其中某一线程能够完成自己的操作。采用无锁结构可以最大限度地实现并发并且提高代码健壮性,避免锁阻塞其他线程,或者持锁线程抛出异常时其他线程无法继续处理。

以下是采用引用计数和宽松原子操作的无锁栈容器的实现。书中这段代码讲解了28页,有兴趣的可以仔细读读原文。简略地讲,引用计数的作用是避免pop时重复delete;由于缺乏原子的共享指针,所以将引用计数分为内部与外部两个,手动管理引用计数,每当指针被读取外部计数器自增,读取完成内部计数器自减。

代码语言:javascript
复制
template <typename T>
class lock_free_stack {
private:
    struct node;
    struct counted_node_ptr {
        int external_count;
        node* ptr;
    };
    struct node {
        std::shared_ptr<T> data;
        std::atomic<int> internal_count;
        counted_node_ptr next;
        node(T const& data_)
            : data(std::make_shared<T>(data_)), internal_count(0) {}
    };
    // counted_node_ptr足够小,可以无锁实现原子变量
    std::atomic<counted_node_ptr> head;
    void increase_head_count(counted_node_ptr& old_counter) {
        counted_node_ptr new_counter;
        do {
            new_counter = old_counter;
            ++new_counter.external_count;
        } while (!head.compare_exchange_strong(old_counter, new_counter,
                                               std::memory_order_acquire,
                                               std::memory_order_relaxed));
        old_counter.external_count = new_counter.external_count;
    }

public:
    ~lock_free_stack() {
        while (pop())
            ;
    }
    void push(T const& data) {
        counted_node_ptr new_node;
        new_node.ptr = new node(data);
        new_node.external_count = 1;
        new_node.ptr->next = head.load(std::memory_order_relaxed);
    // 若head==next则head=next返回true
    // 否则代表head被其他线程修改,则next=head返回false
        while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
                                           std::memory_order_release,
                                           std::memory_order_relaxed))
            ;
    }
    std::shared_ptr<T> pop() {
        counted_node_ptr old_head = head.load(std::memory_order_relaxed);
        for (;;) {
    // 递增外部计数,表示正被指涉
            increase_head_count(old_head);
            node* const ptr = old_head.ptr;
    // 已经到栈底
            if (!ptr) {
                return std::shared_ptr<T>();
            }
            if (head.compare_exchange_strong(old_head, ptr->next,
                                             std::memory_order_relaxed)) {
    // 当前线程成功弹出并独占head
                std::shared_ptr<T> res;
                res.swap(ptr->data);
    // 头节点弹出栈,当前线程也不访问,总共减2
                int const count_increase = old_head.external_count - 2;
                if (ptr->internal_count.fetch_add(count_increase,
                                                  std::memory_order_release) ==
                    -count_increase) {
    // 内部计数为0,删除节点
                    delete ptr;
                }
                return res;
    // 如果当前线程最后一个持有指针
            } else if (ptr->internal_count.fetch_add(
                           -1, std::memory_order_relaxed) == 1) {
                ptr->internal_count.load(std::memory_order_acquire);
                delete ptr;
            }
        }
    }
};

与栈不同的是,对于队列结构,push与pop访问不同部分‍

代码语言:javascript
复制
template <typename T>
class lock_free_queue {
private:
    struct node;
    struct counted_node_ptr {
        int external_count;
        node* ptr;
    };
    std::atomic<counted_node_ptr> head;
    std::atomic<counted_node_ptr> tail;

    struct node_counter {
        unsigned internal_count : 30;
        unsigned external_counters : 2;
    };

    struct node {
        std::atomic<T*> data;
        std::atomic<node_counter> count;
        std::atomic<counted_node_ptr> next;

        node() {
            node_counter new_count;
            new_count.internal_count = 0;
            new_count.external_counters = 2;
            count.store(new_count);
            next.ptr = nullptr;
            next.external_count = 0;
        }
     // 针对某节点释放引用
        void release_ref() {
            node_counter old_counter = count.load(std::memory_order_relaxed);
            node_counter new_counter;
            do {
                new_counter = old_counter;
                --new_counter.internal_count;
            } while (!count.compare_exchange_strong(old_counter, new_counter,
                                                    std::memory_order_acquire,
                                                    std::memory_order_relaxed));
            if (!new_counter.internal_count && !new_counter.external_counters) {
                delete this;
            }
        }
    };
    // 针对节点释放其外部计数器
    static void free_external_counter(counted_node_ptr& old_node_ptr) {
        node* const ptr = old_node_ptr.ptr;
        int const count_increase = old_node_ptr.external_count - 2;
        node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
        node_counter new_counter;
        do {
            new_counter = old_counter;
            --new_counter.external_counters;
            new_counter.internal_count += count_increase;
        } while (!ptr->count.compare_exchange_strong(
            old_counter, new_counter, std::memory_order_acquire,
            std::memory_order_relaxed));
        if (!new_counter.internal_count && !new_counter.external_counters) {
            delete ptr;
        }
    }
    // 针对某节点获取新的引用
    static void increase_external_count(std::atomic<counted_node_ptr>& counter,
                                        counted_node_ptr& old_counter) {
        counted_node_ptr new_counter;
        do {
            new_counter = old_counter;
            ++new_counter.external_count;
        } while (!counter.compare_exchange_strong(old_counter, new_counter,
                                                  std::memory_order_acquire,
                                                  std::memory_order_relaxed));
        old_counter.external_count = new_counter.external_count;
    }

    void set_new_tail(counted_node_ptr& old_tail,
                      counted_node_ptr const& new_tail) {
        node* const current_tail_ptr = old_tail.ptr;
        while (!tail.compare_exchange_weak(old_tail, new_tail) &&
               old_tail.ptr == current_tail_ptr)
            ;
        if (old_tail.ptr == current_tail_ptr)
            free_external_counter(old_tail);
        else
            current_tail_ptr->release_ref();
    }

public:
    std::unique_ptr<T> pop() {
        counted_node_ptr old_head = head.load(std::memory_order_relaxed);
        for (;;) {
            increase_external_count(head, old_head);
            node* const ptr = old_head.ptr;
            if (ptr == tail.load().ptr) {
                return std::unique_ptr<T>();
            }
            counted_node_ptr next = ptr->next.load();
            if (head.compare_exchange_strong(old_head, next)) {
                T* const res = ptr->data.exchange(nullptr);
                free_external_counter(old_head);
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref();
        }
    }

    void push(T new_value) {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr = new node;
        new_next.external_count = 1;
        counted_node_ptr old_tail = tail.load();
        for (;;) {
            increase_external_count(tail, old_tail);
            T* old_data = nullptr;
            if (old_tail.ptr->data.compare_exchange_strong(old_data,
                                                           new_data.get())) {
                counted_node_ptr old_next = {0};
                if (!old_tail.ptr->next.compare_exchange_strong(old_next,
                                                                new_next)) {
                    delete new_next.ptr;
                    new_next = old_next;
                }
                set_new_tail(old_tail, new_next);
                new_data.release();
                break;
            } else {
         // 更新失败,转而协助成功的线程
                counted_node_ptr old_next = {0};
         // 将next指向本线程分配的节点充当尾节点
                if (old_tail.ptr->next.compare_exchange_strong(old_next,
                                                               new_next)) {
                    old_next = new_next;
         // 成功的话分配新节点为下次压入做准备
                    new_next.ptr = new node;
                }
         // 设置尾节点,重新循环
                set_new_tail(old_tail, old_next);
            }
        }
    }
};

相信读者已经了解了正确写出无锁代码的困难与繁复。如果想自行设计,请注意以下原则:

1、在原型设计中使用std::memory_order_seq_cst次序,便于分析和推理;

2、使用无锁的内存回收方案,例如上面的引用计数;

3、防范ABA问题,即两次读取变量的值都相同,但其实变量已经被修改过多次,解决办法是将变量与其计数器绑定;

4、找出忙等循环,协助其他线程,例如两线程同时压入队列的话某一线程就会忙等循环,可以像上面队列中的实现一样,多个线程同时push只有一个能成功,失败的线程转而协助成功线程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 C语言与CPP编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档