前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >有栈协程和无栈协程

有栈协程和无栈协程

原创
作者头像
王鹏程1990
发布2021-10-13 11:37:44
4.7K0
发布2021-10-13 11:37:44
举报
文章被收录于专栏:源码阅读

(更多文章参见个人专栏:www.2333.group

概述

后台架构的微服务化,原先的单体应用被按照功能模块切分为若干进程组承担,此种架构演化带来的收益诸如:

  • 单进程复杂度降低,代码维护成本降低
  • 发布影响范围缩小,发布灵活性提升
  • 计算资源更精准的分配
  • ... ...

但是这种架构带来的另外的变化就是,原先由单进程承载的事务,可能涉及几个甚至十几个进程;在这种情况下,采取异步回调的程序架构会导致代码零散不易维护;而采取多线程的话,一方面由于线程切换效率比较低,另一方面部分系统对线程个数的上限是有限制的,因此多线程的结构并不利于算力的充分利用。因此协程技术应运而生。

所谓协程,即用户级线程,一种用于将异步代码同步化的编程机制,使得程序的执行流可以在多个并行事务之间切换但又不必承担切换带来的过高的性能损耗。当前很多的编程语言都内置协程特性或者有自己的协程库,如C/C++的libco、golang的goroutine等。而在实现机制上,又可以划分为有栈协程和无栈协程,我们分别进行介绍。

有栈协程

所谓有栈协程是指执行环境的恢复是通过函数栈(即运行时上下文)的恢复实现的,在此之前我们先回忆一下函数调用的基础知识。

函数调用过程

当可执行文件并载入到内存中,进程地址空间被划分为代码段、数据段、堆、栈等,如下图所示:

而程序运行过程中需要的内存是在栈和堆空间分配的,当进程内发生函数调用时,需要保存一些上下文信息以及为函数内局部变量分配存储空间,这些存储空间是在栈上分配的,具体来说:在函数调用之前主调函数会将函数参数和返回地址入栈,被调函数在执行之前会先将主调函数的ebp入栈,并在栈上新开辟一块内存用于存放局部变量等信息;如下图所示:

当被调函数执行完毕之后,会执行退栈操作,找到函数退出之前的下一条指令的地址并将栈中存放的局部变量信息恢复,即可恢复原来主调函数被中断的执行过程。

在这里需要关注到几点:

  • 不是所有的寄存器都需要入栈的,只有ebp和eip
  • 我们在反汇编的时候是看不到返回地址入栈的操作,这个是由call指令完成的
  • 参数入栈顺序主流是从右向左入栈的

通过对函数调用过程分析我们可以看到,函数通过函数返回语句实现执行权限的归还&通过栈中返回地址实现被中断执行流程的恢复,而有栈协程正是基于这一朴素的想法实现的:在有栈协程中,将每个并行事务看成是一个函数调用,而协程库负责把让出执行权时的协程的上下文缓存起来(即当时的栈包括局部变量、返回地址等),当协程被重新调度时,就把切出时的栈重新装载进去即可。我们选用两个协程库进行介绍,云风的基于ucontext函数簇的协程库和libco。

基于ucontext协程库

ucontext即user thread context,用户线程上下文,是Linux系统自带的一套用于获取、修改和切换当前线程上下文的结构和相关方法;主要包括:ucontext_t结构体和context函数簇;

关于ucontext函数簇

其中ucontext_t结构体:

代码语言:javascript
复制
typedef struct ucontext
{
    unsigned long int uc_flags;
    struct ucontext *uc_link;
    stack_t uc_stack;
    mcontext_t uc_mcontext;
    __sigset_t uc_sigmask;
    struct _libc_fpstate __fpregs_mem;
} ucontext_t;

主要包括:

  • uc_links:当前context执行结束后要执行的下一个context,若为空,表示执行完毕后退出程序;在协程库设计中一般用于存主协程的上下文
  • uc_stack:当前上下文的栈信息
  • uc_mcontext:用于保存当前上下文的寄存器内容
  • uc_sigmask:当前线程的信号屏蔽码

context函数簇包括:

  • int getcontext(ucontext_t *ucp),将当前执行上下文信息保存在ucp指向的ucontext结构体中
  • int setcontext(const ucontext_t *ucp),将ucp中保存的寄存器信息恢复到CPU中,用于将当前程序切换到目标上下文
  • void makecontext(ucontext_t ucp, void (func)(), int argc, ...),初始化一个ucontext_t,并设置入口函数为func
  • int swapcontext(ucontext_t *oucp, const ucontext_t *ucp),切换上下文,保存当前上下文到oucp中,并激活ucp中的上下文

context底层原理差不多,我们挑一个看就行了,这里主要看swapcontext接口,因为既包含了上下文保存又包含上下文切换的内容,相对比较全面,其实现流程如下:

代码语言:javascript
复制
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);
ENTRY(__swapcontext)
    /* Save the preserved registers, the registers used for passing args,
       and the return address.  */
    movq    %rbx, oRBX(%rdi)
    movq    %rbp, oRBP(%rdi)
    movq    %r12, oR12(%rdi)
    movq    %r13, oR13(%rdi)
    movq    %r14, oR14(%rdi)
    movq    %r15, oR15(%rdi)

    movq    %rdi, oRDI(%rdi)
    movq    %rsi, oRSI(%rdi)
    movq    %rdx, oRDX(%rdi)
    movq    %rcx, oRCX(%rdi)
    movq    %r8, oR8(%rdi)
    movq    %r9, oR9(%rdi)         //这一波是把寄存器内容缓存起来,这些寄存器是传参和传递返回地址用的

    movq    (%rsp), %rcx           //因为这里函数调用,被调函数没有执行入栈操作,所以当前栈指针是执向返回地址的
    movq    %rcx, oRIP(%rdi)       //因此这里入栈其实是把返回地址给存器来了
    leaq    8(%rsp), %rcx       
    movq    %rcx, oRSP(%rdi)       //这两句是存返回地址
    leaq    oFPREGSMEM(%rdi), %rcx  //栈上面有独立的存储空间存放浮点数环境,我们用结构体中__fpregs_mem进行存储
    movq    %rcx, oFPREGS(%rdi)
    /* Save the floating-point environment.  */
    fnstenv (%rcx)                   //存浮点数环境
    stmxcsr oMXCSR(%rdi)             //存MXCSR寄存器内容
    /* The syscall destroys some registers, save them.  */
    movq    %rsi, %r12

    /* Save the current signal mask and install the new one with
       rt_sigprocmask (SIG_BLOCK, newset, oldset,_NSIG/8).  */
    leaq    oSIGMASK(%rdi), %rdx
    leaq    oSIGMASK(%rsi), %rsi
    movl    $SIG_SETMASK, %edi
    movl    $_NSIG8,%r10d
    movl    $__NR_rt_sigprocmask, %eax 
    syscall                       //这里其实是一个系统调用过程,将系统调用号存入rax中,参数存入rdx等寄存器中,并通过
    cmpq    $-4095, %rax          //syscall触发系统调用,调用完毕后通过eax获取返回值,上面的过程其实就是将当前的系统  
    jae SYSCALL_ERROR_LABEL       //调用号存起来并把新的传进去,通过rax寄存器拿到返回值判断是否成功

    /* Restore destroyed registers.  */
    movq    %r12, %rsi

    /* Restore the floating-point context.  Not the registers, only the
       rest.  */
    movq    oFPREGS(%rsi), %rcx
    fldenv  (%rcx)
    ldmxcsr oMXCSR(%rsi)

    /* Load the new stack pointer and the preserved registers.  */
    movq    oRSP(%rsi), %rsp
    movq    oRBX(%rsi), %rbx
    movq    oRBP(%rsi), %rbp
    movq    oR12(%rsi), %r12
    movq    oR13(%rsi), %r13
    movq    oR14(%rsi), %r14
    movq    oR15(%rsi), %r15

    /* The following ret should return to the address set with
    getcontext.  Therefore push the address on the stack.  */
    movq    oRIP(%rsi), %rcx
    pushq   %rcx

    /* Setup registers used for passing args.  */
    movq    oRDI(%rsi), %rdi
    movq    oRDX(%rsi), %rdx
    movq    oRCX(%rsi), %rcx
    movq    oR8(%rsi), %r8
    movq    oR9(%rsi), %r9

    /* Setup finally  %rsi.  */
    movq    oRSI(%rsi), %rsi    //要拉起的进程的寄存器恢复

    /* Clear rax to indicate success.  */
    xorl    %eax, %eax
    ret
PSEUDO_END(__swapcontext)

一些基础知识:

  • FPU(floating point unit,浮点运算器)用于进行浮点运算的单元,在x86中,FPU有一套自己的运行时环境包括控制字、状态字、指令指针、数据指针等
    • fstenv/fnstenv用于将浮点环境缓存到指定的内存空间中
  • MXCSR register:用于存储SSE寄存器中的控制和状态信息
    • SSE(streaming SIMD Extensions,单指令多数据流扩展指令集),SSE加入新的8个128位寄存器,以及新的32位控制/状态寄存器(MXCSR)
    • SIMD,单指令多数据流技术,即用一个控制器控制多个平行的处理单元,如GPU
  • syscall
    • 我们可以在汇编程序中使用Linux系统调用,通常的步骤是:
      • 1)将系统调用号放在EAX寄存器中
      • 2)将参数存储在寄存器EBX、ECX等中
      • 3)调用相关的中断
      • 4)在EAX寄存器中获取返回

主要包括以下几方面的内容:

  • swapcontext用于将当前执行流的上下文保存到oucp所指向的结构体,同时将ucp所指向的用户上下文重新进行装载实现执行流的切换
  • 保存的信息包括三个部分:
    • 第一大块,一堆mov指令,实际上是把寄存器存起来
    • 第二大块,浮点数环境缓存起来
    • 第三大块,信号码相关信息
  • 恢复的时候也是遵循这个顺序反向来的

而云风基于coroutine协程库就是基于ucontext函数簇达到上下文切换的目的

云风的coroutine协程库

我们先通过一个例子看下这个协程库的用法:

代码语言:javascript
复制
struct args { int n;};
static void foo(struct schedule * S, void *ud) {
	struct args * arg = ud;
	int start = arg->n;
	int i;
	for (i=0;i<5;i++) {
		printf("coroutine %d : %d\n",coroutine_running(S) , start + i);
		coroutine_yield(S);
	}
}
static void test(struct schedule *S) {
	struct args arg1 = {0};
	struct args arg2 = {100};
	// 创建两个协程
	int co1 = coroutine_new(S, foo, &arg1);
	int co2 = coroutine_new(S, foo, &arg2);
	while (coroutine_status(S,co1) && coroutine_status(S,co2)) {
		coroutine_resume(S,co1);// 使用协程 co1
		coroutine_resume(S,co2);// 使用协程 co2
	}
}

int main() {
	struct schedule * S = coroutine_open();// 创建一个协程调度器
	test(S);
	coroutine_close(S);// 关闭协程调度器
	return 0;
}

示例代码比较简单:首先是创建了一个调度器schedule(本质上也是一个协程),并且通过接口coroutine_resume在调度器下面创建了两个协程co1和co2,然后通过调度器循环去拉起其中一个协程,直到一个协程执行完毕。通过示例代码,我们可以看到这个协程库主要包括以下几个接口:

  • struct schedule * coroutine_open(void):创建调度器
  • int coroutine_new(struct schedule *, coroutine_func, void *ud):创建一个协程,入口函数func,ud是用户参数
  • void coroutine_resume(struct schedule *, int id):拉起指定的协程,用id对协程进行标识
  • void coroutine_yield(struct schedule *):当前协程阻塞,让出执行权限给调度器

通过示例代码,从直观上,我们可以看出来,这个协程库采用的非对称的协程模型,当协程阻塞或者退出后,并不指定将执行权限移交给哪个协程,而是统一交还给调度器(主协程),由调度器选择指定的子协程进行调度和拉起。而更细节的,我们从源码去看下这个协程库是怎么组织协程、以及怎么通过context函数簇实现协程的执行序列的切换,部分源码如下:

代码语言:javascript
复制
struct schedule {
	char stack[STACK_SIZE];
	ucontext_t main;
	int nco;
	int cap;
	int running;
	struct coroutine **co;
};

struct coroutine {
	coroutine_func func;
	void *ud;
	ucontext_t ctx;
	struct schedule * sch;
	ptrdiff_t cap;
	ptrdiff_t size;
	int status;
	char *stack;
};

static void mainfunc(uint32_t low32, uint32_t hi32) {
	uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);
	struct schedule *S = (struct schedule *)ptr;
	int id = S->running;
	struct coroutine *C = S->co[id];
	C->func(S,C->ud);
	_co_delete(C);
	S->co[id] = NULL;
	--S->nco;
	S->running = -1;
}

void  coroutine_resume(struct schedule * S, int id) {
	struct coroutine *C = S->co[id];
	switch(C->status) {
	case COROUTINE_READY:
		getcontext(&C->ctx);
		C->ctx.uc_stack.ss_sp = S->stack;
		C->ctx.uc_stack.ss_size = STACK_SIZE;
		C->ctx.uc_link = &S->main;
		S->running = id;
		C->status = COROUTINE_RUNNING;
		uintptr_t ptr = (uintptr_t)S;
		makecontext(&C->ctx,(void (*)(void))mainfunc,2,(uint32_t)ptr,(uint32_t(ptr>>32));
		swapcontext(&S->main, &C->ctx);
		break;
	case COROUTINE_SUSPEND:
		memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
		S->running = id;
		C->status = COROUTINE_RUNNING;
		swapcontext(&S->main, &C->ctx);
		break;
	default:
		assert(0);
	}
}

static void _save_stack(struct coroutine *C, char *top) {
	char dummy = 0;
	assert(top - &dummy <= STACK_SIZE);
	if (C->cap < top - &dummy) {
		free(C->stack);
		C->cap = top-&dummy;
		C->stack = malloc(C->cap);
	}
	C->size = top - &dummy;
	memcpy(C->stack, &dummy, C->size);
}
                    
void coroutine_yield(struct schedule * S) {
	int id = S->running;
	struct coroutine * C = S->co[id];
	_save_stack(C,S->stack + STACK_SIZE);
	C->status = COROUTINE_SUSPEND;
	S->running = -1;
	swapcontext(&C->ctx , &S->main);
}

在了解完context函数簇之后,我们对于理解这个协程库会简单很多,我们逐个来看:

  • 结构体schedule,用于描述调度器的结构体,其中
    • stack是运行时协程栈,用于存放当前运行的协程的栈信息
    • main,用于存放调度器的上下文信息
    • coroutine,用于存放此调度器下管理的协程信息,每个协程
  • 结构体coroutine,用于描述协程信息,其中:
    • func,一个函数指针,指向
    • ctx,协程的上下文信息
    • stack,协程的栈信息,在堆空间上分配出来的
    • cap/size:栈的容量和当前使用栈空间的大小
    • status,协程的状态信息,是处于就绪状态(COROUTINE_READY)?运行状态(COROUTINE_RUNNING )?还是挂起状态(COROUTINE_SUSPEND )?
  • 接口coroutine_resume,用于拉起指定id的子协程,具体来说有以下几点需要注意:
    • 协程的id是协程在调度器协程数组里面的下标
    • 对于COROUTINE_READY状态的协程被拉起时:
      • 子协程的ss_sp指向调度器的栈空间,表示子协程是共享stack内存空间的(意味着所有子协程的栈指针都是指向这块内存空间的)
      • 会通过getcontext和makecontext设置协程的入口函数
      • 最后通过swapcontext完成从调度器到子协程的执行流的切换
    • 对于COROUTINE_SUSPEND状态的协程被拉起时:
      • 直接将子协程的栈空间装载到调度器的stack内存空间并swapcontext即可
  • 接口coroutine_yield,这个接口主要做了两件事:
    • 接口_save_stack把运行时栈保存起来,其中_save_stack是通过局部变量dummy找到栈顶指针位置的,而top实际记录的是栈底指针的位置
    • 通过接口swapcontext完成子协程和调度器的上下文切换
  • 关于_save_stack我们以示例中的运行时栈为例:

    • 通过dummy找到栈顶位置,但是这个栈包含了coroutine_yield的内容
    • 但是swapcontext是在coroutine_yield尾部切换的,所以实际切换的时候esp已经指向到foo压栈的返回地址了,意味着虽然_save_stack虽然多存了一部分,但是esp的指针仍然是正确的,没有影响

libco

还有一个广泛使用的协程库就是libco,libco是被由微信开发并大规模应用的协程库,自2013年起稳定运行于数万台微信后台机器上;具备以下特性:

  • 高性能,号称可以调度千万级协程
  • 在IO阻塞时,可以自动切换,利用hook技术+epoll事件循环实现阻塞逻辑IO化改造
  • 支持嵌套创建
  • 既支持共享栈模式也支持独立栈模式
  • 提供超时管理
  • ... ...

而libco基于性能优化考虑,没有使用ucontext,而是自己用汇编写了一套上下文切换的代码,在文件coctx_swap.S里面,他这里面只保存和恢复了寄存器内存和栈内容,相比于ucontext,少了浮点数上下文和sigmask,因为:

  • sigmask会引发一次syscall(需要从用户态进入到内核态并返回),性能上有损耗
  • 取消浮点数上下文,是因为服务端编程几乎用不到浮点数计算

此外libco的上下文切换只支持i386和x86架构,因为后台服务器大多是x86架构的,总结来看,libco牺牲了通用性,把一些不常用的场景下上下文保存的操作去掉了,实现代码性能的提升,有人进行过性能比对,libco的性能是ucontext的3.6倍

一个libco的示例:

代码语言:javascript
复制
struct stTask_t {
	int id;
};
struct stEnv_t {
	stCoCond_t* cond;
	queue<stTask_t*> task_queue;
};
void* Producer(void* args) {
	co_enable_hook_sys();
	stEnv_t* env=  (stEnv_t*)args;
	int id = 0;
	while (true) {
		stTask_t* task = (stTask_t*)calloc(1, sizeof(stTask_t));
		task->id = id++;
		env->task_queue.push(task);
		co_cond_signal(env->cond);
		poll(NULL, 0, 1000);
	}
	return NULL;
}
void* Consumer(void* args) {
	co_enable_hook_sys();
	stEnv_t* env = (stEnv_t*)args;
	while (true) {
		if (env->task_queue.empty())
		{
			co_cond_timedwait(env->cond, -1);
			continue;
		}
		stTask_t* task = env->task_queue.front();
		env->task_queue.pop();
		free(task);
	}
	return NULL;
}
int main() {
	stEnv_t* env = new stEnv_t;
	env->cond = co_cond_alloc();

	stCoRoutine_t* consumer_routine;
	co_create(&consumer_routine, NULL, Consumer, env);
	co_resume(consumer_routine);

	stCoRoutine_t* producer_routine;
	co_create(&producer_routine, NULL, Producer, env);
	co_resume(producer_routine);
	
	co_eventloop(co_get_epoll_ct(), NULL, NULL);
	return 0;
}

从代码可以看到:

  • 这是一个生产者消费者示例,一个负责生产的producer_routine协程,负责把生产的数据放到env的task_queue中;和一个负责消费的consumer_routine负责从task_queue取数据
  • libco使用结构体stCoRoutine_t描述一个协程,接口co_create负责创建协程,接口co_resume负责拉起协程
  • co_eventloop表示进入协程的执行循环

结合示例涉及的接口看一下相关的数据结构及接口原理:

co_create
代码语言:javascript
复制
struct stCoRoutineAttr_t {
	int stack_size;
	stShareStack_t*  share_stack;
	stCoRoutineAttr_t()
	{
		stack_size = 128 * 1024;
		share_stack = NULL;
	}
}__attribute__ ((packed));

struct stCoRoutineEnv_t{
	stCoRoutine_t *pCallStack[ 128 ];
	int iCallStackSize;
	stCoEpoll_t *pEpoll;
	stCoRoutine_t* pending_co;
	stCoRoutine_t* occupy_co;
};

struct stCoRoutine_t
{
	stCoRoutineEnv_t *env;
	pfn_co_routine_t pfn;
	void *arg;
	coctx_t ctx;
	char cStart;
	char cEnd;
	char cIsMain;
	char cEnableSysHook;
	char cIsShareStack;
	void *pvEnv;
	stStackMem_t* stack_mem;
	//save satck buffer while confilct on same stack_buffer;
	char* stack_sp; 
	unsigned int save_size;
	char* save_buffer;
	stCoSpec_t aSpec[1024];
};

struct coctx_t
{
#if defined(__i386__)
	void *regs[ 8 ];
#else
	void *regs[ 14 ];
#endif
	size_t ss_size;
	char *ss_sp;
	
};

stCoRoutineEnv_t *co_get_curr_thread_env() {
	return gCoEnvPerThread;
}

void co_init_curr_thread_env() {
	gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
	stCoRoutineEnv_t *env = gCoEnvPerThread;
	env->iCallStackSize = 0;
	struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
	self->cIsMain = 1;
	env->pending_co = NULL;
	env->occupy_co = NULL;
	coctx_init( &self->ctx );
	env->pCallStack[ env->iCallStackSize++ ] = self;
	stCoEpoll_t *ev = AllocEpoll();
	SetEpoll( env,ev );
}


int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg ) {
	if( !co_get_curr_thread_env() ) {
		co_init_curr_thread_env();
	}
	stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
	*ppco = co;
	return 0;
}

struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
		pfn_co_routine_t pfn,void *arg ) {
	stCoRoutineAttr_t at;
	if( attr ) {
		memcpy( &at,attr,sizeof(at) );
	}
	if( at.stack_size <= 0 ) {
		at.stack_size = 128 * 1024; 
	}
	else if( at.stack_size > 1024 * 1024 * 8 ) {
		at.stack_size = 1024 * 1024 * 8;
	}
	if( at.stack_size & 0xFFF )  {
		at.stack_size &= ~0xFFF;
		at.stack_size += 0x1000;
	}
	stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
	memset( lp,0,(long)(sizeof(stCoRoutine_t))); 
	lp->env = env;
	lp->pfn = pfn;
	lp->arg = arg;
	stStackMem_t* stack_mem = NULL;
	if( at.share_stack ) {
		... ...
	}
	else {
		// 如果没有采用共享栈,则分配内存
		stack_mem = co_alloc_stackmem(at.stack_size);
	}
	lp->stack_mem = stack_mem;
	// 设置该协程的context
	lp->ctx.ss_sp = stack_mem->stack_buffer; // 栈地址
	lp->ctx.ss_size = at.stack_size; // 栈大小
	lp->cStart = 0;
	lp->cEnd = 0;
	lp->cIsMain = 0;
	lp->cEnableSysHook = 0;	// 默认不开启hook
	lp->cIsShareStack = at.share_stack != NULL;
	return lp;
}

逐个来看:

  • 结构体stCoRoutineAttr_t,用于描述协程的属性信息,目前这个属性信息只包括共享栈的信息
  • 结构体stCoRoutineEnv_t,用于描述协程的运行环境,每个线程都有唯一一个该结构的变量用于对该线程下协程进行管理
    • pCallStack协程的调用栈,最后一位是当前运行的协程,上一位是当前协程的父协程;可以看到libco最多只支持128层的协程嵌套
    • pEpoll,libco主要用epoll作为协程调度器
  • 结构体stCoRoutine_t用于描述一个协程,其中:
    • stCoRoutineEnv_t *env,指向协程所属的运行环境,可以理解为协程所属的协程管理器
    • pfn_co_routine_t pfn/void *arg,协程对应的函数及参数
    • coctx_t ctx,协程的上下文信息,包括寄存器和栈
    • cStart/cEnd/cIsMain/cEnableSysHook/cIsShareStack:一系列标志变量
    • stStackMem_t* stack_mem;,栈空间指针
  • 创建协程的接口co_create,其中
    • 参数部分:
      • ppco:输出变量,协程的地址
      • attr:协程的属性信息,目前只有一个属性就是是否是共享栈
      • pfn:协程的入口函数
      • arg:协程的入口函数的参数
    • 核心功能通过接口co_create_env完成
  • co_create_env本质上就是创建一个stCoRoutine_t并初始化
co_resume
代码语言:javascript
复制
#if defined(__i386__)
... ...
#elif defined(__x86_64__)
int coctx_make( coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1 )
{
	char *sp = ctx->ss_sp + ctx->ss_size;
	sp = (char*) ((unsigned long)sp & -16LL  );
	memset(ctx->regs, 0, sizeof(ctx->regs));
	ctx->regs[ kRSP ] = sp - 8;
	ctx->regs[ kRETAddr] = (char*)pfn;
	ctx->regs[ kRDI ] = (char*)s;
	ctx->regs[ kRSI ] = (char*)s1;
	return 0;
}

void co_resume( stCoRoutine_t *co )
{
	stCoRoutineEnv_t *env = co->env;
	// 找到当前运行的协程, 从数组最后一位拿出当前运行的协程,如果目前没有协程,那就是主线程
	stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
	if( !co->cStart ) {
		// 如果当前协程还没有开始运行,为其构建上下文
		coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co, 0 );
		co->cStart = 1;
	}
	// 将指定协程放入线程的协程队列末尾
	env->pCallStack[ env->iCallStackSize++ ] = co;
	// 将当前运行的上下文保存到lpCurrRoutine中,同时将协程co的上下文替换进去
	co_swap( lpCurrRoutine, co );
}

注意到:

  • 当协程被创建之后还没有运行的情况下,cStart标识为0,第一次调度之后,这个标志位为1,表示已经运行了
  • 在第一次调度协程时,会先通过接口coctx_make先为这个协程构建一个上下文
  • 而coctx_make做了什么?
    • 把协程入口地址写入到kRETAddr寄存器中
    • 把协程对象的地址写到kRDI寄存器中
  • 完成上下文创建之后,当前协程入栈(env->pCallStack[ env->iCallStackSize++ ] = co),并切换到子协程执行
co_swap
代码语言:javascript
复制
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
 	stCoRoutineEnv_t* env = co_get_curr_thread_env();
	//get curr stack sp
	//这里非常重要!!!: 这个c变量的实现,作用是为了找到目前的栈底,因为c变量是最后一个放入栈中的内容。
	char c;
	curr->stack_sp= &c;
	if (!pending_co->cIsShareStack) {  
		env->pending_co = NULL;
		env->occupy_co = NULL;
	}
	else { // 如果采用了共享栈
	    ... ...
	}
	// swap context
	coctx_swap(&(curr->ctx),&(pending_co->ctx) );
	// 上一步coctx_swap会进入到pending_co的协程环境中运行
	// 但是pengdin_co也是这一步换出的,所以被换入的时候也是从这一步继续往后执行
	// 而yield回此协程之前,env->pending_co会被上一层协程设置为此协程
	// 因此可以顺利执行: 将之前保存起来的栈内容,恢复到运行栈上
	//stack buffer may be overwrite, so get again;
	stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
	stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
	stCoRoutine_t* update_pending_co = curr_env->pending_co;
	// 将栈的内容恢复,如果不是共享栈的话,每个协程都有自己独立的栈空间,则不用恢复。
	if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) {
         ... ...
	}
}
代码语言:javascript
复制
.globl coctx_swap
// coctx_swap(coctx_t curr,coctx_t pending)

#if defined(__i386__)
	... ...
#elif defined(__x86_64__)
	leaq 8(%rsp),%rax   // 父函数栈帧中除返回地址外栈帧顶的位置
	
	leaq 112(%rdi),%rsp ;rdi寄存器存的是第一个参数的地址,其112个字节后,是regs数组后的位置。
	                    ;所以这一步是把栈指针设置为regs数组地址
	// 将寄存器保存到入栈,因为此时栈的地址指向数组,因此实际上就是讲各个寄存器填充到数组中
	pushq %rax  // rax -> regs[13],也就是当前的rsp -> regs[13]
	pushq %rbx  // rbx -> regs[12]
	pushq %rcx  // rcx -> regs[11]
	pushq %rdx  // rdx -> regs[10]
	pushq -8(%rax) // ret func addr  返回地址 -> regs[9]
	pushq %rsi  // rsi -> regs[8]
	pushq %rdi  // rdi -> regs[7]
	pushq %rbp  // rbp -> regs[6]
	pushq %r8   // r8 -> regs[5]
	pushq %r9   // r9 -> regs[4]
	pushq %r12   // r12 -> regs[3]
	pushq %r13    // r13 -> regs[2]
	pushq %r14   // r14 -> regs[1]
	pushq %r15   // r15 -> regs[0]  // r15是程序计数器
	// 截止到此,所有的协程上下文保存成功 
	// rsi中是第二个参数,我们需要将第二个参数的上下文载入到寄存器和栈里面
	// rsi的首地址就是整个regs[]参数的地址,从0开始,所以当pop的时候,将从0将参数pop出来。
	movq %rsi, %rsp
	popq %r15         ;以下为倒序还原
	popq %r14
	popq %r13
	popq %r12
	popq %r9
	popq %r8
	popq %rbp
	popq %rdi
	popq %rsi
	popq %rax //ret func addr
	popq %rdx
	popq %rcx
	popq %rbx
	popq %rsp
	pushq %rax // 将返回地址入栈
	// 将eax寄存器清零,eax是rax的低32位,也就是将rax的低32位清零。也就是return 0的意思。
	xorl %eax, %eax
	// 返回函数
	ret
#endif

co_swap主要完成了协程切换的功能,将执行流从curr指向的当前协程切换为pending_co指向的协程,在coctx_swap中:

  • 根据函数入栈规则,当调用到coctx_swap时,依次将pending、curr和ret-addr入栈了,所以第一句leaq 8(%rsp),%rax实际是让rax指向除返回地址以外的栈顶处
  • leaq 112(%rdi),%rsp是将栈的首地址指向了curr->regs起始位置处,后面的push本质上就是push到regs中,所以后面的一串push实际上就是存寄存器内容的
  • rsi是用来存第二个参数的,movq %rsi, %rsp实际就是把栈指针指向rsi的regs数组,然后一波pop用来填充寄存器的内容
  • pushq %rax将返回地址入栈(对应于leaq 8(%rsp),%rax),因为返回时会从栈顶取返回地址,作为下一条指令的地址,通过这步操作就从上次让出CPU的位置继续执行了
  • 而栈信息通过rbp和rsp指向,而栈顶指针在coctx_make里面已经设置为coctx_t::ss_sp的存储空间,所以在pop序列进行环境恢复的时候,实际就把对应协程空间的栈空间恢复了
  • 关于ebp的话,因为在coctx_make一开始,我们就把sp指向我们的栈空间,所以在协程内发生函数调用的时候,被调函数的函数头会有mov rsp rbp和push rbp,此时实际rbp也是指向我们分配的栈空间,所以真正栈空间里面是ebp和esp都是在我们管理的存储空间里面
  • 而在第一次调用协程时,co_resume执行完毕协程栈和寄存器状态如下图所示

  • 这其中:
    • 当执行完毕时popq %rax是把协程入口地址存入rax寄存器中,并pushq把这个入口地址压入栈顶
    • 执行到ret指令时,会把栈顶出栈,并填充到RIP寄存器中
    • 而一开始栈顶指针-8,就是预留空间给这个返回地址的,加上八个字节正好放满栈空间
co_yield
代码语言:javascript
复制
/*
*
* 主动将当前运行的协程挂起,并恢复到上一层的协程
*
* @param env 协程管理器 
*/
void co_yield_env( stCoRoutineEnv_t *env )
{
	// 这里直接取了iCallStackSize - 2,那么万一icallstacksize < 2呢?
	// 所以这里实际上有个约束,就是co_yield之前必须先co_resume, 这样就不会造成这个问题了
	// last就是 找到上次调用co_resume(curr)的协程
	stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
	// 当前栈
	stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
	env->iCallStackSize--;
	// 把上下文当前的存储到curr中,并切换成last的上下文
	co_swap( curr, last);
}

void co_yield( stCoRoutine_t *co )
{
	co_yield_env( co->env );
}

co_yield主要功能由co_yield_env完成,主要就是让出执行权限,主要就是:

  • 从env->pCallStack取出当前协程和调度此协程的父协程,然后把执行权限通过接口co_swap交还给父协程即可,并将当前协程退栈

无栈协程

有栈协程是基于函数切换上下文恢复的思路实现被中断协程的继续执行,但是这个上下文里面有返回地址,即下一条指令的地址,所以当程序发生改动重新编译生成,指令地址有可能发生改变,这种对于需要重新编译生成发布的发布场景支持并不友好,会因为程序指令地址的变化导致协程执行流的错乱。这时另外一种不基于上下文恢复的协程机制提供了一种新的思路。

达夫设备

在比较早期的时候,有一种程序的优化机制叫做循环展开,所谓循环展开是通过将循环进行部分展开,既减少了指令数,又充分调用执行单元的并行处理的能力;这是一种牺牲程序尺寸换取程序执行速度的优化机制,现在我们很少用到这种机制一个是编译器帮我们做了这种优化,一个是我们现在执行单元处理速度快,很少遇到这样的性能问题。

1983年,一个叫Tom Duff的程序员利用这种方式实时播放动画的代码段进行了优化,优化前如下所示:

代码语言:javascript
复制
send(to, from, count)
	register short *to, *from;
	register count;
	{
		do
			*to = *from++;
		while(--count>0);
	}

这是一段常见的存储器之间的复制操作,当时性能瓶颈出现在这段代码位置处,而达夫基于汇编语言常用的,“在复制时最小化判断数和分支数”的基本思想,将上面代码改写成如下形式:

代码语言:javascript
复制
send(to, from, count)
register short *to, *from;
register count;
{
	register n = (count + 7) / 8;
	switch (count % 8) {
	case 0:	do { *to = *from++;
	case 7:		 *to = *from++;
	case 6:		 *to = *from++;
	case 5:		 *to = *from++;
	case 4:	     *to = *from++;
	case 3:      *to = *from++;
	case 2:      *to = *from++;
	case 1:      *to = *from++;
	        } while (--n > 0);
	}
}

看起来是不是很奇怪,没关系,变换一下你们就认识了:

代码语言:javascript
复制
send(to, from, count)
register short *to, *from;
register count;
{
    register n = (count + 7) / 8;
    switch (count % 8) {
        case 0: *to = *from++;
        case 7: *to = *from++;
        case 6: *to = *from++;
        case 5: *to = *from++;
        case 4: *to = *from++;
        case 3: *to = *from++;
        case 2: *to = *from++;
        case 1: *to = *from++;
    }
    while (--n > 0) {
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
    }
}

代码本身并不复杂,但对于我们来说更加关注的是,switch和do-while语句的嵌套写法,为什么程序可以这样写?

关于switch语句

在编程实践中,switch里面的condition-state命中一个条件之后,就会找到一个case向下运行,直到遇到一个break,这个过程可能会跨越多个case,这就是switch的“掉落”特性;事实上经常有很多bug都是因为这种掉落特性引发的,所以我们的编程实践都推荐每个case过后,都有一对大括号来包裹程序块并用break进行收尾。如下所示:

代码语言:javascript
复制
int function() {
    switch(condition-state){
        case state1: {
              statement-1;
              break;
        }
        case state2: {
              statement-2;
              break;
        }
        ... ...
        default: {
              statement-n;
        }
    }
}

但是实际我们查阅C17草案,我们可以看到case语句实际属于labeled statement(带标签的语句)如下:

在C语言中,switch实际上是一个转移表,而case则是一个标签——用于给一个或者一组指令进行命名,而标签本身并不会改变指令的控制流,而只是提供了一个程序的执行位置;基于此,形成无栈协程一个朴素的思想:是否我们可以通过给指令打标签的方式,告诉下一次指令需要从哪个标签开始执行,其中需要的变量我们存起来就好了?这样既解决了上下文切换很多不必要的操作,也解决了程序修改后指令地址改变导致的无法恢复的问题

无栈协程的Demo实现

一个协程库要解决以下几个问题:

1)如何在协程阻塞调用时归还执行权限?

2)如何选择合适的协程进行调度?

3)如何把执行权限交给被调度的协程

4)如何让被调度的协程从被中断的地方继续执行

在前面讨论中,可以认为协程是一个函数的调用,那么协程的恢复无非是从调用中断处继续执行,而对于无栈协程不需要进行上下文恢复,则核心是通过存储标签保证下次调度能从预期的地方继续执行,那么就有:

1)针对问题一,当协程阻塞等待时,直接保存下一步返回时,所想执行指令的位置的标签,然后直接return,则实现了执行权限交还给主调方

2)针对问题二,主调方拿到执行权限之后,可以根据自己策略去进行调度,这个协程库提供相应接口支持即可

3)针对问题三,因为协程被认为是一次函数调用,则执行权限交给对应被调度协程本质上调用协程的接口即可,即通过接口调用实现执行权限的传递

4)如何实现中断指令流的继续,执行流程的恢复包括两个部分,一个是局部变量的值的恢复,一个是从被中断的位置处继续执行,针对前者,我们可以将函数内局部变量全部迁出来用全局结构缓存,在调度到协程时通过参数形式传递进去,对于后者我们可以通过标签记录下函数中断位置的标签,并且通过switch-case找到中断的部分继续下去,于是有如下demo代码片段:

代码语言:javascript
复制
int function(void) {
    static int state = 0;
    static int i = 0;
    switch(state){
        case 0: goto LABEL0;
        case 1: goto LABEL1;
        case 2: goto LABEL2;
    }
    LABEL0:
    for(i=0;i<10;++i){
        state = 1;
        return i;
        LABEL1:
        state = 2;
        i+=1;
        return i;
        LABEL2:
        state = 3;
    }
}

int main() {
    int i =function();
    printf("i:%d\n",i);
    i = function();
    printf("i:%d\n",i);
    i = function();
    printf("i:%d\n",i);
    i = function();
    printf("i:%d\n",i);
    i = function();
    printf("i:%d\n",i);
    return 0;
}

分析代码,我们有:

1)function函数通过标签被划分为三个部分

2)function函数通过静态局部对象state记录程序执行到哪一步了

3)通过静态局部对象记录上次执行时相关参数的信息

代码语言:javascript
复制
[centos ~/test/label_test]$ ./main       
i:0
i:1
i:2
i:3
i:4

通过运行结果,我们能看到,每次接口的执行实际都是从上次函数调用的中断的地方开始执行的;

但是这样的代码并不能满足工业级的应用场景,因为:

1)使用静态变量去保存函数的执行状态,使得这个接口是不可重入的

2)我们也不能在每次写代码的时候去定义标签,指定跳转的位置

3)使用标号去进行跳转会导致程序结构修改会牵涉大规模的代码改动

......

虽然我们可以对上述进行优化和封装,但是在这我们并不准备过多赘述,后面我们则直接看一个开源的无栈协程库-protothread

无栈协程库——protothread

ProtoThread源码如图所示:

代码语言:javascript
复制
#define LC_INIT(s) s = 0;

#define LC_RESUME(s) switch(s) { case 0:

#define LC_SET(s) s = __LINE__; case __LINE__:

#define LC_END(s) }

typedef unsigned short lc_t;
//用于定义一个描述protothread实例的结构体,每一个无栈协程用这个结构体进行描述
struct pt {
  lc_t lc;
};

/**
 初始化一个protothread实例,无栈协程实例,核心就是将指令标签设置为0
 */
#define PT_INIT(pt)   LC_INIT((pt)->lc)

/**
 * 这里用于定义一个protothread实例的接口,name_args是一个包含函数名和形参列表的字符串
 * 且这个接口的返回值得是char型
 */
#define PT_THREAD(name_args) char name_args

/**
 * 用于定义一个protothread的起始执行位置,其实就是在prototype前面套了一个switch
 */
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)

/**
 * 用于界定protothread的终止位置,就是在后面加了一个},并对结构体进行初始化
 */
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
                   PT_INIT(pt); return PT_ENDED; }

/**
 *阻塞直到条件为true,实际应用中返回PT_WAITING表示当前进程阻塞让出执行权,其他表示未被阻塞继续执行
 */
#define PT_WAIT_UNTIL(pt, condition)	        \
  do {						\
    LC_SET((pt)->lc);				\
    if(!(condition)) {				\
      return PT_WAITING;			\
    }						\
  } while(0)

/**
 * 调度一个prototype协程,当返回PT_WAITING,表示调度器阻塞了,让出执行权限给里面的进程
 */
#define PT_SCHEDULE(f) ((f) == PT_WAITING)

/**
 * 让出执行权限,本质上就是在让出位置打一个标签,并直接return,把执行权限交给主调接口
 */
#define PT_YIELD(pt)				\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if(PT_YIELD_FLAG == 0) {			\
      return PT_YIELDED;			\
    }						\
  } while(0)

/**
 */
#define PT_YIELD_UNTIL(pt, cond)		\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if((PT_YIELD_FLAG == 0) || !(cond)) {	\
      return PT_YIELDED;			\
    }						\
  } while(0)

如上述代码段所示:

  • protothread使用结构体struct pt描述一个协程,协程里面含有lc_t类型成员变量,本质上是一个unsigned short类型
  • 整个PT协程,在创建之前需要调用PT_INIT进行初始化,初始化之后调用PT_BEGIN拉起协程,协程运行完毕之后调用PT_END关闭协程
  • ProtoThread通过PT_THREAD封装协程执行接口
  • ProtoThread调用PT_WAIT_UNTIL阻塞,直到condition为true
    • 在这里若是condition为false,表示不满足条件,直接通过return交出执行权限
    • 在交出执行权限之前,调用LC_SET,查看LC_SET的代码,看到这里我们看PT是通过记录行号给源码打标签
  • ProtoThread通过宏PT_SCHEDULE来实现协程的调度,通常调用PT_SCHEDULE的是主控协程,主控协程决策调度哪个协程之后通过PT_SCHEDULE进行调度

我们尝试用ProtoThread写一个多玩家登陆的代码,如下:

代码语言:javascript
复制
#include "pt.h"
struct MessageBuffer {
    int change_flag;
    string content;
} g_message_buffer;

typedef struct RoleData {
    int id;
    int step;
    string name;   
    pt  thread_inst_pt;
    int recv_message;
} tagRoleData;

std::map<std::string,RoleData> g_role_set;

void Timer() {
    printf("timer work\n");
    return;
}

MessageBuffer recv_message() {
    MessageBuffer msg = g_message_buffer;
    reset_message();
    return msg;
}

int receive_message(tagRoleData& data) {
    if(data.recv_message > 0) {
        data.recv_message = 0;
        return 1;
    }
    return 0;
}

int process_online_data(tagRoleData& data){
    printf("process online  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

int process_profile_data(tagRoleData& data){
    printf("process profile  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

static PT_THREAD(login_thread(tagRoleData& data)) {
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
        process_profile_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}

fd_set fds;
struct timeval tv;
static char c[100] = {0};
static PT_THREAD(network_thread(struct pt *pt))
{
    FD_ZERO(&fds);
    FD_SET(0,&fds);
    PT_BEGIN(pt);
    while(1) {
        PT_WAIT_UNTIL(pt,select(1,&fds,NULL,NULL,&tv) > 0);
        read(0,c,100);
        g_message_buffer.content = string(c);
        g_message_buffer.change_flag = 1;
        if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
            RoleData role_data;
            role_data.step = 0;
            role_data.name = g_message_buffer.content;
            PT_INIT(&role_data.thread_inst_pt);
            g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
        }
        std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
        role_iter->second.recv_message = 1;
        PT_SCHEDULE(login_thread(role_iter->second));
    }
    PT_EXIT(pt);
    PT_END(pt);
}

static struct timer codelock_timer, input_timer;
static PT_THREAD(timer_thread(struct pt *pt))
{
    PT_BEGIN(pt);
    timer_set(&input_timer, 1000);
    PT_WAIT_UNTIL(pt, timer_expired(&input_timer));
    PT_EXIT(pt);
    PT_END(pt);
}

static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
    PT_INIT(&network_thread_pt);
    while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
        PT_SCHEDULE(timer_thread(&timer_thread_pt));
        sleep(1);
    } 
}

int main() {
    Proc();
    return 0;
}

这其中:

  • 代码中定义了三个执行单元,一个是network_thread网络协程,一个是timer_thread定时协程,一个是login_thread登录协程
  • 其中timer_thread协程负责定时器任务,network_thread负责消息接收并根据消息头拉起对应的登录协程login_thread,而login_thread对应不同的登录实体的登录行为
  • network_thread协程,PT_WAIT_UNTIL会“阻塞”直到文件句柄直到可读(这里我们用标准输入进行替代以便于验证)
  • 当读到消息之后,对于未开启流程的玩家创建一个协程,其他的则调度对应的协程(PT_SCHEDULE(login_thread(role_iter->second)))继续往后走
  • 对于登录协程。需要多步通信过程,一个是需要等待取在线数据并处理(process_online_data),一个是需要取角色数据并处理(process_profile_data)
  • 在本例中,我们在RoleData中封装了pt类型的成员变量thread_inst_pt用于缓存协程的状态信息,而外层用name->RoleData的映射关系管理协程及其他协程中间态数据

需要注意的是——以protothread来说:

  • 对于无栈协程来说,因为不存在指针等信息,所以无栈协程的所有信息是可以缓存在共享内存的,因此进程可以通过共享内存在重启的环境下,也不会导致协程中断
  • 但是这种恢复也是有条件的,在protothread中是用行号进行协程恢复,若是用到协程的源文件的行号出现改变,则可能执行错乱,如下图所示,假设中断前宏扩展后执行序列如下:
代码语言:javascript
复制
switch(Line){
    case 0:{
        state1-1;
        s=Line1-2;
    }
    case Line1-2:{
        if(!cond){
            return;
        }
        state1-3;
        s=Line1-3
    }
    case Line1-3:{
        if(!cond){
            return;
        }
        state1-4;
    }
}

当源码修改之后,可能宏扩展之后代码就变为

代码语言:javascript
复制
switch(Line){
    case 0:{
        state2-1;
        s=Line2-2;
    }
    case Line2-2:{
        if(!cond){
            return;
        }
        state2-3;
        s=Line2-3
    }
    case Line2-3:{
        if(!cond){
            return;
        }
        state2-4;
    }
}
  • 对于无栈协程来说,执行流的恢复只是通过找到下一条指令的执行地址,但是不包括上下文,这意味着无栈协程里面不能有局部变量,需要我们手动把后面需要用到的局部变量缓存起来
  • 此外这里无栈协程是通过switch-case实现的,嵌套的switch-case会产生问题,限制比较多,所以也不适用于线上场景

Label As Value

标签变量(labels as values)是GCC对C语言的扩展,是指我们可以通过操作符&&得到当前函数中定义的标签地址,这个值的类型是void*,并且是常量,我们可以在任何可以使用这个类型的常量处使用;如下:

代码语言:javascript
复制
#include "stdio.h"
void* ptr = NULL;
int Test()
{
    printf("local:%d,global:%d:global2:%d\n",&&test_local,&&test_global,&&test_global2);
    if(NULL == ptr) {
         printf("here\n");
         ptr = &&test_local;
    }
    goto *ptr;
    test_local:
        ptr = &&test_global;
        printf("local test %d\n",ptr);
        return 0;
    test_global:
        ptr = &&test_global2;
        printf("global test\n");
        return 0;
    test_global2:
        ptr = &&test_local;
        printf("global2 test\n");
        return 0;
    return 0;
}
int main()
{
    Test();
    Test();
    Test();
    return 0;
}

执行完毕后有如下执行结果:

代码语言:javascript
复制
local:4196026,global:4196069:global2:4196097
here
local test 4196069
local:4196026,global:4196069:global2:4196097
global test
local:4196026,global:4196069:global2:4196097
global2 test

受此启发,我们对protothread进行修改,可以得到如下代码

代码语言:javascript
复制
typedef void * lc_t;

#define LC_CONCAT2(s1, s2) s1##s2
#define LC_CONCAT(s1, s2) LC_CONCAT2(s1, s2)

#define LC_RESUME(s)                            \
  do {                                          \
    if(s != NULL) {                             \
      goto *s;                                  \
    }                                           \
  } while(0)

#define LC_SET(s,label)                         \
  do {                                          \
    LC_CONCAT(label, __LINE__):                 \
    (s) = &&LC_CONCAT(label, __LINE__); \
  } while(0)
//block until 
#define PT_WAIT_UNTIL(pt, label, condition)             \
  do {                                          \
    LC_SET((pt)->lc, label);                            \
    if(!(condition)) {                          \
      return PT_WAITING;                        \
    }                                           \
  } while(0)

对于库文件的改造:

  • 阻塞命令PT_WAIT_UNTIL新增标签字段label,当阻塞时,我们不仅指明解除阻塞所需满足的条件,也指明解除阻塞后要执行的代码段
  • 调度的指令LC_RESUME,则是根据标签的地址直接跳转的对应代码去执行goto *s

则最终代码的使用样式如下:

代码语言:javascript
复制
static
PT_THREAD(login_thread(tagRoleData& data))
{
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label, receive_message(data));
    online_label:    process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt,profile_label, receive_message(data));
    profile_label:    process_profile_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label2, receive_message(data));
    online_label2:    process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label3, receive_message(data));
    online_label3:    process_online_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}

从这段代码可以看到

  • 每段接口执行完毕后,都阻塞等待对应条件满足,并指明了阻塞解除后要执行的代码通过标签的形式展示出来

但还是不行

上述采取标签的形式还是解决不了重启后协程恢复的问题,因为标签在内存中的位置会在重新编译的时候地址出现变化,我们遵循标签的修改方式对原先的基于行号的代码进行修改,如下:

代码语言:javascript
复制
#define LC_SET(s, evt_id) s = evt_id; case evt_id:
#define PT_WAIT_UNTIL(pt, evt_id, condition)            \
  do {                                          \
    LC_SET((pt)->lc, evt_id);                           \
    if(!(condition)) {                          \
      return PT_WAITING;                        \
    }                                           \
  } while(0)

业务方可以通过如下形式进行使用:

代码语言:javascript
复制
using namespace std;

struct MessageBuffer {
    int change_flag;
    string content;
} g_message_buffer;

typedef struct RoleData {
    int id;
    int step;
    string name;   
    pt  thread_inst_pt;
    int recv_message;
} tagRoleData;

std::map<std::string,RoleData> g_role_set;

int receive_message(tagRoleData& data) {
    if(data.recv_message > 0) {
        data.recv_message = 0;
        return 1;
    }
    return 0;
}

int process_online_data(tagRoleData& data){
    printf("process online  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

int process_profile_data(tagRoleData& data){
    printf("process profile  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

#define MSG_ONLINE_RSP 1
#define MSG_PROFILE_RSP 2
#define MSG_ONLINE_RSP_2 3
#define MSG_ONLINE_RSP_3 4
static PT_THREAD(login_thread(tagRoleData& data))
{
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_PROFILE_RSP, receive_message(data));
        process_profile_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_2, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_3, receive_message(data));
        process_online_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}


#define NETWORK_EVTID 200
fd_set fds;
struct timeval tv;
static PT_THREAD(network_thread(struct pt *pt))
{
    FD_ZERO(&fds);
    FD_SET(0,&fds);

    PT_BEGIN(pt);
    tv.tv_sec=0;
    tv.tv_usec=0;
    while(1) {
        PT_WAIT_UNTIL(pt, NETWORK_EVTID, select(1,&fds,NULL,NULL,&tv) > 0);
        read(0,d,100);
        memcpy(c,d,strlen(d)-1);
        g_message_buffer.content = string(c);
        g_message_buffer.change_flag = 1;
        if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
            RoleData role_data;
            role_data.step = 0;
            role_data.name = g_message_buffer.content;
            PT_INIT(&role_data.thread_inst_pt);
            g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
        }
        std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
        role_iter->second.recv_message = 1;
        PT_SCHEDULE(login_thread(role_iter->second));
    }
    PT_EXIT(pt);
    PT_END(pt);
}

#define TIMER_EVTID  100 
static PT_THREAD(timer_thread(struct pt *pt))
{
    PT_BEGIN(pt);
    timer_set(&input_timer, 1000);
    PT_WAIT_UNTIL(pt, TIMER_EVTID, timer_expired(&input_timer));
    PT_EXIT(pt);
    PT_END(pt);
}

static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
    PT_INIT(&network_thread_pt);
    while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
        PT_SCHEDULE(timer_thread(&timer_thread_pt));
        sleep(1);
    } 
}

int main() {
    Proc();
    return 0;
}

由此可见:

  • 我们可以在协程让出执行权限的时候,指明要等待的事件,如PT_WAIT_UNTIL(pt, evt_id, condition)所示
  • 其他的如之前所示,在阻塞分支之前会按照等待的事件ID,新增一个case分支
  • 因为标签是我们自定义的,不会因为程序的重新编译发生变化,所以重启不会影响协程的恢复和执行

参考资料

函数调用过程

ucontext manual pages

swapcontext() — Save and restore user context

云风协程库源码

编程沉思录——云风协程库源码分析

编程沉思录——libco源码分析

libco源码地址

libco性能对比

达夫设备

Label As Values标签变量

ucontext族函数的使用及原理分析

FSTENV

Intel x86 MXCSR Register

SSE-维基百科

libco源码注释版

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 有栈协程
    • 函数调用过程
      • 基于ucontext协程库
        • 关于ucontext函数簇
        • 云风的coroutine协程库
      • libco
        • co_create
        • co_resume
        • co_swap
        • co_yield
    • 无栈协程
      • 达夫设备
        • 关于switch语句
          • 无栈协程的Demo实现
            • 无栈协程库——protothread
              • Label As Value
                • 但还是不行
                • 参考资料
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档