lock-free
和spin-lock
核心技术都是采用CAS
。
lock-free
示例代码:
#include <stdio.h>
#include <stdatomic.h>
#include <pthread.h>
static void * adding(void *input);
int count = 0;
int main()
{
pthread_t tid[10];
for(int i = 0; i < 10; i++)
{
pthread_create(&tid[i], NULL, adding, NULL);
}
for(int i = 0; i < 10; i++)
{
pthread_join(tid[i], NULL);
}
printf("the value of count is %d\n", count);
}
static void *
adding(void *input)
{
int val;
for(int i = 0; i < 1000000; i++)
{
do
{
val = count;
} while (!atomic_compare_exchange_weak(&count, &val, val+1));
}
pthread_exit(NULL);
}
用perf
测试代码:
Performance counter stats for './test_free_lock' (10 runs):
804.76 msec task-clock:u # 1.768 CPUs utilized ( +- 3.99% )
0 context-switches:u # 0.000 /sec
0 cpu-migrations:u # 0.000 /sec
83 page-faults:u # 93.230 /sec ( +- 0.24% )
<not supported> cycles:u
<not supported> instructions:u
<not supported> branches:u
<not supported> branch-misses:u
0.4553 +- 0.0157 seconds time elapsed ( +- 3.45% )
spin-lock
示例代码:
#include <stdio.h>
#include <stdatomic.h>
#include <pthread.h>
static void * adding(void *input);
int count = 0;
int lock = 0;
int main()
{
pthread_t tid[10];
for(int i=0; i<10; i++)
{
pthread_create(&tid[i], NULL, adding, NULL);
}
for(int i=0; i<10; i++)
{
pthread_join(tid[i], NULL);
}
printf("the value of count is %d\n", count);
}
static void *
adding(void *input)
{
int expected = 0;
for(int i = 0; i < 1000000; i++)
{
// if the lock is 0(unlock), then set it to 1(lock)
while(!atomic_compare_exchange_weak(&lock, &expected, 1))
{
/*
* if the CAS fails, the expected will be set to 1,
* so we need to change it to 0 again. */
expected = 0;
}
count++;
lock = 0;
}
}
用perf
测试代码:
Performance counter stats for './test_spin_lock' (10 runs):
7,277.08 msec task-clock:u # 1.993 CPUs utilized ( +- 4.31% )
0 context-switches:u # 0.000 /sec
0 cpu-migrations:u # 0.000 /sec
75 page-faults:u # 10.415 /sec ( +- 0.31% )
<not supported> cycles:u
<not supported> instructions:u
<not supported> branches:u
<not supported> branch-misses:u
3.652 +- 0.159 seconds time elapsed ( +- 4.37% )
对比两脚本的执行结果,lock-free
是明显优于spin-lock
的。接着从程序代码的差异上分析,lock-free
在一条语句上(atomic_compare_exchange_weak(&count, &val, val+1)
)完成了修改,而spin-lock
则是“持有锁》修改值〉释放锁”。它们之间的差异可以由下两图体现:
lock-free
:spin-lock
:lock-free
的线程可以分别独自的访问到共享变量count
,而spin-lock
在被其他线程持有锁之后,只能占着cpu空转。哪怕lock-free
也会出现冲突,并且需要重新操作一次,但也仅限于此。因此在自身上下文中lock-free
通常就能够完成操作,无需等待(出现冲突时只需要重新运行一次即可),而spin-lock
则只能空转。从上文中可以了解到lock-free
是有一些局限性的,因为lock-free
只能针对于某个特定的整数变量有效,而在实际场景中我们共享的数据一般都是复杂的,甚至是多个变量的。其次,我们也知道队列是一个FIFO模型,在入队和出队实质上就是一个操作队列的头(Header)和尾(Tail)的过程。也就是说在队列中会产生冲突的地方只有Header
和Tail
,那么只要对Header
和Tail
的操作可以满足CAS
的操作即可将lock-free
应用于此,但此时需要注意ABA的问题。
队列通用设计
tail index
指示下一个生产者放置元素的位置。一直递增的值直到越界后回到起点。head index
指示下一个可以被消费的元素的位置。一直递增tail index
达到或超过head index
,增加了一个新的原子计数器dequeue counter
,再实际操作dequeue之前,先增加它的值。如果在增加之后dequeue counter
是小于tail index
的,那么就可以保证可以dequeue到数据,同时也保证了增加head index
的安全,因为随后增加head index
也一定会小于tail index
。当然,如果dequeue counter
大于tail index
,那么就等量的减少它的值,并使dequeue操作失败。无锁设计
freeListHead
)中存在ABA的问题(从空闲链表中获取Block
时),而该问题的根本原因是freeListHead
的头结点(head
)被读取后,head.next
的值已经被改变了,因此解决此ABA的根本方法就是去察觉到head.next
的值是否发生改变。这里采用一个比较通用的方法:“对每个Block
引入一个原子的计数器(Block.count
),在读取Block.next
之前加1,并在CAS
之后减1,当Block.count
不为0时不允许任何人改变Block.next
的值(因为无法改变Block.next
的值,因此Blcok
就无法重新放回freeListHead
,便杜绝了ABA的可能)”。 Block.count
还有隐含的意思:“当一个Block
使用完并要放回到freeListHead
时,在实际放回到链表之前Block.count
必须为0,否则啥也不做”。但如果是自旋的等待Block.count
为0的话,那么其实就是一个自旋锁,考虑到最终是谁将Block
添加到链表中并不重要,因此可以将“添加到freeListHead
”的操作交给最后一个持有Block
的线程。为此我们可以在Block
上设置一个flag值告知最后一个持有Block
的线程应该把它添加到freeListHead
中(值用should_be_on_free_list
表示)。利用这个flag值可以让add
变成完全无锁。Block.count
之间仍然可能产生竞争,因此将Block.count
的最高位作为flag使用,这样就能让他们变成原子的。性能设计:
生产者队列由一群Block
组成。每个Block
可以存储一定数量的元素,能存储的元素个数是2的幂。另外Block
中有一个标志标识队列是否为空(emptyFlags
),还有一个变量指示完全dequeue的数量(elementsCompletelyDequeued
)。
创建两个Block pool
:
一个是预申请的Block
数组,一旦消费掉后,就一直保持空。该数组用一个简单的“fetch-and-add atomic”指令来完成消费(无需等待的),当然还需要检测一下是否在合理的范围之类。
另一个是全局的无锁的Block
空闲链表,那些被释放的Block
会被放到该链表中等待重用。其实现就是一个无锁链表。
初始时,listHead = NULL
添加一个Block
到链表中时:
void
add_knowing_refcount_is_zero(Block *freeListHead, Block *block)
{
/*
* 到了这里意味着 block.count 一定是0,
* 因此可以自由的更改 block.next 的值 */
while (1)
{
Block *head = freeListHead;
block->next = head;
/*
* 这里把 block.count 值改为1后,将重新与 try_get() 产生竞争(ABA),
* 因此 block.count 的值有可能在 try_get() 中被增加 */
block->count = 1;
if (!CAS(freeListHead, head, block))
{
/*
* 添加失败后将引用计数减回去(上面设置成1),
* 并通知最后一个持有该Block的线程去执行添加操作。 */
if (fetch_add(block->count, should_be_on_free_list - 1) == 1)
{
/*
* 到这里意味着由我们自己执行 add 操作 */
continue;
}
}
return;
}
}
Block *
add(Block *freeListHead, Block *block)
{
if (fetch_add(block->count, should_be_on_free_list) == 0)
{
add_knowing_refcount_is_zero(freeListHead, block);
}
}
从链表中移除一个Block
时:
Block *
try_get(Block *freeListHead)
{
Block *head = freeListHead;
while (head)
{
prevHead = head;
count = head->count;
/*
* count为0时不能修改next,因此需要直接 continue。
* 如果 CAS 失败的话也需要 continue */
if ((count & REFS_MASK) == 0 || !CAS(head->count, count, count + 1))
{
head = freeListHead;
continue;
}
Block *next = head->next;
if (CAS(freeListHead, head, next))
{
/* 成功获取block后,需要将引用计数归0 */
fetch_add(head->count, -2);
return head;
}
/* 获取失败需要将增加的引用计数重新减回去 */
count = fetch_add(head->count, -1);
if (count == should_be_on_free_list + 1)
{
/* 由该线程负责执行 add 操作 */
add_knowing_refcount_is_zero(prevHead);
}
}
return NULL;
}
C++
项目。支持MPMC模型的无锁队列 我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=61arrh92b4is