C++20推出了官方的协程库,但是在此之前C++并没有提供协程语法。libco是经典的C++协程库,本文将从源码角度分析libco,并参考了原作者的文章。
(据C++20 Coroutine 性能测试说libco被coroutine吊锤)
Reference: SJTU,IPADS,OS,07-process 腾讯技术工程:万字长文 | 漫谈libco协程设计及实现
协程本质上就是用户态线程,又名纤程,将调度的代码在用户态重新实现。将内核态代码移到用户态其实是常见的思路了,例如驱动的libos,网络的dpdk,乃至于微内核。
对于线程而言,其上下文切换流程如下,需要两次权限等级切换和三次栈切换。上下文存储在内核栈上。
从时间角度:
从空间角度:
因此,从并发的角度看,协程是更好的并发模型。
linux根据POSIX标准提供了ucontext库支持原生协程,但是在POSIX.1-2008中被废除了。大概是因为协程在语言级别就能实现,所以没必要扔系统层,KISS?
生产者直接调度至消费者,最优调度
上下文切换是调度的核心,在libco中通过coctx_swap.S汇编实现,这段代码还是挺有趣的。
对应swapcontext(ucontext_t*oucp, ucontext_t*ucp)
结构体声明
印证了上面提到的TCB冗余,这里的上下文仅仅保存了除了r10和r11之外的通用寄存器、栈的大小、栈低位指针(最大栈顶)。
struct coctx_t
{
void *regs[ 14 ];
size_t ss_size;
char *ss_sp;
};
// 64 bit
// low | regs[0]: r15 |
// | regs[1]: r14 |
// | regs[2]: r13 |
// | regs[3]: r12 |
// | regs[4]: r9 |
// | regs[5]: r8 |
// | regs[6]: rbp |
// | regs[7]: rdi |
// | regs[8]: rsi |
// | regs[9]: ret | //ret func addr
// | regs[10]: rdx |
// | regs[11]: rcx |
// | regs[12]: rbx |
// hig | regs[13]: rsp |
函数声明
rdi寄存器是当前上下文地址,rsi寄存器是目标上下文地址,X86传参机制
extern "C"
{
extern void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");
};
保存上下文
首先将rsp移到rax,然后将通用寄存器存入regs。值得注意的是这里两次压入的rax分别是rsp和返回地址RA。没有压入rax的原因在于rax按照约定是调用后放返回值的,所以没必要保存。
leaq (%rsp),%rax
movq %rax, 104(%rdi)
movq %rbx, 96(%rdi)
movq %rcx, 88(%rdi)
movq %rdx, 80(%rdi)
movq 0(%rax), %rax
movq %rax, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
xorq %rax, %rax
设置上下文
然后从目标上下文取出所有通用寄存器,并将返回地址RA压入栈,从而能够在ret时根据函数调用机制跳转到coctx_swap后的指令。
这里的leaq 8(%rsp)本质上是在对返回地址进行出栈操作,从而恢复到存入上下文之前的栈,然后通过pushq 72(%rsi)入栈伪造的返回地址,在ret时跳转到另一个协程中。
如果看过ROP(Return-Orient-Programming)或者玩过栈溢出的话对这个应该会很眼熟,所以学安全还是很有用的。
movq 48(%rsi), %rbp
movq 104(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 56(%rsi), %rdi
movq 80(%rsi), %rdx
movq 88(%rsi), %rcx
movq 96(%rsi), %rbx
leaq 8(%rsp), %rsp
pushq 72(%rsi)
movq 64(%rsi), %rsi
ret
创建上下文中在libco中通过coctx.cpp实现,初始化上下文的栈和跳转点
对应makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
int coctx_make(coctx_t* ctx, coctx_pfn_t pfn, const void* s, const void* s1) {
char* sp = ctx->ss_sp + ctx->ss_size - sizeof(void*);
sp = (char*)((unsigned long)sp & -16LL);
memset(ctx->regs, 0, sizeof(ctx->regs));
void** ret_addr = (void**)(sp);
*ret_addr = (void*)pfn;
ctx->regs[kRSP] = sp;
ctx->regs[kRETAddr] = (char*)pfn;
ctx->regs[kRDI] = (char*)s;
ctx->regs[kRSI] = (char*)s1;
return 0;
}
协程的sp分为两种,主协程的sp在栈上(也就是原本的线程所持有的系统栈),其他协程的sp在堆上。第一次调度时,需要通过coctx_make完成context的初始化,之后通过swapcontext自动进行上下文切换。
在这里,栈的最高位是(ctx->ss_sp + ctx->ss_size -sizeof(void*)) & -16LL,最低位是ctx->ss_sp,也就是在大小为ss_size的堆上形成的协程栈。
通用寄存器初始均为0
memset(ctx->regs, 0, sizeof(ctx->regs));
下面这段代码相当于push RA,& -16LL 进行内存对齐(后四位置0)
char* sp = ctx->ss_sp + ctx->ss_size - sizeof(void*);
sp = (char*)((unsigned long)sp & -16LL);
void** ret_addr = (void**)(sp);
*ret_addr = (void*)pfn;
然后将rsp设置为栈底,ra设置为入口函数pfn,传入的参数作为入口函数的参数rdi,rsi。
typedef void* (*coctx_pfn_t)( void* s, void* s2 );
libco专为epoll服务,co_epoll.cpp就是对epoll做了点封装,co_hook_syscall.cpp对read/write做了些封装。
hook NIO后协程将会不进行阻塞,而是通过poll挂载在epoll协程上(yield),当poll就绪或者超时后再恢复协程执行(resume)。无视他们,主要看co_routine.cpp。
协程结构体
协程默认有128K的协程栈在stack_mem中,ctx表示上下文,pfn和arg分别为当前函数和参数,同时还有4K的私有变量数组aSpec(key是索引)。所以创建协程的开销很大,为了避免开销,从协程池取出协程后只需把pfn重置为协程入口函数即可。
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn;
void *arg;
coctx_t ctx;
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
void *pvEnv;
//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
//save satck buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;
stCoSpec_t aSpec[1024];
};
环境结构体
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;
stCoEpoll_t *pEpoll;
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env )
{
return env->pCallStack[ env->iCallStackSize - 1 ];
}
环境表征递归调用的协程,深度最大为128。pCallStack的每个元素均为协程,通过size获取当前的递归深度,GetCurrCo获取当前的协程(吐槽一下,后面就没怎么看到这玩意儿被调用,inline不香么)。但是常规情况递归深度都不会很大。
协程切换
非共享栈的情况下很简单,直接context swap即可。
为了节约内存引入共享栈机制,用拷贝共享栈的时间换取每个协程栈的空间,共享栈的内存通常较大,视作许多协程栈的堆叠,每个协程栈无需占据128K而是根据size动态分配。
如果即将执行的协程并不是共享栈的持有者,则让共享栈的持有者将自己的栈(在共享栈顶)存入buffer中,然后将共享栈转移给即将执行的协程。
在协程切换完成后,即将执行的协程将自己的栈从buffer中取出并复制到共享栈中。
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
char c;
curr->stack_sp= &c;
if (!pending_co->cIsShareStack)
{
env->pending_co = NULL;
env->occupy_co = NULL;
}
else
{
env->pending_co = pending_co;
//get last occupy co on the same stack mem
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
//set pending co to occupy thest stack mem;
pending_co->stack_mem->occupy_co = pending_co;
env->occupy_co = occupy_co;
if (occupy_co && occupy_co != pending_co)
{
save_stack_buffer(occupy_co);
}
}
//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co = curr_env->occupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;
if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
{
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}
首先resume Handler协程,然后执行Eventloop开始监听
Callback
从Epoll事件中获取Item,通过Item的回调函数来resume可继续执行的协程
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
void OnPollProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}
Resume
要执行协程时,协程调用深度++,然后切换协程。
顺便吐槽一句,明明有了GetCurrCo为啥不直接inline呢。
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co;
co_swap( lpCurrRoutine, co );
}
Yield
当协程完成后,执行流返回上层,将上下文切换到上层协程
顺便继续吐槽一句,明明有了GetCurrCo为啥不直接inline呢。
void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}
libco本质上是单线程(对应主协程),无法利用多核,仅仅是并发而非并行,所以要配合多进程多线程。(fork or pthread_create)。