ProtoThread源码如下所示:
#define LC_INIT(s) s = 0;
#define LC_RESUME(s) switch(s) { case 0:
#define LC_SET(s) s = __LINE__; case __LINE__:
#define LC_END(s) }
typedef unsigned short lc_t;
//用于定义一个描述protothread实例的结构体,每一个无栈协程用这个结构体进行描述
struct pt {
lc_t lc;
};
/**
初始化一个protothread实例,无栈协程实例,核心就是将指令标签设置为0
*/
#define PT_INIT(pt) LC_INIT((pt)->lc)
/**
* 这里用于定义一个protothread实例的接口,name_args是一个包含函数名和形参列表的字符串
* 且这个接口的返回值得是char型
*/
#define PT_THREAD(name_args) char name_args
/**
* 用于定义一个protothread的起始执行位置,其实就是在prototype前面套了一个switch
*/
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)
/**
* 用于界定protothread的终止位置,就是在后面加了一个},并对结构体进行初始化
*/
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
PT_INIT(pt); return PT_ENDED; }
/**
*阻塞直到条件为true,实际应用中返回PT_WAITING表示当前进程阻塞让出执行权,其他表示未被阻塞继续执行
*/
#define PT_WAIT_UNTIL(pt, condition) \
do { \
LC_SET((pt)->lc); \
if(!(condition)) { \
return PT_WAITING; \
} \
} while(0)
/**
* 调度一个prototype协程,当返回PT_WAITING,表示调度器阻塞了,让出执行权限给里面的进程
*/
#define PT_SCHEDULE(f) ((f) == PT_WAITING)
/**
* 让出执行权限,本质上就是在让出位置打一个标签,并直接return,把执行权限交给主调接口
*/
#define PT_YIELD(pt) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if(PT_YIELD_FLAG == 0) { \
return PT_YIELDED; \
} \
} while(0)
/**
*/
#define PT_YIELD_UNTIL(pt, cond) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if((PT_YIELD_FLAG == 0) || !(cond)) { \
return PT_YIELDED; \
} \
} while(0)
如上述代码段所示:
·protothread使用结构体struct pt描述一个协程,协程里面含有lc_t类型成员变量,本质上是一个unsigned short类型
·整个PT协程,在创建之前需要调用PT_INIT进行初始化,初始化之后调用PT_BEGIN拉起协程,协程运行完毕之后调用PT_END关闭协程
·ProtoThread通过PT_THREAD封装协程执行接口
·ProtoThread调用PT_WAIT_UNTIL阻塞,直到condition为true。在这里若是condition为false,表示不满足条件,直接通过return交出执行权限;在交出执行权限之前,调用LC_SET,查看LC_SET的代码,看到这里我们看PT是通过记录行号给源码打标签
·ProtoThread通过宏PT_SCHEDULE来实现协程的调度,通常调用PT_SCHEDULE的是主控协程,主控协程决策调度哪个协程之后通过PT_SCHEDULE进行调度
我们尝试用ProtoThread写一个多玩家登陆的代码,如下:
#include "pt.h"
struct MessageBuffer {
int change_flag;
string content;
} g_message_buffer;
typedef struct RoleData {
int id;
int step;
string name;
pt thread_inst_pt;
int recv_message;
} tagRoleData;
std::map<std::string,RoleData> g_role_set;
void Timer() {
printf("timer work\n");
return;
}
MessageBuffer recv_message() {
MessageBuffer msg = g_message_buffer;
reset_message();
return msg;
}
int receive_message(tagRoleData& data) {
if(data.recv_message > 0) {
data.recv_message = 0;
return 1;
}
return 0;
}
int process_online_data(tagRoleData& data){
printf("process online name[%s] current step[%d]\n",data.name.c_str(),data.step);
data.step += 1;
return 0;
}
int process_profile_data(tagRoleData& data){
printf("process profile name[%s] current step[%d]\n",data.name.c_str(),data.step);
data.step += 1;
return 0;
}
static PT_THREAD(login_thread(tagRoleData& data)) {
PT_BEGIN(&data.thread_inst_pt);
while(data.step < 4) {
PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
process_online_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
process_profile_data(data);
}
PT_EXIT(&data.thread_inst_pt);
PT_END(&data.thread_inst_pt);
}
fd_set fds;
struct timeval tv;
static char c[100] = {0};
static PT_THREAD(network_thread(struct pt *pt))
{
FD_ZERO(&fds);
FD_SET(0,&fds);
PT_BEGIN(pt);
while(1) {
PT_WAIT_UNTIL(pt,select(1,&fds,NULL,NULL,&tv) > 0);
read(0,c,100);
g_message_buffer.content = string(c);
g_message_buffer.change_flag = 1;
if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
RoleData role_data;
role_data.step = 0;
role_data.name = g_message_buffer.content;
PT_INIT(&role_data.thread_inst_pt);
g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
}
std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
role_iter->second.recv_message = 1;
PT_SCHEDULE(login_thread(role_iter->second));
}
PT_EXIT(pt);
PT_END(pt);
}
static struct timer codelock_timer, input_timer;
static PT_THREAD(timer_thread(struct pt *pt))
{
PT_BEGIN(pt);
timer_set(&input_timer, 1000);
PT_WAIT_UNTIL(pt, timer_expired(&input_timer));
PT_EXIT(pt);
PT_END(pt);
}
static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
PT_INIT(&network_thread_pt);
while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
PT_SCHEDULE(timer_thread(&timer_thread_pt));
sleep(1);
}
}
int main() {
Proc();
return 0;
}
这其中:
·代码中定义了三个执行单元,一个是network_thread网络协程,一个是timer_thread定时协程,一个是login_thread登录协程;
·其中timer_thread协程负责定时器任务,network_thread负责消息接收并根据消息头拉起对应的登录协程login_thread,而login_thread对应不同的登录实体的登录行为;
·network_thread协程,PT_WAIT_UNTIL会“阻塞”直到文件句柄直到可读(这里我们用标准输入进行替代以便于验证);
·当读到消息之后,对于未开启流程的玩家创建一个协程,其他的则调度对应的协程(PT_SCHEDULE(login_thread(role_iter->second)))继续往后走;
·对于登录协程。需要多步通信过程,一个是需要等待取在线数据并处理(process_online_data),一个是需要取角色数据并处理(process_profile_data);
·在本例中,我们在RoleData中封装了pt类型的成员变量thread_inst_pt用于缓存协程的状态信息,而外层用name->RoleData的映射关系管理协程及其他协程中间态数据;
需要注意的是——以protothread来说:
·对于无栈协程来说,因为不存在指针等信息,所以无栈协程的所有信息是可以缓存在共享内存的,因此进程可以通过共享内存在重启的环境下,也不会导致协程中断;
·但是这种恢复也是有条件的,在protothread中是用行号进行协程恢复,若是用到协程的源文件的行号出现改变,则可能执行错乱,如下所示,假设中断前宏扩展后执行序列如下:
switch(Line){
case 0:{
state1-1;
s=Line1-2;
}
case Line1-2:{
if(!cond){
return;
}
state1-3;
s=Line1-3
}
case Line1-3:{
if(!cond){
return;
}
state1-4;
}
}
当源码修改之后,可能宏扩展之后代码就变为:
switch(Line){
case 0:{
state2-1;
s=Line2-2;
}
case Line2-2:{
if(!cond){
return;
}
state2-3;
s=Line2-3
}
case Line2-3:{
if(!cond){
return;
}
state2-4;
}
}
当Line1-xx和Line2-xx不相等的时候,会重新调度进来就会找不到行号了,引发执行流程错乱(所以在使用这类库的时候,应该将函数的实现和协程主流程分开,以避免因为逻辑修改导致协程不可恢复的场景);
对于无栈协程来说,执行流的恢复只是通过找到下一条指令的执行地址,但是不包括上下文,这意味着无栈协程里面不能有局部变量,需要我们手动把后面需要用到的局部变量缓存起来。
此外这里无栈协程是通过switch-case实现的,嵌套的switch-case会产生问题,限制比较多,所以也不适用于线上场景。
标签变量(labels as values)是GCC对C语言的扩展,是指我们可以通过操作符&&得到当前函数中定义的标签地址,这个值的类型是void*,并且是常量,我们可以在任何可以使用这个类型的常量处使用;如下:
#include "stdio.h"
void* ptr = NULL;
int Test()
{
printf("local:%d,global:%d:global2:%d\n",&&test_local,&&test_global,&&test_global2);
if(NULL == ptr) {
printf("here\n");
ptr = &&test_local;
}
goto *ptr;
test_local:
ptr = &&test_global;
printf("local test %d\n",ptr);
return 0;
test_global:
ptr = &&test_global2;
printf("global test\n");
return 0;
test_global2:
ptr = &&test_local;
printf("global2 test\n");
return 0;
return 0;
}
int main()
{
Test();
Test();
Test();
return 0;
}
执行完毕后有如下执行结果:
受此启发,我们对protothread进行修改,可以得到如下代码:
typedef void * lc_t;
#define LC_CONCAT2(s1, s2) s1##s2
#define LC_CONCAT(s1, s2) LC_CONCAT2(s1, s2)
#define LC_RESUME(s) \
do { \
if(s != NULL) { \
goto *s; \
} \
} while(0)
#define LC_SET(s,label) \
do { \
LC_CONCAT(label, __LINE__): \
(s) = &&LC_CONCAT(label, __LINE__); \
} while(0)
//block until
#define PT_WAIT_UNTIL(pt, label, condition) \
do { \
LC_SET((pt)->lc, label); \
if(!(condition)) { \
return PT_WAITING; \
} \
} while(0)
对于库文件的改造:
·阻塞命令PT_WAIT_UNTIL新增标签字段label,当阻塞时,我们不仅指明解除阻塞所需满足的条件,也指明解除阻塞后要执行的代码段
·调度的指令LC_RESUME,则是根据标签的地址直接跳转的对应代码去执行goto *s
则最终代码的使用样式如下:
static
PT_THREAD(login_thread(tagRoleData& data))
{
PT_BEGIN(&data.thread_inst_pt);
while(data.step < 4) {
PT_WAIT_UNTIL(&data.thread_inst_pt, online_label, receive_message(data));
online_label: process_online_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt,profile_label, receive_message(data));
profile_label: process_profile_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt, online_label2, receive_message(data));
online_label2: process_online_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt, online_label3, receive_message(data));
online_label3: process_online_data(data);
}
PT_EXIT(&data.thread_inst_pt);
PT_END(&data.thread_inst_pt);
}
从这段代码可以看到:
·每段接口执行完毕后,都阻塞等待对应条件满足,并指明了阻塞解除后要执行的代码通过标签的形式展示出来。
上述采取标签的形式还是解决不了重启后协程恢复的问题,因为标签在内存中的位置会在重新编译的时候地址出现变化,我们遵循标签的修改方式对原先的基于行号的代码进行修改,如下:
#define LC_SET(s, evt_id) s = evt_id; case evt_id:
#define PT_WAIT_UNTIL(pt, evt_id, condition) \
do { \
LC_SET((pt)->lc, evt_id); \
if(!(condition)) { \
return PT_WAITING; \
} \
} while(0)
业务方可以通过如下形式进行使用:
using namespace std;
struct MessageBuffer {
int change_flag;
string content;
} g_message_buffer;
typedef struct RoleData {
int id;
int step;
string name;
pt thread_inst_pt;
int recv_message;
} tagRoleData;
std::map<std::string,RoleData> g_role_set;
int receive_message(tagRoleData& data) {
if(data.recv_message > 0) {
data.recv_message = 0;
return 1;
}
return 0;
}
int process_online_data(tagRoleData& data){
printf("process online name[%s] current step[%d]\n",data.name.c_str(),data.step);
data.step += 1;
return 0;
}
int process_profile_data(tagRoleData& data){
printf("process profile name[%s] current step[%d]\n",data.name.c_str(),data.step);
data.step += 1;
return 0;
}
#define MSG_ONLINE_RSP 1
#define MSG_PROFILE_RSP 2
#define MSG_ONLINE_RSP_2 3
#define MSG_ONLINE_RSP_3 4
static PT_THREAD(login_thread(tagRoleData& data))
{
PT_BEGIN(&data.thread_inst_pt);
while(data.step < 4) {
PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP, receive_message(data));
process_online_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_PROFILE_RSP, receive_message(data));
process_profile_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_2, receive_message(data));
process_online_data(data);
PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_3, receive_message(data));
process_online_data(data);
}
PT_EXIT(&data.thread_inst_pt);
PT_END(&data.thread_inst_pt);
}
#define NETWORK_EVTID 200
fd_set fds;
struct timeval tv;
static PT_THREAD(network_thread(struct pt *pt))
{
FD_ZERO(&fds);
FD_SET(0,&fds);
PT_BEGIN(pt);
tv.tv_sec=0;
tv.tv_usec=0;
while(1) {
PT_WAIT_UNTIL(pt, NETWORK_EVTID, select(1,&fds,NULL,NULL,&tv) > 0);
read(0,d,100);
memcpy(c,d,strlen(d)-1);
g_message_buffer.content = string(c);
g_message_buffer.change_flag = 1;
if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
RoleData role_data;
role_data.step = 0;
role_data.name = g_message_buffer.content;
PT_INIT(&role_data.thread_inst_pt);
g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
}
std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
role_iter->second.recv_message = 1;
PT_SCHEDULE(login_thread(role_iter->second));
}
PT_EXIT(pt);
PT_END(pt);
}
#define TIMER_EVTID 100
static PT_THREAD(timer_thread(struct pt *pt))
{
PT_BEGIN(pt);
timer_set(&input_timer, 1000);
PT_WAIT_UNTIL(pt, TIMER_EVTID, timer_expired(&input_timer));
PT_EXIT(pt);
PT_END(pt);
}
static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
PT_INIT(&network_thread_pt);
while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
PT_SCHEDULE(timer_thread(&timer_thread_pt));
sleep(1);
}
}
int main() {
Proc();
return 0;
}
由此可见:
·我们可以在协程让出执行权限的时候,指明要等待的事件,如PT_WAIT_UNTIL(pt, evt_id, condition)所示
·其他的如之前所示,在阻塞分支之前会按照等待的事件ID,新增一个case分支
·因为标签是我们自定义的,不会因为程序的重新编译发生变化,所以重启不会影响协程的恢复和执行
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。