前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >共享内存无锁队列的实现

共享内存无锁队列的实现

原创
作者头像
豆豆的包包
修改2017-11-06 10:50:54
11.8K0
修改2017-11-06 10:50:54
举报
文章被收录于专栏:豆包的专栏豆包的专栏

作者:范健

导语: 共享内存无锁队列是老调重弹了,相关的实现网上都能找到很多。但看了公司内外的很多实现,都有不少的问题,于是自己做了重新实现。主要是考虑了一些异常情况加强健壮性,并且考虑了C++11的内存模型。

为什么需要共享内存无锁队列?

为了便于查找定位问题,需要做一个日志收集跟踪系统,每个业务模块都需要调用SDK输出格式化的本地日志并将日志发送到远端。

为了避免发送日志阻塞业务,典型的做法是业务线程将日志写入队列,另一个线程异步地从队列中读取数据并发送。考虑到IO性能,且日志数据能容忍小概率的丢失,所以队列不应该是在磁盘上。又因为业务模块可能是多线程模式也可能是多进程模式,所以队列应该是在共享内存中。

简单的做法是,对队列的读写都加锁,但这样无疑会导致高并发下性能瓶颈就在这把锁上。所以我们需要无锁队列。看了公司内外很多版本的无锁队列实现,多多少少都有些问题,所以自己重新实现了一个版本。

环形数组

大部分无锁队列都是用环形数组实现的,简单高效,这里也不例外。假设队列长度为queue_len,用read_index表示可读的位置,用write_index表示可写的位置。

每次修改read_index或write_index的时候都需要将其归一化:

代码语言:txt
复制
read_index %= queue_len

队列已使用空间used_len的计算为:

代码语言:txt
复制
write_index >= read_index ?
  write_index - read_index : queue_len - read_index + write_index

判断队列IsEmpty的条件为:

代码语言:txt
复制
read_index == write_index

如果不做特殊处理,判断队列IsFull的条件和IsEmpty的条件一样,从而难以区分。所以我们将队列可写入长度设为queue_len-1。这样判断长度为write_len的数据是否可以写入的条件为:

代码语言:txt
复制
// 注意是 < 而不是 <= 
used_len + write_len < queue_len

一写一读

先来考虑一写一读的场景,实现起来最简单。

写操作:先判断是否可以写入,如果可以,则先写数据,写完数据后再修改write_index。

读操作:先判断是否可以读取used_len > 0,如果可以,则先读数据,读完再修改read_index。

因为read_index和write_index都只会有一个地方去写,所以其实不需要加锁也不需要原子操作,直接修改即可。需要注意读写数据的时候都需要考虑遇到数组尾部的情况。

多写一读

再来考虑复杂些的多写一读的场景。因为多个生产者都会修改write_index,所以在不加锁的情况下必须使用原子操作,笔者使用的是GCC内置原子操作函数:

代码语言:txt
复制
// __sync系列的内置函数在C++11之后已经过时,不建议使用
// C++11的std::atomic函数就是用__atomic系列内置函数实现的,所以也考虑了C++11提出的内存模型
// 该函数在*ptr == *expected的时候,将*ptr = desired,并返回true,否则返回false,并将*expected = *ptr
// 最后两个参数分别表示修改成功和失败时使用的内存模型,后面会讲
bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder);

一种错误实现:

有的实现在写入过程中对write_index使用了多次原子操作,比如先原子增加write_index,再写入数据,如果写入失败,再原子减小write_index,看起来每次操作都是原子的,但多个原子操作连在一起就不是原子操作了,整个写入过程中对write_index应当只有一次原子操作。

常见的错误实现:

1 .先读取write_index,判断新的数据是否有足够的空间可以写入。

1.1 如果没有足够空间则返回队列满。

2 .如果有足够的空间,则准备写入。

2.1 一写的时候,是先写数据再改write_index。多写的时候为了避免同时写到同一片内存,需要先申请空间再写入数据。即先原子增加write_index,如果成功,再写入数据。

2.2 为了避免在生产者还未写完数据的时候,消费者就尝试读取,所以需要个同步机制告诉消费者数据正在写入中。比如头部预留一个字节,初始为0表示正在写入,写完数据后再改为1表示写入完成。头部中一般还有2字节表示数据长度。

3 .消费者发现used_len > 0即可尝试读取。

3.1 如果首字节为0,表示数据正在写入,等待。

3.2 如果首字节不为0,表示数据已写完,可以读取。

4 .消费者读取数据后,需要将read_index前移到合适的位置,且因为只有一个消费者,这里无需使用原子操作。

这种实现看似OK,其实也有问题。如果生产者在修改write_index之后,在修改头部首字节为1之前,这段时间内crash的话,就会导致消费者永远停留在等待生产者写完的状态上,且这个状态无法自动恢复。

我的优化一:

  1. 消费者发现头部首字节为0,则等待,但最多等待一段时间比如5ms。
  2. 在写入数据限制了最大长度的前提下,以现代计算机的速度,从修改write_index然后copy数据最后修改头部首字节为1,这段时间是非常快的,远小于5ms。
  3. 如果等待5ms后,发现首字节还是0,则认为该生产者crash了,根据头部中的长度信息,向前跳过这个非法数据块。

但如果生产者还没来得及写入数据长度就crash了呢?就想跳过非法数据块也不知道该跳多少了。

我的优化二:

1 .将队列分成N个定长block,定义如下:

代码语言:txt
复制
struct Block {
 union {
     struct {
         bool m_used;
         uint8_t m_blk_cnt;
         uint8_t m_blk_idx;
         uint16_t m_blk_len;
     };
     char m_head_reserved[8];
 };
 char m_data[kBlockDataSize];

 bool CanUsed(uint8_t expected_blk_idx) const {
     return m_used && expected_blk_idx == m_blk_idx
         && m_blk_cnt <= kMaxBlockCount
         && m_blk_idx < m_blk_cnt
         && m_blk_len <= kBlockDataSize;
 }
};

2 .生产者写数据时先计算需要的blk_cnt,再原子地将write_index前移blk_cnt。写数据的时候第一个block最后写,每个block内部依然是最后写头部首字节m_used = true。

3 .当等待5ms后发现m_used还是false,认为写入者crash之后,就可以以block为单位向前跳跃,直到跳到一个合法block或者没有可以读取的数据为止。合法block判断条件为blk.CanUsed(0)。

这样就算生产者在任意时刻crash,消费者都有能力自动恢复,找到下一个合法block。但如果消费者并没有真正crash只是因为某种神秘的原因写入太慢超过了5ms,怎么办?

  1. 首先,因为消费者已经跳过,所以它这次写入的数据肯定是不会被消费了,即极小概率会遗漏数据。
  2. 其次,我们考虑更极小概率的情况,只有当生产者慢到队列循环了完整一轮,其它生产者重新申请到这片block准备写入,才会产生数据脏写。
  3. 再次,就算真的出现数据脏写,一般头部的blk_cnt和blk_idx等信息不会对不上,消费者每次消费数据都会通过CanUsed函数检测,检测不通过的都会跳过。
  4. 最后,如果说非要考虑极端情况,可以通过在头部中再加入block_crc和total_crc来校验数据。笔者考虑到日志数据容忍这种极小概率的错乱,所以省略了。

内存模型

看似完美了,真的吗?其实不然。以上还没有考虑内存模型。因为编译器的优化,实际代码执行顺序不一定是你写的顺序。也就是说虽然我们是先写数据最后设置m_used = true,但实际执行顺序并不一定真的如此,有可能先执行了m_used = true,再执行数据copy,这就乱套了。因此我们需要指定内存模型。关于内存模型推荐参阅文章http://blog.jobbole.com/106516/

1.生产者对于m_used的修改,内存模型应该使用release。保证在这个操作之前的memory accesses不会重排到这个操作之后去,这样就不会向消费者提前释放可用信号。

代码语言:txt
复制
__atomic_store_n(&blk.m_used, true, __ATOMIC_RELEASE);

2 .消费者对于m_used的读取,内存模型应该使用acquire。保证在这个操作之后的memory accesses不会重排到这个操作之前去,这样就不会提前读到生产者还未写完的数据。

代码语言:txt
复制
__atomic_load_n(&m_used, __ATOMIC_ACQUIRE);

3 .对write_index的修改,即调用atomic_compare_exchange_n函数,最后两个参数应该都是ATOMIC_RELAXED,即内存模式使用relaxed,即没有约束。因为write_index只是多生产者之间用来做类似互斥的竞争,本来就是靠m_used真正约束生产者和消费者之间的行为顺序。

共享内存

另外一个值得一提的点是,共享内存我使用mmap,而非shmget。因为担心一台机器上部署的程序太多,可能出现共享内存key冲突的情况。万一出现共享内存冲突,被别的程序写坏了,就会出现莫名其妙的情况。所以使用mmap指定模块相关的文件路径,就不用太担心了。

需要多读吗?

如果再进一步实现多写多读,需要对read_index也考虑原子操作,加上稍显复杂的block检查跳跃逻辑,实现难度较高。但我们首先该问一个问题,真的需要多读吗?

我认为是不需要的:

  1. 首先,消费者可以批量读取,一次读取足够或者全部的可读数据。通过对后续业务逻辑的优化,一般单读都能满足性能要求。
  2. 其次,可以一读批量读取后再做进一步进程内多线程分发,会更加简单。
  3. 再次,如果单读真的不能满足性能要求,说明读后的业务逻辑非常重,那么这个时候,性能瓶颈就肯定不会是队列读取这里了,那么给读加锁无疑是更合适的选择。

有感而发

  1. 要写出高健壮性的代码,一定要时刻记得,程序可能会在你的任何一行代码处因为bug或者意外crash,不要想当然以为执行了上一行代码就一定会执行下一行代码。crash后重启是否能正常恢复?
  2. 写多线程多进程相关的逻辑,涉及到并发操作的时候,要考虑仔细,需不需要加锁?不加锁会有什么问题?
  3. 使用共享内存等共享资源时,更要想到,这资源不是我独占的,万一被有意或无意的篡改了数据该怎么办?能否尽量避免被别人篡改?如果被篡改,是否有发现和恢复机制?
  4. 不要以为你写的代码顺序就是真正的执行顺序,需要考虑内存模型。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么需要共享内存无锁队列?
  • 环形数组
  • 一写一读
  • 多写一读
  • 需要多读吗?
  • 有感而发
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档