前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >协程-无栈协程(下)

协程-无栈协程(下)

原创
作者头像
王鹏程1990
发布2023-03-09 21:06:23
7140
发布2023-03-09 21:06:23
举报
文章被收录于专栏:源码阅读源码阅读

无栈协程库——protothread

ProtoThread源码如下所示:

代码语言:javascript
复制
#define LC_INIT(s) s = 0;

#define LC_RESUME(s) switch(s) { case 0:

#define LC_SET(s) s = __LINE__; case __LINE__:

#define LC_END(s) }

typedef unsigned short lc_t;
//用于定义一个描述protothread实例的结构体,每一个无栈协程用这个结构体进行描述
struct pt {
  lc_t lc;
};

/**
 初始化一个protothread实例,无栈协程实例,核心就是将指令标签设置为0
 */
#define PT_INIT(pt)   LC_INIT((pt)->lc)

/**
 * 这里用于定义一个protothread实例的接口,name_args是一个包含函数名和形参列表的字符串
 * 且这个接口的返回值得是char型
 */
#define PT_THREAD(name_args) char name_args

/**
 * 用于定义一个protothread的起始执行位置,其实就是在prototype前面套了一个switch
 */
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)

/**
 * 用于界定protothread的终止位置,就是在后面加了一个},并对结构体进行初始化
 */
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
                   PT_INIT(pt); return PT_ENDED; }

/**
 *阻塞直到条件为true,实际应用中返回PT_WAITING表示当前进程阻塞让出执行权,其他表示未被阻塞继续执行
 */
#define PT_WAIT_UNTIL(pt, condition)	        \
  do {						\
    LC_SET((pt)->lc);				\
    if(!(condition)) {				\
      return PT_WAITING;			\
    }						\
  } while(0)

/**
 * 调度一个prototype协程,当返回PT_WAITING,表示调度器阻塞了,让出执行权限给里面的进程
 */
#define PT_SCHEDULE(f) ((f) == PT_WAITING)

/**
 * 让出执行权限,本质上就是在让出位置打一个标签,并直接return,把执行权限交给主调接口
 */
#define PT_YIELD(pt)				\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if(PT_YIELD_FLAG == 0) {			\
      return PT_YIELDED;			\
    }						\
  } while(0)

/**
 */
#define PT_YIELD_UNTIL(pt, cond)		\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if((PT_YIELD_FLAG == 0) || !(cond)) {	\
      return PT_YIELDED;			\
    }						\
  } while(0)

如上述代码段所示:

    ·protothread使用结构体struct pt描述一个协程,协程里面含有lc_t类型成员变量,本质上是一个unsigned short类型

    ·整个PT协程,在创建之前需要调用PT_INIT进行初始化,初始化之后调用PT_BEGIN拉起协程,协程运行完毕之后调用PT_END关闭协程

·ProtoThread通过PT_THREAD封装协程执行接口

·ProtoThread调用PT_WAIT_UNTIL阻塞,直到condition为true。在这里若是condition为false,表示不满足条件,直接通过return交出执行权限;在交出执行权限之前,调用LC_SET,查看LC_SET的代码,看到这里我们看PT是通过记录行号给源码打标签

·ProtoThread通过宏PT_SCHEDULE来实现协程的调度,通常调用PT_SCHEDULE的是主控协程,主控协程决策调度哪个协程之后通过PT_SCHEDULE进行调度

    我们尝试用ProtoThread写一个多玩家登陆的代码,如下:

代码语言:javascript
复制
#include "pt.h"
struct MessageBuffer {
    int change_flag;
    string content;
} g_message_buffer;

typedef struct RoleData {
    int id;
    int step;
    string name;   
    pt  thread_inst_pt;
    int recv_message;
} tagRoleData;

std::map<std::string,RoleData> g_role_set;

void Timer() {
    printf("timer work\n");
    return;
}

MessageBuffer recv_message() {
    MessageBuffer msg = g_message_buffer;
    reset_message();
    return msg;
}

int receive_message(tagRoleData& data) {
    if(data.recv_message > 0) {
        data.recv_message = 0;
        return 1;
    }
    return 0;
}

int process_online_data(tagRoleData& data){
    printf("process online  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

int process_profile_data(tagRoleData& data){
    printf("process profile  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

static PT_THREAD(login_thread(tagRoleData& data)) {
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
        process_profile_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}

fd_set fds;
struct timeval tv;
static char c[100] = {0};
static PT_THREAD(network_thread(struct pt *pt))
{
    FD_ZERO(&fds);
    FD_SET(0,&fds);
    PT_BEGIN(pt);
    while(1) {
        PT_WAIT_UNTIL(pt,select(1,&fds,NULL,NULL,&tv) > 0);
        read(0,c,100);
        g_message_buffer.content = string(c);
        g_message_buffer.change_flag = 1;
        if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
            RoleData role_data;
            role_data.step = 0;
            role_data.name = g_message_buffer.content;
            PT_INIT(&role_data.thread_inst_pt);
            g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
        }
        std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
        role_iter->second.recv_message = 1;
        PT_SCHEDULE(login_thread(role_iter->second));
    }
    PT_EXIT(pt);
    PT_END(pt);
}

static struct timer codelock_timer, input_timer;
static PT_THREAD(timer_thread(struct pt *pt))
{
    PT_BEGIN(pt);
    timer_set(&input_timer, 1000);
    PT_WAIT_UNTIL(pt, timer_expired(&input_timer));
    PT_EXIT(pt);
    PT_END(pt);
}

static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
    PT_INIT(&network_thread_pt);
    while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
        PT_SCHEDULE(timer_thread(&timer_thread_pt));
        sleep(1);
    } 
}

int main() {
    Proc();
    return 0;
}

这其中:

·代码中定义了三个执行单元,一个是network_thread网络协程,一个是timer_thread定时协程,一个是login_thread登录协程;

·其中timer_thread协程负责定时器任务,network_thread负责消息接收并根据消息头拉起对应的登录协程login_thread,而login_thread对应不同的登录实体的登录行为;

·network_thread协程,PT_WAIT_UNTIL会“阻塞”直到文件句柄直到可读(这里我们用标准输入进行替代以便于验证);

·当读到消息之后,对于未开启流程的玩家创建一个协程,其他的则调度对应的协程(PT_SCHEDULE(login_thread(role_iter->second)))继续往后走;

·对于登录协程。需要多步通信过程,一个是需要等待取在线数据并处理(process_online_data),一个是需要取角色数据并处理(process_profile_data);

·在本例中,我们在RoleData中封装了pt类型的成员变量thread_inst_pt用于缓存协程的状态信息,而外层用name->RoleData的映射关系管理协程及其他协程中间态数据;

    需要注意的是——以protothread来说:

·对于无栈协程来说,因为不存在指针等信息,所以无栈协程的所有信息是可以缓存在共享内存的,因此进程可以通过共享内存在重启的环境下,也不会导致协程中断;

·但是这种恢复也是有条件的,在protothread中是用行号进行协程恢复,若是用到协程的源文件的行号出现改变,则可能执行错乱,如下所示,假设中断前宏扩展后执行序列如下:

代码语言:javascript
复制
switch(Line){
    case 0:{
        state1-1;
        s=Line1-2;
    }
    case Line1-2:{
        if(!cond){
            return;
        }
        state1-3;
        s=Line1-3
    }
    case Line1-3:{
        if(!cond){
            return;
        }
        state1-4;
    }
}

    当源码修改之后,可能宏扩展之后代码就变为:

代码语言:javascript
复制
switch(Line){
    case 0:{
        state2-1;
        s=Line2-2;
    }
    case Line2-2:{
        if(!cond){
            return;
        }
        state2-3;
        s=Line2-3
    }
    case Line2-3:{
        if(!cond){
            return;
        }
        state2-4;
    }
}

    当Line1-xx和Line2-xx不相等的时候,会重新调度进来就会找不到行号了,引发执行流程错乱(所以在使用这类库的时候,应该将函数的实现和协程主流程分开,以避免因为逻辑修改导致协程不可恢复的场景);

    对于无栈协程来说,执行流的恢复只是通过找到下一条指令的执行地址,但是不包括上下文,这意味着无栈协程里面不能有局部变量,需要我们手动把后面需要用到的局部变量缓存起来。

    此外这里无栈协程是通过switch-case实现的,嵌套的switch-case会产生问题,限制比较多,所以也不适用于线上场景。

Label As Value

标签变量(labels as values)是GCC对C语言的扩展,是指我们可以通过操作符&&得到当前函数中定义的标签地址,这个值的类型是void*,并且是常量,我们可以在任何可以使用这个类型的常量处使用;如下:

代码语言:javascript
复制
#include "stdio.h"
void* ptr = NULL;
int Test()
{
    printf("local:%d,global:%d:global2:%d\n",&&test_local,&&test_global,&&test_global2);
    if(NULL == ptr) {
         printf("here\n");
         ptr = &&test_local;
    }
    goto *ptr;
    test_local:
        ptr = &&test_global;
        printf("local test %d\n",ptr);
        return 0;
    test_global:
        ptr = &&test_global2;
        printf("global test\n");
        return 0;
    test_global2:
        ptr = &&test_local;
        printf("global2 test\n");
        return 0;
    return 0;
}
int main()
{
    Test();
    Test();
    Test();
    return 0;
}

   执行完毕后有如下执行结果:

受此启发,我们对protothread进行修改,可以得到如下代码:

代码语言:javascript
复制
typedef void * lc_t;

#define LC_CONCAT2(s1, s2) s1##s2
#define LC_CONCAT(s1, s2) LC_CONCAT2(s1, s2)

#define LC_RESUME(s)                            \
  do {                                          \
    if(s != NULL) {                             \
      goto *s;                                  \
    }                                           \
  } while(0)

#define LC_SET(s,label)                         \
  do {                                          \
    LC_CONCAT(label, __LINE__):                 \
    (s) = &&LC_CONCAT(label, __LINE__); \
  } while(0)
//block until 
#define PT_WAIT_UNTIL(pt, label, condition)             \
  do {                                          \
    LC_SET((pt)->lc, label);                            \
    if(!(condition)) {                          \
      return PT_WAITING;                        \
    }                                           \
  } while(0)

 对于库文件的改造:

·阻塞命令PT_WAIT_UNTIL新增标签字段label,当阻塞时,我们不仅指明解除阻塞所需满足的条件,也指明解除阻塞后要执行的代码段

·调度的指令LC_RESUME,则是根据标签的地址直接跳转的对应代码去执行goto *s

    则最终代码的使用样式如下:

代码语言:javascript
复制
static
PT_THREAD(login_thread(tagRoleData& data))
{
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label, receive_message(data));
    online_label:    process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt,profile_label, receive_message(data));
    profile_label:    process_profile_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label2, receive_message(data));
    online_label2:    process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label3, receive_message(data));
    online_label3:    process_online_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}

    从这段代码可以看到:

·每段接口执行完毕后,都阻塞等待对应条件满足,并指明了阻塞解除后要执行的代码通过标签的形式展示出来。

问题

    上述采取标签的形式还是解决不了重启后协程恢复的问题,因为标签在内存中的位置会在重新编译的时候地址出现变化,我们遵循标签的修改方式对原先的基于行号的代码进行修改,如下:

代码语言:javascript
复制
#define LC_SET(s, evt_id) s = evt_id; case evt_id:
#define PT_WAIT_UNTIL(pt, evt_id, condition)            \
  do {                                          \
    LC_SET((pt)->lc, evt_id);                           \
    if(!(condition)) {                          \
      return PT_WAITING;                        \
    }                                           \
  } while(0)

    业务方可以通过如下形式进行使用:

代码语言:javascript
复制
using namespace std;

struct MessageBuffer {
    int change_flag;
    string content;
} g_message_buffer;

typedef struct RoleData {
    int id;
    int step;
    string name;   
    pt  thread_inst_pt;
    int recv_message;
} tagRoleData;

std::map<std::string,RoleData> g_role_set;

int receive_message(tagRoleData& data) {
    if(data.recv_message > 0) {
        data.recv_message = 0;
        return 1;
    }
    return 0;
}

int process_online_data(tagRoleData& data){
    printf("process online  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

int process_profile_data(tagRoleData& data){
    printf("process profile  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

#define MSG_ONLINE_RSP 1
#define MSG_PROFILE_RSP 2
#define MSG_ONLINE_RSP_2 3
#define MSG_ONLINE_RSP_3 4
static PT_THREAD(login_thread(tagRoleData& data))
{
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_PROFILE_RSP, receive_message(data));
        process_profile_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_2, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_3, receive_message(data));
        process_online_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}


#define NETWORK_EVTID 200
fd_set fds;
struct timeval tv;
static PT_THREAD(network_thread(struct pt *pt))
{
    FD_ZERO(&fds);
    FD_SET(0,&fds);

    PT_BEGIN(pt);
    tv.tv_sec=0;
    tv.tv_usec=0;
    while(1) {
        PT_WAIT_UNTIL(pt, NETWORK_EVTID, select(1,&fds,NULL,NULL,&tv) > 0);
        read(0,d,100);
        memcpy(c,d,strlen(d)-1);
        g_message_buffer.content = string(c);
        g_message_buffer.change_flag = 1;
        if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
            RoleData role_data;
            role_data.step = 0;
            role_data.name = g_message_buffer.content;
            PT_INIT(&role_data.thread_inst_pt);
            g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
        }
        std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
        role_iter->second.recv_message = 1;
        PT_SCHEDULE(login_thread(role_iter->second));
    }
    PT_EXIT(pt);
    PT_END(pt);
}

#define TIMER_EVTID  100 
static PT_THREAD(timer_thread(struct pt *pt))
{
    PT_BEGIN(pt);
    timer_set(&input_timer, 1000);
    PT_WAIT_UNTIL(pt, TIMER_EVTID, timer_expired(&input_timer));
    PT_EXIT(pt);
    PT_END(pt);
}

static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
    PT_INIT(&network_thread_pt);
    while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
        PT_SCHEDULE(timer_thread(&timer_thread_pt));
        sleep(1);
    } 
}

int main() {
    Proc();
    return 0;
}

    由此可见:

·我们可以在协程让出执行权限的时候,指明要等待的事件,如PT_WAIT_UNTIL(pt, evt_id, condition)所示

·其他的如之前所示,在阻塞分支之前会按照等待的事件ID,新增一个case分支

·因为标签是我们自定义的,不会因为程序的重新编译发生变化,所以重启不会影响协程的恢复和执行

参考资料

函数调用过程

ucontext manual pages

swapcontext() — Save and restore user context

云风协程库源码

编程沉思录——云风协程库源码分析

编程沉思录——libco源码分析

libco源码地址

libco性能对比

达夫设备

Label As Values标签变量

ucontext族函数的使用及原理分析

FSTENV

Intel x86 MXCSR Register

SSE-维基百科

libco源码注释版

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 无栈协程库——protothread
  • Label As Value
  • 问题
  • 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档