首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

实现线程安全队列——细粒度锁实现

包含同步语义的简单实现

template

class ThreadSafeQueue

{

public:

void Push(T new_value)

{

std::lock_guard lk(m_mtx);

m_queue.push(std::move(new_value));

m_cond.notify_one(); // 1

}

void WaitAndPop(T &value) // 2

{

std::unique_lock lk(m_mtx);

m_cond.wait(lk, [this]

{ return !m_queue.empty(); });

value = std::move(m_queue.front());

m_queue.pop();

}

std::shared_ptr WaitAndPop() // 3

{

std::unique_lock lk(m_mtx);

m_cond.wait(lk, [this]

{ return !m_queue.empty(); }); // 4

std::shared_ptr res(

std::make_shared(std::move(m_queue.front())));

m_queue.pop();

return res;

}

bool TryPop(T &value)

{

std::lock_guard lk(m_mtx);

if (m_queue.empty())

return false;

value = std::move(m_queue.front());

m_queue.pop();

return true;

}

std::shared_ptr TryPop()

{

std::lock_guard lk(m_mtx);

if (m_queue.empty())

return std::shared_ptr(); // 5

std::shared_ptr res(

std::make_shared(std::move(m_queue.front())));

m_queue.pop();

return res;

}

bool Empty() const

{

std::lock_guard lk(m_mtx);

return m_queue.empty();

}

private:

mutable mutex m_mtx;

queue m_queue;

condition_variable m_cond;

};

复制代码

这个版本是最为简单的实现版本,直接用的stl库中的队列来实现,所有成员函数公用一把锁来实现线程安全,需要注意的点有以下几点:

条件变量产生的虚假唤醒,你可以通过手动while循环来避免,也可以通过在wait后面加上谓词条件(lamda表达式)

锁需要设置为mutable,保证const版本的成员函数可用

但这个实现有非常大的隐患和不足!

隐患

如果在调用WaitAndPop函数时发生了异常,由于可能有其他的线程也在调用WaitAndPop发生等待,而由于每次notify一个线程,一旦构造 std::shared_ptr的过程中发生异常,那么其他的线程将会陷入永久的等待!

解决方法: 由于异常发生在内存的申请过程中,我们如果把  中直接存入  那么就不会有这个问题。

改写后的代码如下:

template

class ThreadSafeQueue

{

public:

void Push(T new_value)

{

auto data = std::make_shared(std::move(new_value));

std::lock_guard lk(m_mtx);

m_queue.push(data);

m_cond.notify_one(); // 1

}

void WaitAndPop(T &value) // 2

{

std::unique_lock lk(m_mtx);

m_cond.wait(lk, [this]

{ return !m_queue.empty(); });

value = std::move(*m_queue.front());

m_queue.pop();

}

std::shared_ptr WaitAndPop() // 3

{

std::unique_lock lk(m_mtx);

m_cond.wait(lk, [this]

{ return !m_queue.empty(); }); // 4

std::shared_ptr res = m_queue.front();

m_queue.pop();

return res;

}

bool TryPop(T &value)

{

std::lock_guard lk(m_mtx);

if (m_queue.empty())

return false;

value = std::move(*m_queue.front());

m_queue.pop();

return true;

}

std::shared_ptr TryPop()

{

std::lock_guard lk(m_mtx);

if (m_queue.empty())

return std::shared_ptr(); // 5

std::shared_ptr res = m_queue.front();

m_queue.pop();

return res;

}

bool Empty() const

{

std::lock_guard lk(m_mtx);

return m_queue.empty();

}

private:

mutable mutex m_mtx;

queue> m_queue;

condition_variable m_cond;

};

复制代码

这个版本的代码不仅是预防了异常安全,同样性能也得到了很好的优化,Push 过程的内存申请过程可以放到临界区以外,提高了并发度。

设计细粒度锁队列提高并发

前面的简单版本,有个非常明显的不足,几乎没有任何并发的性能,因为所有的成员函数都必须加锁,临界区非常的大,这哪里是并发,这都强行变成了同步执行,那这样肯定不行啊,我们找找原因。

这个原因很简单,由于我们是通过stl内部的queue封装所实现的,我们的任何的成员函数操作实现都必须访问到这个共享变量,一旦变量被共享,要实现线程安全那就必须加锁同步,这便是原因所在了。

这就是我们现在要做的事情,把锁的粒度减少,实际就是把变量的共享和操作细分。

细粒度锁队列实现

实现简单队列

在这之前我们先自己实现一个简单的队列,如下:

template

class Queue

{

private:

struct node

{

T data_;

std::unique_ptr next_;

node(T data) : data_(std::move(data))

{}

};

std::unique_ptr m_head;

node *m_tail{};

public:

Queue() = default;

Queue(const Queue &other) = delete;

Queue &operator=(const Queue &other) = delete;

std::shared_ptr TryPop()

{

if (!m_head)

{

return nullptr;

}

auto ret = std::make_shared(std::move(m_head->data_));

auto oldHead = std::move(m_head);

m_head = std::move(oldHead->next_); //这里把next资源进行转移,防止oldHead析构后导致整个链表析构

return ret;

}

void Push(T new_value)

{

auto p = std::make_unique(new_value);

auto *new_tail = p.get();

if (m_tail)

{//如果队列不为空

m_tail->next_ = std::move(p);

} else

{//队列为空则需要特殊处理

m_head = std::move(p);

}

m_tail = new_tail;

}

};

复制代码

next指针为啥不用原始指针?嗯,其实应该要用原始指针的,这里偷个懒,为了不写delete语句,用的unique_ptr,在使用这个独占指针的时候记得要转移所有权,否则会出现连环析构的现象!

由于使用了unique_ptr管理next_指针,那么析构的时候会自动完成,但是会有个问题,如果队列中的数据量大的话,整个函数栈会爆掉,我亲自测试了下,大概存入的数据量达到1e4级别就会爆栈。。。但是没关系,我们将他用作并发编程中的队列时,用于生产消费的队列里的空闲任务一般也不会到达这个量级,当然有空的话也可以改进然后优化。

分析并发设计

我们再来简单分析下这个内存共享的情况pop操作需要用到head,push操作需要用到head和tail。但是有个严重的问题:除了这两个内存被共享外,由于未采用空头节点,两个成员函数内用  指针访问到的内存都可能发生共享(对应  与 )。这样的话很难在保证细粒度的情况下实现线程安全了。。。这样下去的实现还不如之前的。

通过分离数据实现并发

前面的隐患已经分析清楚了,如何解决它?你可以使用预分配一个虚拟节点(无数据),确保这个节点永远在队列的最后,用来分离头尾指针能访问的节点”的办法,走出这个困境。这样通过 pop 和 push 操作通过 next_ 指针访问到的数据就永远不可能是同一个数据了。 代码如下:

template

class Queue

{

private:

struct node

{

std::shared_ptr data_;

std::unique_ptr next_;

};

std::unique_ptr m_head;

node *m_tail;

public:

Queue():m_head(new node),m_tail(m_head.get()){}; //初始化空节点

Queue(const Queue &other) = delete;

Queue &operator=(const Queue &other) = delete;

std::shared_ptr TryPop()

{

if (m_head.get() == m_tail)

{

return nullptr;

}

auto ret = m_head->data_;

auto oldHead = std::move(m_head);

m_head = std::move(oldHead->next_);

return ret;

}

void Push(T new_value)

{

auto data = std::make_shared(std::move(new_value));

auto p = std::make_unique(new_value); //新的空节点

m_tail->data_ = data;

//开始移动补充最后的空节点

auto* new_tail = p.get();

m_tail->next_ = std::move(p);

m_tail = new_tail;

}

};

复制代码

现在两个操作共享的内存就只有 m_head 和 m_tail 了,而且在 Push 操作中只使用到了共享内存 m_tail,那么接下来的并发安全实现可以开始细粒度化了,我们用两个互斥锁来实现它。一个互斥锁用于锁住访问m_head的行为,一个用于锁住访问m_tail的行为,具体到代码可以因使用时间的长短对临界区进行进一步缩小。

具体代码如下:

template

class ThreadSafeQueue

{

struct node

{

std::shared_ptr data;

std::unique_ptr next;

};

std::mutex m_headMtx;

std::unique_ptr m_head;

std::mutex m_tailMtx;

node *m_tail;

public:

ThreadSafeQueue() :

m_head(new node), m_tail(m_head.get())

{}

ThreadSafeQueue(const ThreadSafeQueue &other) = delete;

ThreadSafeQueue &operator=(const ThreadSafeQueue &other) = delete;

std::shared_ptr TryPop()

{

std::unique_ptr old_head = pop_head();

return old_head ? old_head->data : std::shared_ptr();

}

void Push(T new_value)

{

std::shared_ptr new_data(

std::make_shared(std::move(new_value)));

std::unique_ptr p(new node);

node *const new_tail = p.get();

//开始锁临界区

std::lock_guard tail_lock(m_tailMtx);

m_tail->data = new_data;

m_tail->next = std::move(p);

m_tail = new_tail;

}

private:

node *get_tail()

{

std::lock_guard tail_lock(m_tailMtx);

return m_tail;

}

std::unique_ptr pop_head()

{

//这里head一定要先被锁

std::lock_guard head_lock(m_headMtx);

if (m_head.get() == get_tail())

{

return nullptr;

}

std::unique_ptr old_head = std::move(m_head);

m_head = std::move(old_head->next);

return old_head;

}

};

复制代码

注意: 当get_tail()调用前,请确保 m_headMtx 已经上锁,这一步也是很重要的哦。如果不这样,调用pop_head()时,就无法确保 get_tail 得到的数据在使用的时候为最新,如下代码,如果进入head_lock临界区后,old_tail被其他线程改了,那么整个操作就不对了。

std::unique_ptr pop_head() // 这是个有缺陷的实现

{

node* const old_tail=get_tail(); // 在m_headMtx范围外获取旧尾节点的值

std::lock_guard head_lock(head_mutex);

if(head.get()==old_tail) //

{

return nullptr;

}

std::unique_ptr old_head=std::move(head);

head=std::move(old_head->next); //

return old_head;

}

复制代码

再来看看异常安全是否有保证,如果TryPop() 中的对锁的操作会产生异常,由于直到锁获取后才能对数据进行修改。因此,TryPop()是异常安全的。另一方面,Push()可以在堆上新分配出一个T的实例,以及一个node的新实例,这里可能会抛出异常。但是,所有分配的对象都赋给了智能指针,那么当异常发生时,他们就会被释放掉。

并发度分析

TryPop()持有m_tailMtx也只有很短的时间,只为保护对tail的读取。因此,当有数据push进队列后,TryPop()几乎及可以完全并发调用了。同样在执行中,对m_headMtx的持有时间也是极短的。当并发访问时,会增加对TryPop()的访问次数;由于只有一个线程,在同一时间内可以访问pop_head(),且多线程情况下可以删除队列中的旧节点,并且安全的返回数据。

添加条件变量实现同步等待

现在已经实现了细粒度锁的线程安全队列,不过只有TryPop()可以并发访问(且只有一个重载存在)。那么方便的同步的WaitAndPop()呢?

Push实现

向队列中添加新节点是相当简单的——下面的实现与上面的代码差不多。

template

void ThreadSafe::Push(T new_value)

{

auto new_data = std::make_shared(std::move(new_value));

std::unique_ptr p(new node);

{//生产临界区

std::lock_guard tail_lock(tail_mutex);

tail->data=new_data;

auto* new_tail=p.get();

tail->next=std::move(p);

tail=new_tail;

}

data_cond.notify_one();

}

复制代码

WaitAndPop实现

template

class ThreadSafeQueue

{

private:

node* get_tail()

{

std::lock_guard tail_lock(tail_mutex);

return tail;

}

std::unique_ptr pop_head() // 1

{

std::unique_ptr old_head=std::move(head);

head=std::move(old_head->next);

return old_head;

}

std::unique_lock wait_for_data() // 2

{

std::unique_lock head_lock(head_mutex);

data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});

return std::move(head_lock); // 3

}

std::unique_ptr wait_pop_head()

{

std::unique_lock head_lock(wait_for_data()); // 4

return pop_head();

}

std::unique_ptr wait_pop_head(T& value)

{

std::unique_lock head_lock(wait_for_data()); // 5

value=std::move(*head->data);

return pop_head();

}

public:

std::shared_ptr WaitAndPop()

{

std::unique_ptr const old_head=wait_pop_head();

return old_head->data;

}

void WaitAndPop(T& value)

{

auto _ = wait_pop_head(value);

}

};

复制代码

可能大家看到代码好像有点多,实际上都只是为了代码的重用,例如pop_head()和wait_for_data(),这些函数分别是删除头结点和等待队列中有数据弹出的。wait_for_data()特别值得关注,因为其不仅等待使用lambda函数对条件变量进行等待,而且它还会将锁的实例返回给调用者。这就确保了wait_pop_head的线程安全。pop_head()是对TryPop()代码的复用。

TryPop和Empty实现

template

class ThreadSafeQueue

{

private:

std::unique_ptr try_pop_head()

{

std::lock_guard head_lock(head_mutex);

if(head.get()==get_tail())

{

return std::unique_ptr();

}

return pop_head();

}

std::unique_ptr try_pop_head(T& value)

{

std::lock_guard head_lock(head_mutex);

if(head.get()==get_tail())

{

return std::unique_ptr();

}

value=std::move(*head->data);

return pop_head();

}

public:

std::shared_ptr TryPop()

{

std::unique_ptr old_head=try_pop_head();

return old_head?old_head->data:std::shared_ptr();

}

bool TryPop(T& value)

{

std::unique_ptr const old_head=try_pop_head(value);

return old_head;

}

void Empty()

{

std::lock_guard head_lock(head_mutex);

return (head.get()==get_tail());

}

};

复制代码

完整代码

代码仓库:thread_safe_queue

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20230206A0017500?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券