前面介绍了协程的基本概念和协程切换的常见方式以后,本文将介绍如何通过c语言实现自己的协程库,分为独立栈和共享栈两种实现,代码见git仓库。
通过独立栈实现的协程库中的每一个协程都有自己独立的栈空间,协程栈大小固定且互不干扰。
协程结构体定义如下所示:
/* 默认协程栈大小 */
#define DEAFAULT_STACK_SIZE 1024*128
/* 协程池数目 */
#define COROUTINE_NUM 1024
/* 协程状态 */
enum STATE{FREE,RUNABLE,RUNING,SUSPEND};
/* 函数指针 */
typedef void (*Func)(void *);
/* 协程结构体 */
typedef struct _coroutine_t
{
/* 协程上下文 */
ucontext_t ctxt;
/* 协程函数 */
Func func;
/* 函数参数 */
void *arg;
/* 协程状态 */
STATE state;
/* 协程栈 */
char stack[DEAFAULT_STACK_SIZE];
}coroutine_t;
其中每一个协程栈stack大小固定为DEAFAULT_STACK_SIZE。刚开始协程库中的协程都处于FREE状态,指定了协程相关函数及参数以后协程变为RUNABLE状态,协程运行时处于RUNING状态,暂停时处于SUSPEND状态,运行完毕后处于FREE状态。
为了便于对协程进行管理调度,定义了专门的数据结构:
/* 协程调度结构体 */
typedef struct _schedule_t
{
/* 主协程上下文 */
ucontext_t main;
/* 运行协程id */
int runId;
/* 协程池 */
coroutine_t *coroutines;
/* 最大下标 */
int maxIndex;
_schedule_t():runId(-1),maxIndex(0)
{
coroutines=new coroutine_t[COROUTINE_NUM];
for(int i=0;i<COROUTINE_NUM;++i)
{
coroutines[i].state=FREE;
}
}
~_schedule_t()
{
delete coroutines;
}
}schedule_t;
其中coroutines是协程池,runId表示协程池中当前运行协程的ID,maxIndex是协程池中已经使用协程的最大下标。
协程创建函数定义如下,主要负责找到第一个可用协程并设置函数及参数。
int coroutine_create(schedule_t &schedule,Func func,void *arg)
{
/* 找到第一个可用的协程 */
int id=0;
for(;id<schedule.maxIndex;++id)
{
if(schedule.coroutines[id].state==FREE)
{
break;
}
}
if(id==schedule.maxIndex)
{
schedule.maxIndex++;
}
/* 设置函数及参数 */
coroutine_t &c=schedule.coroutines[id];
c.func=func;
c.arg=arg;
c.state=RUNABLE;
/* 返回协程id */
return id;
}
协程创建完毕以后可以通过coroutine_resume函数进行启动,挂起的协程也可以通过它进行恢复,定义如下:
void coroutine_start(schedule_t *ps)
{
int id=ps->runId;
if(id!=-1)
{
coroutine_t &c=ps->coroutines[id];
c.func(c.arg);
c.state=FREE;
ps->runId=-1;
}
}
int coroutine_resume(schedule_t &schedule,int id)
{
if(id<0||id>schedule.maxIndex) return 1;
coroutine_t &c=schedule.coroutines[id];
switch (c.state)
{
case RUNABLE:
{
getcontext(&(c.ctxt));
c.ctxt.uc_link=&(schedule.main);
c.ctxt.uc_stack.ss_flags=0;
c.ctxt.uc_stack.ss_sp=c.stack;
c.ctxt.uc_stack.ss_size=DEAFAULT_STACK_SIZE;
makecontext(&(c.ctxt),(void(*)())coroutine_start,1,&schedule);
/* 不用break跳出 */
}
case SUSPEND:
{
c.state=RUNING;
schedule.runId=id;
swapcontext(&(schedule.main),&(c.ctxt));
break;
}
default:
{
return 1;
}
}
return 0;
}
初次启动时,需要指定协程的栈c.ctxt.uc_stack及协程结束后返回时的上下文c.ctxt.uc_link。不管是初次启动还是暂停恢复,都需要修改协程状态和调度器的runId,最后通过swapcontext切换上下文开始执行协程。
协程在需要的时候可以通过coroutine_yield函数挂起,让出CPU,实现如下:
int coroutine_yield(schedule_t &schedule)
{
if(schedule.runId!=-1)
{
coroutine_t &c=schedule.coroutines[schedule.runId];
c.state=SUSPEND;
schedule.runId=-1;
swapcontext(&(c.ctxt),&(schedule.main));
}
return 0;
}
在协程运行时,需要函数来判断是否运行结束,定义如下:
int is_schedule_finished(schedule_t &schedule)
{
if(schedule.runId!=-1) return 0;
for(int id=0;id<schedule.maxIndex;++id)
{
if(schedule.coroutines[id].state!=FREE)
{
return 0;
}
}
return 1;
}
采用本协程库的简单示例如下所示:
#include"coroutine.h"
#include<stdio.h>
void func1(void *arg)
{
schedule_t *ps=static_cast<schedule_t *>(arg);
printf("########\n");
coroutine_yield(*ps);
printf("########\n");
}
void func2(void *arg)
{
schedule_t *ps=static_cast<schedule_t *>(arg);
printf("*********\n");
coroutine_yield(*ps);
printf("*********\n");
}
int main()
{
schedule_t schedule;
int id1=coroutine_create(schedule,func1,&schedule);
int id2=coroutine_create(schedule,func2,&schedule);
while(is_schedule_finished(schedule)!=1)
{
coroutine_resume(schedule,id1);
coroutine_resume(schedule,id2);
}
return 0;
}
运行结果如下:
通过共享栈实现的协程库中的每一个协程在运行时都使用一个公共的栈空间,当协程挂起时将自己的数据从共享栈拷贝到自己的独立栈,协程运行时又将数据从独立栈拷贝到共享栈运行,本文是参考cloudyun大神代码进行简要分析。
协程结构体定义如下:
/* 默认协程栈大小 */
#define DEAFAULT_STACK_SIZE 1024*1024
/* 协程池数目 */
#define COROUTINE_NUM 16
/* 协程状态 */
enum CoroutineState{COROUTINE_DEAD,COROUTINE_READY,COROUTINE_RUNING,COROUTINE_SUSPEND};
/* 调度器结构体 */
struct schedule_t;
/* 函数指针 */
typedef void (*CoroutineFunc)(schedule_t*,void *);
/* 协程结构体 */
typedef struct _coroutine_t
{
/* 协程上下文 */
ucontext_t ctxt;
/* 协程函数 */
CoroutineFunc func;
/* 函数参数 */
void *arg;
/* 协程所属调度器 */
schedule_t *sch;
/* 协程状态 */
CoroutineState state;
/* 协程栈 */
char *stack;
/* 已经分配的内存大小 */
ptrdiff_t cap;
/* 当前协程的运行时栈,保存起来时的大小 */
ptrdiff_t size;
}coroutine_t;
/* 协程调度结构体 */
typedef struct schedule_t
{
/* 协程共享栈 */
char stack[DEAFAULT_STACK_SIZE];
/* 主协程上下文 */
ucontext_t main;
/* 存活的协程个数 */
int nco;
/* 可管理的最大协程数目 */
int cap;
/* 运行协程id */
int runId;
/* 协程池 */
coroutine_t **coroutines;
};
和独立栈实现方式类似,每个协程内部都有一个栈指针stack,不同之处在于独立栈的空间大小固定,并且协程运行时数据存放在该栈空间。而共享栈实现时协程运行时,数据存放在schedule_t结构体中的stack成员中,这个栈被所有协程共享。当协程挂起时会将数据从schedule_t中的stack成员拷贝到coroutine_t中的stack成员,对数据进行临时存储。coroutine_t中的stack大小是根据要拷贝的数据量动态分配的,并通过coroutine_t中的cap记录了分配空间的大小,用coroutine_t中的size记录了拷贝数据的字节数(cap>=size),这样在协程恢复时就可以将临时存储的数据又拷贝回schedule_t结构体中的stack成员中。
在协程创建和删除时会调用对应的工具函数,定义如下:
/* 协程创建工具函数 */
coroutine_t *_co_new(schedule_t *sch,CoroutineFunc func,void *arg)
{
coroutine_t *co=new coroutine_t;
co->func=func;
co->arg=arg;
co->sch=sch;
co->cap=0;
co->size=0;
co->stack=NULL;
co->state=COROUTINE_READY;
return co;
}
/* 协程删除工具函数 */
void _co_delete(coroutine_t *co)
{
free(co->stack);
delete co;
}
调度器创建关闭函数如下所示,主要负责协程池的内存分配及释放。
schedule_t *schedule_open()
{
schedule_t *sch=new schedule_t;
sch->nco=0;
sch->cap=COROUTINE_NUM;
sch->runId=-1;
sch->coroutines=(coroutine_t **)malloc(sch->cap*sizeof(coroutine_t *));
memset(sch->coroutines,0,sch->cap*sizeof(coroutine_t *));
return sch;
}
void schedule_close(schedule_t*sch)
{
for(int i=0;i<sch->cap;++i)
{
coroutine_t *co=sch->coroutines[i];
if(co)
{
_co_delete(co);
}
}
free(sch->coroutines);
sch->coroutines=NULL;
delete sch;
}
协程创建函数定义如下,主要是新建一个协程,并将其放在协程池中的一个空闲位置。当协程池中所有协程都在运行时,会进行扩容。
int coroutine_create(schedule_t *sch,CoroutineFunc func,void *arg)
{
/* 创建一个协程 */
coroutine_t *co=_co_new(sch,func,arg);
if(sch->nco>=sch->cap)
{
/* 进行扩容 */
int id=sch->cap;
sch->coroutines=(coroutine_t **)realloc(sch->coroutines,sizeof(coroutine_t *)*sch->cap*2);
memset(sch->coroutines+sch->cap,0,sizeof(coroutine_t *)*sch->cap);
sch->coroutines[id]=co;
sch->cap*=2;
++sch->nco;
return id;
}
else
{
/* 找到一个NULL位置 */
for(int i=0;i<sch->cap;++i)
{
int id=(i+sch->nco)%sch->cap;
if(NULL==sch->coroutines[id])
{
sch->coroutines[id]=co;
sch->nco++;
return id;
}
}
}
assert(0);
return -1;
}
协程挂起函数如下所示:
static void _save_stack(coroutine_t *co,char *top)
{
char dummy=0;
assert(top-&dummy<DEAFAULT_STACK_SIZE);
if(co->cap<top-&dummy)
{
free(co->stack);
co->cap=top-&dummy;
co->stack=(char *)malloc(co->cap);
}
co->size=top-&dummy;
memcpy(co->stack,&dummy,co->size);
}
int coroutine_yield(schedule_t *sch)
{
assert(sch->runId>=0);
int id=sch->runId;
coroutine_t *co=sch->coroutines[id];
assert((char *)&co>sch->stack);
/* 保存栈内容 */
_save_stack(co,sch->stack+DEAFAULT_STACK_SIZE);
co->state=COROUTINE_SUSPEND;
sch->runId=-1;
swapcontext(&(co->ctxt),&(sch->main));
return 0;
}
协程挂起时需要将数据从共享栈拷贝到自己的栈中,这其中涉及到了程序的地址空间分布的问题,一个进程的地址空间分布如下所示:
可以看到栈空间位于地址空间的高地址部分,并且是栈时从高地址向低地址增长的。协程栈也是类似的,其中sch->stack是共享栈的首地址,DEAFAULT _STACK_SIZE是共享栈的大小。数据在栈中是按照高地址向低地址方向增加的,co->size是数据容量。
当协程调用coroutine_yield挂起时,会使用_save_stack函数将共享栈内容保存在自己的栈中。其中用到了一个小技巧:定义dummy变量,定义以后dummy便是栈中的最新数据,通过co->size=top-&dummy就可以得到栈中的数据容量了。
协程启动/恢复函数定义如下:
static void mainfunc(u_int32_t low32,u_int32_t hi32)
{
uintptr_t ptr=uintptr_t(low32)|(uintptr_t(hi32)<<32);
schedule_t *sch=(schedule_t *)ptr;
int id=sch->runId;
coroutine_t *co=sch->coroutines[id];
co->func(sch,co->arg);
_co_delete(co);
sch->coroutines[id]=NULL;
sch->nco--;
sch->runId=-1;
}
int coroutine_resume(schedule_t *sch,int id)
{
assert(sch->runId==-1);
assert(id>=0&&id<sch->cap);
coroutine_t *co=sch->coroutines[id];
if(NULL==co)
{
return -1;
}
switch (co->state)
{
case COROUTINE_READY:
{
getcontext(&(co->ctxt));
co->ctxt.uc_link=&(sch->main);
/* 设置共享栈 */
co->ctxt.uc_stack.ss_flags=0;
co->ctxt.uc_stack.ss_sp=sch->stack;
co->ctxt.uc_stack.ss_size=DEAFAULT_STACK_SIZE;
sch->runId=id;
co->state=COROUTINE_RUNING;
uintptr_t ptr=uintptr_t(sch);
makecontext(&(co->ctxt),(void(*)())mainfunc,2,u_int32_t(ptr),u_int32_t(ptr>>32));
swapcontext(&(sch->main),&(co->ctxt));
break;
}
case COROUTINE_SUSPEND:
{
/* 将协程栈保存的内容拷贝到共享栈中 */
memcpy(sch->stack+DEAFAULT_STACK_SIZE-co->size,co->stack,co->size);
sch->runId=id;
co->state=COROUTINE_RUNING;
swapcontext(&(sch->main),&(co->ctxt));
break;
}
default:
{
assert(0);
return -1;
}
}
return 0;
}
和独立栈实现方式不同之处便是如果是暂停的协程,首先要将用户数据拷贝到共享栈中再进行恢复。
获取指定id的协程的状态函数如下:
CoroutineState coroutine_state(schedule_t *sch,int id)
{
assert(id>=0&&id<sch->cap);
if(NULL==sch->coroutines[id])
{
return COROUTINE_DEAD;
}
return sch->coroutines[id]->state;
}
获取当前运行协程id的函数如下:
int coroutine_runId(schedule_t *sch)
{
return sch->runId;
}
采用本协程库应用的实例如下:
#include"coroutine1.h"
#include<stdio.h>
void func(schedule_t *sch,void *arg)
{
int *num=(int *)arg;
for(int i=0;i<5;++i)
{
printf("coroutine %d:%d\n",coroutine_runId(sch),*num+i);
coroutine_yield(sch);
}
}
void coroutine_test(schedule_t *sch)
{
int num1=1,num2=100;
int id1=coroutine_create(sch,func,&num1);
int id2=coroutine_create(sch,func,&num2);
printf("main start\n");
while(coroutine_state(sch,id1)!=COROUTINE_DEAD&&coroutine_state(sch,id2)!=COROUTINE_DEAD)
{
coroutine_resume(sch,id1);
coroutine_resume(sch,id2);
}
printf("main end\n");
}
int main()
{
schedule_t *sch=schedule_open();
coroutine_test(sch);
schedule_close(sch);
}
显示结果如下所示:
本文对有栈协程的两种实现方式进行了简单介绍,共享栈实现时所有协程运行时共享一个栈,并且每个协程有自己的栈来临时存储数据,临时栈空间根据数据量进行分配,可以有效利用内存,但是缺点是每次切换都要进行数据拷贝。独立栈实现的协程切换时不需要进程数据拷贝,但是每个协程的空间大小相同且固定,所以独立栈相比共享栈的空间一般更小,这样使用时需要考虑栈的容量,并且空间利用率不高。