前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Linux中的spinlock机制 - CAS和ticket spinlock

Linux中的spinlock机制 - CAS和ticket spinlock

作者头像
Linux阅码场
发布2021-11-16 13:05:09
1.3K0
发布2021-11-16 13:05:09
举报

为什么要加锁

在SMP系统中,如果仅仅是需要串行地增加一个变量的值,那么使用原子操作的函数(API)就可以了。但现实中更多的场景并不会那么简单,比如需要将一个结构体A中的数据提取出来,然后格式化、解析,再添加到另一个结构体B中,这整个的过程都要求是「原子的」,也就是完成之前,不允许其他的代码来读/写这两个结构体中的任何一个。

这时,相对轻量级的原子操作API就无法满足这种应用场景的需求了,我们需要一种更强的同步/互斥机制,那就是软件层面的「锁」的机制。

同步锁的「加锁」和「解锁」是放在一段代码的一前一后,成对出现的,这段代码被称为Critical Section/Region(临界区)。但锁保护的并不是这段代码本身,而是其中使用到的多核/多线程共享的变量,它「同步」(或者说串行化)的是对这个变量的访问,通俗的语义就是“我有你就不能有,你有我就不会有”。

Linux中主要有两种同步锁,一种是spinlock,一种是mutex。spinlock和mutex都既可以在用户进程中使用,也可以在内核中使用,它们的主要区别是前者不会导致睡眠和调度,属于busy wait形式的锁,而后者可能导致睡眠和调度,属于sleep wait形式的锁。

spinlock是最基础的一种锁,像后面将要介绍的rwlock(读写锁),seqlock(读写锁)等都是基于spinlock衍生出来的。就算是mutex,它的实现与spinlock也是密不可分。因此,本系列文章将首先围绕spinlock展开介绍。

如何加锁

Linux中spinlock机制发展到现在,其实现方式的大致有3种。

【第一种实现 - 经典的CAS】

最古老的一种做法是:spinlock用一个整形变量表示,其初始值为1,表示available的状态。当一个CPU(设为CPU A)获得spinlock后,会将该变量的值设为0,之后其他CPU试图获取这个spinlock时,会一直等待,直到CPU A释放spinlock,并将该变量的值设为1。

那么其他的CPU是以何种形式等待的,如果有多个CPU一起等待,形成了竞争又该如何处理?这里要用到经典的CAS操作(Compare And Swap)。

  • 谁和谁比较

目前,sh架构的Linux实现中还保留有这种经典的实现方法(相关代码位于/arch/sh/include/asm/spinlock-cas.h)。

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
    while (!__sl_cas(&lock->lock, 1, 0));
}

static inline unsigned __sl_cas(volatile unsigned *p, unsigned old, unsigned new)
{
    __asm__ __volatile__("cas.l %1,%0,@r0"
        : "+r"(new)
        : "r"(old), "z"(p)
        : "t", "memory" );
    return new;
}

其中,"p"指向spinlock变量所在的内存位置,存储的是当前spinlock实际的值,"old"存储的试图获取spinlock的本地CPU希望的值(1)。

不断地把「期望的值」和「实际的值」进行比较(compare),当它们相等时,说明持有spinlock的CPU已经释放了锁,那么试图获取spinlock的CPU就会尝试将"new"的值(0)写入"p"(swap),以表明自己成为spinlock新的owner。

汇编代码看起来可能略费力一些,用一段伪代码来展示或许会更加地直观:

function cas(p, old, new) 
{
    if *p ≠ old 
        do nothing
    else 
    	*p ← new
}

这里只用了0和1两个值来表示spinlock的状态,没有充分利用spinlock作为整形变量的属性,为此还有一种衍生的方法,可以判断当前spinlock的争用情况。具体规则是:每个CPU在试图获取一个spinlock时,都会将这个spinlock的值减1,所以这个值可以是负数,而「负」的越多(负数的绝对值越大),说明当前的争抢越激烈。

  • 存在的问题

基于CAS的实现速度很快,尤其是在没有真正竞态的情况下(事实上大部分时候就是这种情况), 但这种方法存在一个缺点:它是「不公平」的。一旦spinlock被释放,第一个能够成功执行CAS操作的CPU将成为新的owner,没有办法确保在该spinlock上等待时间最长的那个CPU优先获得锁,这将带来延迟不能确定的问题。

【第二种实现 - Ticket Spinlock】

为了解决这种「无序竞争」带来的不公平问题,spinlock的另一种实现方法是采用排队形式的"ticket spinlock"。这里,我想展示ticket spinlock的两个实现版本,它们的原理都是一样的,只是具体细节略有差异。

  • ACRN版本

先来看下基于x86-64的ACRN hypervisor对于ticket spinlock的实现:

表示一个spinlock的数据结构由"head"和"tail"两个队列的索引组成。

typedef struct _spinlock {
    uint32_t head;
    uint32_t tail;
} spinlock_t;

"head"指向当前队列的头部,"tail"指向当前队列的尾部,其初始值都为0。

  • 解锁

一个spinlock被owner释放时,该spinlock的head值会被owner通过"inc"指令加1。

static inline void spinlock_release(spinlock_t *lock)
{
        asm volatile ("   lock incl %[head]\n"   // head加1
          :
          : [head] "m" (lock->head)
          : "cc", "memory");
}
  • 加锁

其他CPU在试图获取这个spinlock时,会通过"xadd"指令将"tail"值保存在自己的eax寄存器中,然后将该spinlock的"tail"值加1(也就是将自己加到了这个等待队列的尾部)。

static inline void spinlock_obtain(spinlock_t *lock)
{	
        asm volatile ("   movl $0x1,%%eax\n"  // eax = 1
		      "   lock xaddl %%eax,%[tail]\n"  // eax = old tail, new tail = old tail + 1
		      "   cmpl %%eax,%[head]\n"  // 比较eax(old tail)和head
		      "   jz 1f\n"  // 相等,获得锁
		      "2: pause\n"  // 不相等,继续比较
		      "   cmpl %%eax,%[head]\n"
		      "   jnz 2b\n"
		      "1:\n"
		      :
		      :
		      [head] "m"(lock->head),
		      [tail] "m"(lock->tail)
		      : "cc", "memory", "eax");
}

接下来就是不断的循环比较,判断该spinlock当前的"head"值,是否和自己存储在eax寄存器中的"tail"值相等,相等时则获得该spinlock,成为新的owner。

这类似于你去银行柜台办理业务,假设当前银行只有一个柜台,你需要在自助机上获得一个排队号码(相当于一个ticket),然后当柜台叫到的号码与你手中的号码一致时,你将坐上柜台前面的椅子,此时柜台为你服务,这也是这种实现方式被称为"ticket spinlock"的原因。

在ticket spinlock中,"compare"和"swap"的操作就分离了。把spinlock当前的值和旧的值进行比较(compare),还是由每个试图获得spinlock的CPU来执行的,但设置新的值(swap),则是由上一个持有spinlock的CPU来完成的。

  • Linux版本

再来看下基于ARMv6的Linux中,ticket spinlock的实现(相关代码位于/arch/arm/include/asm/spinlock.h):

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    lock->tickets.owner++;
}

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
    [LL/SC]

    while (lockval.tickets.next != lockval.tickets.owner) {
        wfe();
        lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
    }
}

"owner"和"next"分别对应ACRN版本中的"head"和"tail"。"wfe"是ARM中的"wait for event"指令,和x86中的pause指令类似,目的是为了降低busy wait时的CPU功耗。

看起来比ACRN的实现简洁?没有,LL/SC这部分也是一段汇编代码,它完成的是和x86的"add"指令一样的工作,和前文讲述的LL/SC是一样的。

	__asm__ __volatile__(
"1:	ldrex	%0, [%3]\n"  // lockval = lock->slock
"	add	%1, %0, %4\n"    // newval = lockval + (1 << TICKET_SHIFT)
"	strex	%2, %1, [%3]\n"  // try lock->slock = newval
"	teq	%2, #0\n"  // test result = 0 ?
"	bne	1b"  // not equal, do LL/SC again
	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
	: "cc");

这里之所以是"1<<TICKET_SHIFT"而不是1,是因为它没有把一个32位的变量全部用来表示spinlock的队列索引,而只是其中的一些bits,事实上也不可能有那么多的CPU同时等待一个spinlock。

  • 公平与效率

可见,使用ticket spinlock可以让CPU按照到达的先后顺序,去获取spinlock的所有权,形成了「有序竞争」。根据硬件维护的cache一致性协议,如果spinlock的值没有更改,那么在busy wait时,试图获取spinlock的CPU,只需要不断地读取自己包含这个spinlock变量的cache line上的值就可以了,不需要从spinlock变量所在的内存位置读取。

但是,当spinlock的值被更改时,所有试图获取spinlock的CPU对应的cache line都会被invalidate,因为这些CPU会不停地读取这个spinlock的值,所以"invalidate"状态意味着此时,它们必须重新从内存读取新的spinlock的值到自己的cache line中。

而事实上,其中只会有一个CPU,也就是队列中最先达到的那个CPU,接下来可以获得spinlock,也只有它的cache line被invalidate才是有意义的,对于其他的CPU来说,这就是做无用功。内存比cache慢那么多,开销可不小。

还是用银行叫号来类比,假设现在2号客户的业务办理完了,接下来就该在大厅里叫3号,然后3号客户去办理,但是所有排号的,4号、5号……哪怕是20号,也得听一下叫的号,对于20号来说,它完全可以在叫到19号之前打个盹嘛。

每当一个spinlock的值出现变化时,所有试图获取这个spinlock的CPU都需要读取内存,刷新自己对应的cache line,而最终只有一个CPU可以获得锁,也只有它的刷新才是有意义的。锁的争抢越激烈(试图获取锁的CPU数目越多),无谓的开销也就越大。

【第三种实现 - MCS Lock】

如果在ticket spinlock的基础上进行一定的修改,让每个CPU不再是等待同一个spinlock变量,而是基于各自不同的per-CPU的变量进行等待,那么每个CPU平时只需要查询自己对应的这个变量所在的本地cache line,仅在这个变量发生变化的时候,才需要读取内存和刷新这条cache line,这样就可以解决上述的这个问题。

要实现类似这样的spinlock的「分身」,其中的一种方法就是使用MCS lock。试图获取一个spinlock的每个CPU,都有一份自己的MCS lock。

先来看下per-CPU的MCS lock是由哪些元素构造而成的(代码位于/kernel/locking/mcs_spinlock.h):

struct mcs_spinlock {
	struct mcs_spinlock *next;
	int locked; 
};

每当一个CPU试图获取一个spinlock,它就会将自己的MCS lock加到这个spinlock的等待队列,成为该队列的一个节点(node),加入的方式是由该队列末尾的MCS lock的"next"指向这个新的MCS lock。

"locked"的值为1表示该CPU是spinlock当前的持有者,为0则表示没有持有。

加锁

对于一个锁的实现来说,最核心的操作无非就是「加锁」和「解锁」。先来看下MCS lock的加锁过程是怎样的:

void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
	// 初始化node
	node->locked = 0;
	node->next   = NULL;

        // 找队列末尾的那个mcs lock
	struct mcs_spinlock *prev = xchg(lock, node);
	// 队列为空,立即获得锁
	if (likely(prev == NULL)) {
		return;
	}

	// 队列不为空,把自己加到队列的末尾
	WRITE_ONCE(prev->next, node);

	// 等待lock的持有者把lock传给自己
	arch_mcs_spin_lock_contended(&node->locked);
}

前面说过,加入队列的方式是添加到末尾(tail),所以首先需要知道这个「末尾」在哪里。函数的第一个参数"lock"就是指向这个末尾的指针,之所以是二级指针,是因为它指向的是末尾节点里的"next"域,而"next"本身是一个指向"struct mcs_spinlock"的一级指针。

第二个参数"node"是试图加锁的CPU对应的MCS lock节点。

"xchg()"的名称来源于x86的XCHG指令,其实现可简化表示成这样:

xchg(*ptr, x)
{
	ret = *ptr;
	*ptr = x;
	return ret;
}

它干了两件事,一是给一个指针赋值,二是获取了这个指针在赋值前的值。

对应着上面的这个mcs_spin_lock(),通过xchg()获得的"prev"就是"*lock"最初的值(prev = *lock)。如果这个值为"NULL",说明队列为空,当前没有其他CPU持有这个spinlock,那么试图获取这个spinlock的CPU可以成功获得锁。同时,xchg()还让lock指向了这个持有锁的CPU的node(*lock = node)。

这里用了"likely()",意思是在大部分情况下,队列都是空的,说明现实的应用场景中,一个spinlock的争抢通常不会太激烈。

前面说过,"locked"的值为1表示持有锁,可此刻CPU获取锁之后,竟然没有把自己node的"locked"值设为1?这是因为在队列为空的情况,CPU可以立即获得锁,不需要基于"locked"的值进行spin,所以此时"locked"的值是1还是0,根本就无所谓。除非是在debug的时候,需要查看当前持有锁的CPU,否则绝不多留一丝「赘肉」。

如果队列不为空,那么就需要把自己这个"node"加入等待队列的末尾,"WRITE_ONCE()"的作用是赋值,在这篇文章里已经介绍过了。

具体的等待过程是调用arch_mcs_spin_lock_contended(),它等待的,或者说"spin"的,是自己MCS lock里的"value"的值,直到这个值变为1。而将这个值设为1,是由它所在队列的前面那个node,在释放spinlock的时候完成的。

#define arch_mcs_spin_lock_contended(l)					
do {									
	smp_cond_load_acquire(l, VAL);					
} while (0)
  • 解锁

那基于MCS lock的实现,释放一个spinlock的过程是怎样的呢?来看下面这个函数:

void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
	// 找到等待队列中的下一个节点
	struct mcs_spinlock *next = READ_ONCE(node->next);

        // 当前没有其他CPU试图获得锁
	if (likely(!next)) {
		// 直接释放锁 
		if (likely(cmpxchg_release(lock, node, NULL) == node))
			return;
                // 等待新的node添加成功
		while (!(next = READ_ONCE(node->next)))
			cpu_relax();
	}

	// 将锁传给等待队列中的下一个node
	arch_mcs_spin_unlock_contended(&next->locked);
}

两个参数的含义同mcs_spin_lock()类似,"lock"代表队尾指针,"node"是准备释放spinlock的CPU在队列中的MCS lock节点。

大概率还是没有锁的争抢,"next"为空,说明准备释放锁的CPU已经是该队列里的最后一个,也是唯一一个CPU了,那么很简单,直接将"lock"设为NULL就可以了。

"cmpxchg_release()"中的"release"代表这里包含了一个memory barrier。如果不考虑这个memory barrier,那么它的实现可简化表示成这样:

cmpxchg(*ptr, old, new)
{
	ret = *ptr;
	if (*ptr == old)
		*ptr = new;
	return ret;
}

跟前面讲到的"xchg()"差不多,也是先获取传入指针的值并作为函数的返回值,区别是多了一个compare。结合mcs_spin_unlock()来看,就是如果"*lock == node",那么"*lock = NULL"。

如果"*lock != node",说明当前队列中有等待获取锁的CPU……等一下,这不是和前面的代码路径相矛盾吗?其实不然,两个原因:

  • 距离函数开头获得"next"指针的值已经过去一段时间了。
  • 回顾前面加锁的过程,新的node加入是先让"*lock"指向自己,再让前面一个node的"next"指向自己。

所以,在这个时间间隔里,可能又有CPU把自己添加到队列里来了。于是,待新的node添加成功后,才可以通过arch_mcs_spin_unlock_contended()将spinlock传给下一个CPU。

#define arch_mcs_spin_unlock_contended(l)				
	smp_store_release((l), 1)

传递spinlock的方式,就是将下一个node的"locked"值设为1(next->locked = 1)。

如果在释放锁的一开始,等待队列就不为空,则"lock"指针不需要移动:

可以看到,无论哪种情况,在解锁的整个过程中,持有锁的这个CPU既没有将自己node中的"locked"设为0,也没有将"next"设为NULL,好像清理工作做的不完整?

事实上,这已经完全无所谓了,当它像「击鼓传花」一样把spinlock交到下一个node手里,它就等同于从这个spinlock的等待队列中移除了。多一事不如少一事,少2个无谓的步骤,效率又可以提升不少。

所以,分身之后的spinlock在哪里?它就在每个MCS lock的"locked"域里,像波浪一样地向前推动着。"locked"的值为1的那个node,才是spinlock的「真身」。

使用MCS lock,就实现了上文那个银行叫号的例子所提出的设想,对于20号来说,不用再听大堂的广播,让19号办理完业务告诉你就行了。

  • 存在的问题

MCS lock的实现保留在了Linux的代码中,但是你却找不到任何一个地方调用了它的lock和unlock的函数。

因为相比起Linux中只占4个字节的ticket spinlock,MCS lock多了一个指针,要多占4(或者8)个字节,消耗的存储空间是原来的2-3倍。spinlock可是操作系统中使用非常广泛的数据结构,这多占的存储空间不可小视,而且spinlock常常会被嵌入到结构体中,对于像"struct page"这种对结构体大小极为敏感的,根本不可能直接使用MCS lock。

所以,真正在Linux中使用的,是下文将要介绍的,在MCS lock的基础上进行了改进的qspinlock。研究MCS lock的意义,不光是理解qspinlock的必经之路,从代码的角度,可以看出其极致精炼的设计,绝没有任何多余的步骤,值得玩味。

MCS lock可以解决在锁的争用比较激烈的场景下,cache line无谓刷新的问题,但它内含一个指针,所以更消耗存储空间,但这个指针又是不可或缺的,因为正是依靠这个指针,持有spinlock的CPU才能找到等待队列中的下一个节点,将spinlock传递给它。本文要介绍的qspinlock,其首要目标就是把原生的MCS lock结构体进行改进,「塞」进4字节的空间里。

【MCS Lock的改进 - qspinlock】

先来看一下有3个以上的CPU持有或试图获取spinlock时,等待队列的全貌:

可见,这个等待队列是由一个qspinlock和若干个MCS lock节点组成的,或者说由一个qspinlock加上一个MCS node的队列组成,其中MCS node的队列和上文描述的那个队列基本是一样的,所以我们重点先来看下这个qspinlock是如何构成的(定义在/include/asm-generic/qspinlock_types.h):

typedef struct qspinlock {
    union {
        atomic_t val;
#ifdef __LITTLE_ENDIAN
        struct {
            u8	locked;
            u8	pending;
        };
        struct {
            u16	locked_pending;
            u16	tail;
        };
#else
...
    }
}

这里使用的是union,目的是既可以直接引用结构体的位域,又可以直接引用整个变量。如果处理器采用的是little endian,那么它们的内存排布关系如下图所示:

如果采用的是big endian,其内存排布则是这样的:

以32位系统为例,"val"作为一个32位的变量,包含了三个部分:"locked byte", "pending"和"tail","tail"又细分为"tail index"和"tail cpu":

下面来说一下这几个部分各自代表什么含义。

MCS lock中表示是否持有锁的"locked"占据了32个bits,但它其实只需要表示0和1两个值,在qspinlock中被压缩成了8个bits,即"locked byte"(其实只需要1个bit)。

此外,MCS lock的结构体中没有专门的标识等待队列末尾的元素,它使用的是一个全局的二级指针来指向队列末尾。而qspinlock使用的是一个per-CPU的数组来表示每个MCS node(用qnode结构体表示),通过CPU编号作为数组的index,就可以获得对应 MCS node的内存位置,因而qspinlock使用的是末尾 MCS node的CPU编号加1,即"tail cpu",来记录等待队列tail的位置(加1的原因将在后面解释)。

struct qnode {
	struct mcs_spinlock mcs;
};
DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);

除了14个bits的"tail cpu"(针对CPU数目小于16K的情况),还有2个bits是用作"tail index",这是因为Linux中一共有4种context,分别是task, softirq, hardirq和nmi,而一个CPU在一种context下,至多试图获取一个spinlock(原因将在下文给出),因而一个CPU至多同时试图获取4个spinlock,"tail index"就是用来标识context的编号的。

以上说的两点是qspinlock在MCS lock已有元素的基础上进行的改进,相比于MCS lock,qspinlock还新增了一个元素:占据1个bit的"pending",它的作用将随着本文叙述的推进,慢慢揭晓。

为了方便演示,我们把一个"val"对应的"locked", "pending"和"tail"作为一个三元组"(x,y,z)"来表示:

  • 加锁

第一个CPU试图获取锁 - uncontended

三元组的初始值是(0, 0, 0),当第一个CPU试图获取锁时,可以立即成为该spinlock的owner,此时三元组的值变为(0, 0, 1):

对应的代码实现是queued_spin_lock()的前半部分(位于/include/asm-generic/qspinlock.h):

#define _Q_LOCKED_OFFSET	0
#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)  

void queued_spin_lock(struct qspinlock *lock)
{
    // (0,0,0) --> (0,0,1) 
    u32 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
    if (likely(val == 0))
        return;

    queued_spin_lock_slowpath(lock, val);
}

通过"cmpxchg()",和当前lock的"val"值进行比较,看是否为0,如果为0,那么获得spinlock,同时将lock的值设为1(即_Q_LOCKED_VAL)。

第二个CPU试图获取锁 - pending

在第一个CPU未释放锁之前,如果有第二个CPU试图获取锁,那么它必须等待,因而其获取锁的过程被称为"slow path",对应的代码实现是queued_spin_lock_slowpath()中的[part 1](该函数的完整实现在源代码中超过200行,本文是做了适当简化,并拆成几个部分来分开讲解)。

#define _Q_LOCKED_OFFSET     0
#define _Q_LOCKED_BITS	     8
#define _Q_LOCKED_MASK       ((1U << _Q_LOCKED_BITS) - 1) << _Q_LOCKED__OFFSET // (*,*,1)

// "val"是在queued_spin_lock中获取的当前lock的值
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
[part 2 - 第三个CPU试图获取spinlock] 
    ...
    goto queue; // 跳转到part 3
    ...
 
[part 1 - 第二个CPU试图获取spinlock]
    // 设置pending位 (0,0,1) --> (0,1,1) 
    val = queued_fetch_set_pending_acquire(lock);

    // 等待第一个CPU移交
    if (val & _Q_LOCKED_MASK)    
        atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

    // 第二个CPU成功获取spinlock (0,1,0) --> (0,0,1)
    clear_pending_set_locked(lock);
    return;

[part 3 - 进入MCS lock队列]
queue:
    ...

[part 4 - 第三个CPU成功获取spinlock]
locked:
    ...
}

它首先通过queued_fetch_set_pending_acquire()函数设置"pending"为1,然后调用atomic_cond_read_acquire()函数,开始基于"locked byte"进行等待(或者说"spin"),此时三元组的值变为(0, 1, 1)。

像atomic_cond_read_acquire()这种函数,其实就是Linux中的原子操作这篇文章介绍的atomic_read()加上"cond"和"acquire",其中"cond"代表condition,表示spin基于的对象,而"acquire"用于设置Memory Barrier。这是为了保证应该在获取spinlock之后才能执行的代码,不要因为Memory Order的调换,在成功获取spinlock之前就执行了,那样就失去了保护Critical Section/Region的目的。

这第二个CPU可被视作是该spinlock的第一顺位继承人,可以做一个这样的类比:"locked byte"位域相当于是皇宫,而"pending"位域相当于是东宫,第二个CPU就是太子(假设按照立长的原则,先来后到),设置"pending"为1就相当于入主东宫,等待着继承大统。

皇帝大行之后(第一个CPU释放spinlock),皇位暂时空缺,三元组的的值变为(0, 1, 0)。之后,曾经的太子离开居住的东宫(将"pending"置0),进入皇宫,成为其新的主人(将"locked byte"置1)。这一过程对应的函数实现就是clear_pending_set_locked()。

第三个CPU试图获取锁 - uncontended queue

如果第二个CPU还在等待的时候,第三个CPU又来了,那么这第三个CPU就成了第二顺位继承人。它的等待路径的实现位于queued_spin_lock_slowpath()的[part 2]:

#define _Q_PENDING_OFFSET    8
#define _Q_PENDING_VAL       (1U << _Q_PENDING_OFFSET) // (0,1,0)

[part 2A]
    // (0,1,0)是spinlock移交的过渡状态
    if (val == _Q_PENDING_VAL) {
        // 等待移交完成 (0,1,0) --> (0,0,1)
        int cnt = 1
        val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL)|| !cnt--);
    }

[part 2B]
    // 已经有2个及以上的CPU持有或试图获取spinlock
    if (val & ~_Q_LOCKED_MASK)
        goto queue;  // 进入MCS node队列

皇宫只有一个,东宫也只有一个,对于第二顺位继承人的王子来说,只能先去宫城外面自建府邸了。对应代码的实现就是[part 2B]的"goto queue",创建一个MCS node。

但前面提到,三元组的值为(0, 1, 0)时(即_Q_PENDING_VAL),是一个「政权移交」的过渡状态,此时第一顺位继承人即将成为spinlock新的持有者,那么作为第二顺位继承人,也即将递进成为第一顺位继承人,所以如果遇到了这个特殊时期,这第三个CPU会选择使用atomic_cond_read_relaxed()函数,先稍微等一会儿(即[part 2A]部分的代码)。

如果在它第二次读取"lock->val"的值时,已经不再是(0, 1, 0),说明移交已完成,变成了(0, 0, 1),那么它将绕过[part 2B],执行[part 1]的代码(也就是第二个CPU曾经走过的路)。

进入MCS node队列

(0, 1, 0)的状态毕竟是特例,大部分情况下,代码还是会跳转到名为"queue"的label位置,开始在MCS lock队列中的活动,对应的代码实现是queued_spin_lock_slowpath()的[part 3]。

首先是从前面提到的per-CPU的qnode[]数组中获取对应的MCS node,并初始化这个node,其中"idx"即表示context编号的"tail index"。

queue:
[part 3A]
    struct mcs_spinlock *node = this_cpu_ptr(&qnodes[0].mcs);
    int idx = node->count++;
    node += idx;
    node->locked = 0;
    node->next = NULL;

然后是调用encode_tail()将自己的CPU编号和context编号「揉」进"tail"里,并通过xchg_tail()让qspinlock(即"lock")的"tail"指向自己。前面提到,"tail CPU"是CPU的编号加1,因为对于编号为0,tail index也为0的CPU,如果不加1,那么算出来的"tail"值就是0,而"tail"为0表示当前没有MCS node的队列。

[part 3B]
u32 tail = encode_tail(smp_processor_id(), idx);
// (p,1,1) -> (n,1,1)
u32 old = xchg_tail(lock, tail);

这里没有画出CPU C是基于哪个对象进行spin的,接下来的一节会给出答案。

第N个CPU试图获取锁 - contended queue

xchg_tail()除了在qspinlock中设置新的tail值,还返回了qspinlock中之前存储的tail值(即"old")。对于第N个(N > 3)试图获取spinlock的CPU,之前的tail值是指向上一个MCS node的,获得上一个node的目的是让它的"next"指向自己,这样才能加入MCS node的等待队列。

struct mcs_spinlock *prev, *next, *node;

[part 3C]
// 如果MCS node队列不为空 - 第N个CPU
if (old & _Q_TAIL_MASK) {
    // 解析tail值,获取上一个MCS node
    prev = decode_tail(old);

    // 让上一个node指向自己,加入等待队列
    WRITE_ONCE(prev->next, node);  // prev->next = node

    // 基于自己的locked值进行spin,在[part 4]中解除
    arch_mcs_spin_lock_contended(&node->locked);
}

[part 3D]
#define _Q_LOCKED_PENDING_MASK (_Q_LOCKED_MASK | _Q_PENDING_MASK)
// 自己是MCS node队列的第一个节点 - 第二顺位继承
val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

加入MCS node组成的队列后,这第N个CPU会基于自己node的"locked"进行spin,这就完全回到了上文描述的MCS lock的路径里了。

[part 3D]中的这个atomic_cond_read_acquire()函数,在本文已经是第二次出现了,上一次是在[part 1]里,所不同的是,[part 1]里是第二个CPU(第一顺位继承人,CPU B)基于代表「大位」的qspinlock的"locked byte"进行spin,而这里是第三个CPU(第二顺位继承人,CPU C)基于qspinlock的"locked byte"加上代表「嗣位」的"pending"进行spin。

也就是说,当第一顺位继承人获得spinlock,空出嗣位后,如果第二顺位继承人已经建立了MCS node,那它并不会马上占据嗣位,而是要等到CPU B也释放了spinlock,"locked byte"和"pending"都为0时,才直接获取spinlock。

[part 3D]是放在第N个CPU试图获取spinlock的代码位置之后的,说明第N个CPU在解除基于自己node的"locked"的spin之后,也会执行这段代码,原因在接下来介绍完[part 4]的代码之后,将会自然地浮出水面。

第三个CPU成功获取锁

经过等待,当qspinlock的"locked byte"和"pending"都变为0时,第三个CPU总算熬出头了,它终于可以结束spin,获取这个spinlock。

[part 4]
locked:
    // 唯一node, 直接获取锁 (n,0,0) --> (0,0,1)
    if (((val & _Q_TAIL_MASK) == tail) &&
        atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
        goto release;

    // 如果MSC node队列中还有其他node (n,0,0) --> (n,0,1)
    set_locked(lock);

    // 解除下一个node在MCS node队列中的spin
    arch_mcs_spin_unlock_contended(&node->next->locked);

release:
    __this_cpu_dec(mcs_nodes[0].count);

如果qspinlock的"tail"是指向自己的,说明它是当前MCS node队列中唯一的node,那么直接通过atomic_try_cmpxchg_relaxed()获取spinlock即可,三元组的值变为(0, 0, 1)。

如果不是,说明队列中还有其他的node在等待,那么按照上文描述的MCS node队列的规则,需要调用arch_mcs_spin_unlock_contended()函数,设置下一个node(CPU D)的"locked"为1。

当CPU C获取spinlock,离开MCS node队列后,CPU D就成为了MCS node队列中的第一个node,它的"locked"也被CPU C设为了1。在上文描述的MCS lock实现中,"locked"为1就意味着spinlock被上一个owner递交给了自己。但在这里,"locked"为1只是结束了在MCS node队列中的的spin。

结束在MCS node队列中的的spin后,意味着CPU D在前面[part 3C]中的等待就结束了,它将执行[part 3D]的代码,切换为第二顺位继承人进行spin应该基于的对象,即qspinlock的"locked byte"加上"pending"(和之前的CPU C一样)。

可见,对于第N个(N > 3)CPU来说,从它加入MCS node等待队列,到最终成功获取spinlock,其spin基于的对象并不是从一而终的,在到达队首之前都是基于自己的node进行spin,到达队首后则是基于qspinlock进行spin。

  • 解锁

相比起加锁,解锁的过程看起来简直简单地不像话:

void queued_spin_unlock(struct qspinlock *lock)
{
    smp_store_release(&lock->locked, 0);
}

只需要把qspinlock中的"locked byte"置为0就可以了。这也不难理解,后面一直有CPU「盯」着呢,一旦大位空缺,马上就有人补齐,您只需要乖乖地把位子腾出来就行。

到这里,qspinlock加锁和解锁的过程就介绍完了,可以借助下面这张图片来一览它的全貌:

从中可以看出各种状态下CPU进行spin时基于的对象,以及状态之间的变化和迁移。

【总结与思考】

回过头来看一下qspinlock的组成和实现:如果只有1个或2个CPU试图获取锁,那么只需要一个4字节的qspinlock就可以了,其所占内存的大小和ticket spinlock一样。当有3个以上的CPU试图获取锁,需要一个qspinlock加上(N-2)个MCS node。

对于设计合理的spinlock,在大多数情况下,锁的争抢都不应该太激烈,最大概率是只有1个CPU试图获得锁,其次是2个,并依次递减。

这也是qspinlock中加入"pending"位域的意义,如果是两个CPU试图获取锁,那么第二个CPU只需要简单地设置"pending"为1,而不用「另起炉灶」创建一个MCS node。这是除了体积减小外,qspinlock相较于MCS lock所做的第二层优化。

试图加锁的CPU数目超过3个是小概率事件,但一旦发生,使用ticket spinlock机制就会造成多个CPU的cache line无谓刷新的问题,而qspinlock可以利用MCS node队列来解决这个问题。

可见,使用qspinlock机制来实现spinlock,具有很好的可扩展性,也就是无论当前锁的争抢程度如何,性能都可以得到保证。所以在Linux中,如果针对某个处理器架构的代码没有单独定义,qspinlock将是spinlock默认的实现方式。

#define arch_spin_lock(l)    queued_spin_lock(l)

作为SMP系统中的一个高频操作,spinlock的性能毫无疑问会影响到整个系统运行的性能,所以,从ticket spinlock到MCS lock,再到qspinlock,见证了内核开发者对其一步步的优化历程。在优秀的性能背后,是对每一行代码的反复推敲和考量,最终铸就了qspinlock精妙但并不简单的实现过程。也许,它还有优化的空间……

至此,对Linux中不同处理器架构可能采用的三种spinlock的实现方式的介绍就告一段落了,下文将开始介绍Linux中spinlock的具体使用方法。

参考:

en.wikipedia.org/wiki/C

LWN - Ticket spinlocks

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Linux阅码场 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要加锁
  • 如何加锁
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档