SRS是一个开源流媒体服务器,在目前大火的直播行业中较多的被使用。笔者作为直播行业的后台开发,对SRS的学习必不可少,本文主要讲解SRS底层使用的微线程开源框架StateThreads。
微线程也叫协程,是在用户态实现的,一般是非剥夺式的。微线程通常通过主动调用yield来放弃执行,进而其他微线程可以被调度执行。通过yield方式转移执行权的微线程之间不是调用者与被调用者的关系,而是彼此对称、平等的。
微线程为了保证互不影响的独立运行,需要自己的私有栈空间,类比多线程中每个线程都有自己的栈空间。为了在单进程中实现多个微线程的并发运行,需要保存每个微线程的寄存器上下文信息,并调用上下文切换函数进行切换,常见的切换函数有setjmp/longjmp、ucontex中的makecontext/swapcontext,有些框架为了提高切换上下文的性能,也会自己使用汇编代码实现相应功能的函数。
常见的寄存器如下图表格所示,其中在微线程框架中通常使用到的是 堆栈顶指针 寄存器,堆栈顶指针指向了代码运行使用到的栈的可用位置。通过修改微线程上下文信息(比如jmp_buf)中的RSP可以达到不同微线程运行在不同栈空间的目的。
寄存器 | 16位 | 32位 | 64位 |
---|---|---|---|
累加寄存器 | AX | EAX | RAX |
基址寄存器 | BX | EBX | RBX |
计数寄存器 | CX | ECX | RCX |
数据寄存器 | DX | EDX | RDX |
堆栈基指针 | BP | EBP | RBP |
变址寄存器 | SI | ESI | RSI |
堆栈顶指针 | SP | ESP | RSP |
指令寄存器 | IP | EIP | RIP |
框架的全局信息保存在结构体_st_vp_t中。_st_vp_t中保存了:idle微线程指针,runable微线程队列,IO等待微线程队列,zombie微线程队列,sleep定时器最小堆等。_st_vp_t数据结构的定义如下代码所示:
typedef struct _st_vp {
_st_thread_t *idle_thread; /* Idle thread for this vp */
st_utime_t last_clock; /* The last time we went into vp_check_clock() */
_st_clist_t run_q; /* run queue for this vp */
_st_clist_t io_q; /* io queue for this vp */
_st_clist_t zombie_q; /* zombie queue for this vp */
#ifdef DEBUG
_st_clist_t thread_q; /* all threads of this vp */
#endif
int pagesize;
_st_thread_t *sleep_q; /* sleep queue for this vp */
int sleepq_size; /* number of threads on sleep queue */
#ifdef ST_SWITCH_CB
st_switch_cb_t switch_out_cb; /* called when a thread is switched out */
st_switch_cb_t switch_in_cb; /* called when a thread is switched in */
#endif
} _st_vp_t;
其中重要的信息说明如下:
idle微线程(idle_thread):idle微线程是框架创建的微线程,只有在没有其他微线程可以调度时,才会被调度执行。主要任务是调用IO多路复用函数等待IO事件和处理定时器。
runable微线程队列(run_q):可运行的微线程,等待框架调度即可运行。
IO等待微线程队列(io_q):当微线程需要等待IO事件时,会被放到IO等待队列中。当等待的IO事件发生 或者 超时 或者 被中断时,会从IO等待队列中移除并加入到runable队列中。
zombie微线程队列(zombie_q):当微线程结束时,如果设置了joinable,即需要其他微线程‘收尸’,就会添加到zombie队列。
sleep最小堆(sleep_q):数据结构为完全二叉树组织的最小堆结构,当微线程设置了定时器时,就会根据超时时间添加到树中。
框架的整体结构如下图所示:
st_thread_t结构体中保存了微线程运行的信息:微线程的状态,微线程start函数,指向微线程私有栈空间的指针,微线程的运行上线文信息(保存在jmp_buf中)等。st_thread_t结构体定义如下所示:
typedef struct _st_thread {
int state; /* Thread's state */
int flags; /* Thread's flags */
void *(*start)(void *arg); /* The start function of the thread */
void *arg; /* Argument of the start function */
void *retval; /* Return value of the start function */
_st_stack_t *stack; /* Info about thread's stack */
_st_clist_t links; /* For putting on run/sleep/zombie queue */
_st_clist_t wait_links; /* For putting on mutex/condvar wait queue */
st_utime_t due; /* Wakeup time when thread is sleeping */
_st_thread_t *left; /* For putting in timeout heap */
_st_thread_t *right; /* -- see docs/timeout_heap.txt for details */
int heap_index;
void **private_data; /* Per thread private data */
_st_cond_t *term; /* Termination condition variable for join */
jmp_buf context; /* Thread's context */
} _st_thread_t;
微线程私有栈空间和st_thread_t结构体的关系如下图所示:
其中st_stack_t指向为微线程分配的私有栈空间,私有栈空间中包括以下信息:
RedZone:在私有栈空间的前后分别有一个页大小的保护空间。如果私有栈是使用mmap()函数分配时,会调用mprotect()函数设置两块RedZone空间为读写保护,便于发现微线程使用栈空间的溢出问题。如果栈空间是使用malloc()函数分配的堆空间,则无法设置读写保护。
栈空间:微线程运行中使用到的栈空间。
Padding Space:为了保证栈空间起始位置64字节对齐,空闲出来的空间。
st_thread_t结构体:保存了微线程运行中所需的信息。
私有key映射区:类似于多线程中通过调用pthread_setspecific()和pthread_getspecific()获取和设置线程私有数据那样,StateThreads也支持为每个微线程设置私有数据。私有key映射区最大允许保存16个void*指针。
微线程运行中会有不同的状态,框架中定义了不同的数值来表示不同的状态。框架中定义的微线程状态如下代码所示:
// 微线程状态定义
#define _ST_ST_RUNNING 0 // 执行中
#define _ST_ST_RUNNABLE 1 // 可执行状态,等待调度
#define _ST_ST_IO_WAIT 2 // 等待IO事件
#define _ST_ST_LOCK_WAIT 3 // 等待互斥锁
#define _ST_ST_COND_WAIT 4 // 等待条件变量
#define _ST_ST_SLEEPING 5 // sleep
#define _ST_ST_ZOMBIE 6 // 微线程已结束,待其他微线程调用st_thread_join收尸
#define _ST_ST_SUSPENDED 7 // 暂停,只能调用st_thread_interrupt唤醒
// 微线程flag定义
#define _ST_FL_PRIMORDIAL 0x01 // 原生微线程,即不是创建的微线程,没有分配私有栈资源
#define _ST_FL_IDLE_THREAD 0x02 // 空闲处理微线程,用于调用epoll,处理定时器
#define _ST_FL_ON_SLEEPQ 0x04 // 微线程在sleep队列中,需要定时器的情况:调用st_usleep、st_cond_timedwait、st_poll等待IO事件等
#define _ST_FL_INTERRUPT 0x08 // 微线程被调用st_thread_interrupt()中断
#define _ST_FL_TIMEDOUT 0x10 // 微线程定时器超时
微线程状态说明如下:
running状态:微线程正在运行时的状态,同一时间只会有一个微线程处于该状态。
runable状态:可运行状态,此时微线程位于_st_vp_t.run_q队列中。当微线程被调度时,状态会变为running状态。
io_wait状态:微线程等待网络变为可用的状态,如调用st_read,st_write等网络操作函数,都有可能会导致微线程进入该状态。
lock_wait状态:等待st_mutex_t互斥锁变为可用的状态。
cond_wait状态:等待st_cond_t条件变量变为可用的状态。
sleeping状态:调用st_sleep()函数 并且 指定了超时时间时,会进入该状态。
zombie状态:微线程终止 并且 需要等待其他微线程“收尸”,即设置了joinable的情况,会进入该状态。
suspended状态:微线程设置永不超时,会进入该状态,如调用st_sleep()函数设置超时时间为-1。
微线程不同状态间的转换关系如下图所示:
框架中的微线程可以分为三类,分别是Primordial微线程、Idle微线程和用户微线程。
Primordial微线程用来标记运行statethreads的系统进程(更准确的说是系统线程),当Primordial微线程终止时,整个框架也会终止。Primordial微线程并不是框架创建出来的微线程,所以不需要为Primordial微线程分配私有的栈空间,但是为了保证Primordial微线程和其他微线程的行为一致性,仍需为Primordial微线程分配私有key映射区。Primordial微线程的栈空间和st_thread_t结构体的组织关系如下图所示:
Idle微线程是框架初始化时调用st_thread_create()创建出来的,是一个特殊的微线程。Idle微线程只有在框架的可执行队列_st_vp_t.run_q为空时才会调度执行。Idle微线程主要负责两件事情:1. 调用epoll等IO多路复用函数等待IO事件(参照3.8小节中IO事件处理逻辑);2.处理定时器,定时器超时时负责唤醒等待的微线程。Idle微线程执行的main函数如下:
// idle微线程执行的start函数
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
/* Idle vp till I/O is ready or the smallest timeout expired */
// 调用IO多路复用函数(epoll,select等)等待IO事件发生,并处理所有发生的IO事件,
// 如果微线程等待的IO事件发生,会将微线程从io等待队列st_vp_t.io_q中移除
// 并加入到runable队列st_vp_t.run_q
_ST_VP_IDLE();
/* Check sleep queue for expired threads */
// 检查微线程设置的定时器是否超时,针对超时的微线程,将其从sleep队列移除
// 并加入到runable队列
_st_vp_check_clock();
// 交出运行权,并从runable队列st_vp_t.run_q中调度下一个微线程运行
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}
/* No more threads */
exit(0);
/* NOTREACHED */
return NULL;
}
用户微线程指用户调用st_thread_create()创建的微线程。
框架初始化时主要做以下事情:初始化框架数据结构,初始化IO多路复用相关逻辑,创建idle微线程,初始化Primordial微线程。框架初始化代码如下所示:
// 框架初始化函数
int st_init(void)
{
_st_thread_t *thread;
if (_st_active_count) {
/* Already initialized */
return 0;
}
// 选择使用哪个IO多路复用函数,如epoll、select等
st_set_eventsys(ST_EVENTSYS_DEFAULT);
// 屏蔽SIGPIPE信号 并 初始化IO多路复用函数相关数据结构
if (_st_io_init() < 0)
return -1;
memset(&_st_this_vp, 0, sizeof(_st_vp_t));
// 清空runable队列、io等待队列、zombie队列
ST_INIT_CLIST(&_ST_RUNQ);
ST_INIT_CLIST(&_ST_IOQ);
ST_INIT_CLIST(&_ST_ZOMBIEQ);
// 调用IO多路复用函数对应的初始化函数
if ((*_st_eventsys->init)() < 0)
return -1;
_st_this_vp.pagesize = getpagesize();
_st_this_vp.last_clock = st_utime();
// 创建idle微线程,idle微线程主要用于调用epoll等待IO事件和处理定时器
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
_st_active_count--;
// 将idle微线程从runable队列移除,由于idle微线程只在没有微线程可以运行时,才会主动调度,
// 所以不需要加入到run队列
_ST_DEL_RUNQ(_st_this_vp.idle_thread);
// 初始化primordial微线程,primordial微线程用来标记系统进程,由于可以直接使用
// 系统进程的栈空间,故只需要为primordial微线程分配st_thread_t和私有key数据区
thread = (_st_thread_t *)calloc(1, sizeof(_st_thread_t) +
(ST_KEYS_MAX * sizeof(void *)));
if (!thread)
return -1;
thread->private_data = (void **)(thread + 1);
thread->state = _ST_ST_RUNNING;
thread->flags = _ST_FL_PRIMORDIAL;
_ST_SET_CURRENT_THREAD(thread);
_st_active_count++;
// 当前运行的微线程是primordial微线程,当primordial微线程退出时,整个进程也会终止
return 0;
}
微线程创建流程为:1)创建微线程使用的私有栈空间;2)初始化私有栈空间;3)调用setjmp初始化jmp_buf,并修改jmp_buf中的RSP指向为微线程分配的私有栈空间的可用位置。微线程创建代码如下所示:
// 创建微线程函数
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg,
int joinable, int stk_size)
{
_st_thread_t *thread;
_st_stack_t *stack;
void **ptds;
char *sp;
if (stk_size == 0)
stk_size = ST_DEFAULT_STACK_SIZE; // 默认栈空间为64K
// 将栈空间调整为PAGE_SIZE的整数倍
stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
// 分配微线程使用的私有数据空间:包括栈空间,私有key数据区,st_thread_t结构体等
// 为了避免频繁调用系统分配函数分配微线程栈空间,框架中保存了stack空闲链表,
// 会优先从空闲链表获取,当空闲链表为空时,会从系统分配
stack = _st_stack_new(stk_size);
if (!stack)
return NULL;
// 初始化分配好的空间
sp = stack->stk_top;
sp = sp - (ST_KEYS_MAX * sizeof(void *));
ptds = (void **)sp;
sp = sp - sizeof(_st_thread_t);
thread = (_st_thread_t *)sp;
// 保证栈可用位置一定是64字节对齐的
if ((unsigned long)sp & 0x3f)
sp = sp - ((unsigned long)sp & 0x3f);
stack->sp = sp - _ST_STACK_PAD_SIZE;
// 分配的空间重置为0,因为stack可能是从空闲链表中获取的
memset(thread, 0, sizeof(_st_thread_t));
memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
thread->private_data = ptds;
thread->stack = stack;
thread->start = start;
thread->arg = arg;
// 调用setjmp保存当前的上下文信息到jmp_buf中,主要功能是初始化jmp_buf
// 因为现在保存到jmp_buf中的信息不会在微线程的后续运行中使用到
if (setjmp(thread->context))
// 当微线程首次被调度运行时,会进入该分支,并调用_st_thread_main函数执行
_st_thread_main();
// 非常重要的一点:设置jmp_buf中的rsp指向为微线程分配的私有栈空间的可用位置
// 这样可以保证框架中不同微线程运行在自己的私有栈空间上
thread->context[JB_RSP] = (long)(stack->sp);
// 如果微线程是可joinable的,就创建一个微线程终止时使用的条件变量
// 当该微线程终止时会等待在该条件变量上,当其他微线程调用st_thread_join函数
// 替该微线程收尸后,该微线程才会真正终止
if (joinable) {
thread->term = st_cond_new();
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}
// 设置为runable状态,并 加入到runable队列中
thread->state = _ST_ST_RUNNABLE;
_st_active_count++;
_ST_ADD_RUNQ(thread);
return thread;
}
// 微线程启动后调用的main函数
void _st_thread_main(void)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();
/*
* Cap the stack by zeroing out the saved return address register
* value. This allows some debugging/profiling tools to know when
* to stop unwinding the stack. It's a no-op on most platforms.
*/
MD_CAP_STACK(&thread);
// 调用微线程设置的start函数
thread->retval = (*thread->start)(thread->arg);
// 清理微线程使用的资源
st_thread_exit(thread->retval);
}
微线程创建好后,最重要的逻辑就是对微线程进行切换和调度。当处于running状态的微线程调用会阻塞执行的流程时,如需要等待IO、需要sleep或者需要主动让出运行权时(参考本文中的3.3微线程状态转换小节中的状态转换图),会调用_ST_SWITCH_CONTEXT宏定义将自己调度出去,并触发微线程调度下一个runable状态的微线程运行。微线程调度逻辑相比操作系统的线程调度逻辑简单很多。
微线程切换和调度函数逻辑如下所示:
// 切换_thread微线程出去,并调度下一个runable状态的微线程运行
#define _ST_SWITCH_CONTEXT(_thread) \
{ \
if (!setjmp(_thread->context)) { \
_st_vp_schedule(); \
} \
}
// 微线程调度逻辑,即调度下一个runable状态微线程运行
void _st_vp_schedule(void)
{
_st_thread_t *thread;
// 如果runable队列_st_this_vp.run_q非空,就选队列首的微线程
// 否则调度idle微线程运行
if (_ST_RUNQ.next != &_ST_RUNQ) {
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
}
else {
thread = _st_this_vp.idle_thread;
}
// 设置选中的微线程的状态为running状态,并调用longjmp函数跳转到选中的微线程,
// 选中的微线程从最后一次的setjmp位置继续运行
thread->state = _ST_ST_RUNNING;
_ST_SET_CURRENT_THREAD(thread);
longjmp(thread->context, 1)
}
微线程终止时主要做清理逻辑,清理运行期间设置的私有key数据和分配的私有栈空间。如果微线程创建时设置了joinable,还需要通知等待该微线程终止的微线程。微线程终止逻辑如下:
// 微线程终止时调用的函数
void st_thread_exit(void *retval)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();
thread->retval = retval;
// 释放微线程运行期间调用st_thread_setspecific设置的私有key数据
_st_thread_cleanup(thread);
_st_active_count--;
// 如果创建了term条件变量,需要通知调用st_thread_join()等待该微线程的微线程为该
// 微线程“收尸”
if (thread->term) {
// 添加到zombie队列
thread->state = _ST_ST_ZOMBIE;
_ST_ADD_ZOMBIEQ(thread);
// 通知等待在term条件变量上的微线程
st_cond_signal(thread->term);
// 交出控制权,等到为本线程收尸的微线程调用st_thread_join()返回
// 后,本微线程才会switch回来,并恢复运行
_ST_SWITCH_CONTEXT(thread);
// 清理条件变量
st_cond_destroy(thread->term);
thread->term = NULL;
}
// 如果终止的不是Primordial微线程,就释放为微线程分配的私有栈空间,
// 释放的栈空间会放到空闲链表中
if (!(thread->flags & _ST_FL_PRIMORDIAL))
_st_stack_free(thread->stack);
// 交出控制权,并调度下一个runable状态的微线程,微线程生命周期终止
_ST_SWITCH_CONTEXT(thread);
/* Not going to land here */
}
IO事件的处理主要是调用IO多路复用函数等待IO事件,将触发IO事件的微线程唤醒后加入到runable队列。本文主要介绍下框架中对epoll的实现逻辑。
调用epoll_wait等待IO事件的逻辑和其他网络框架的实现逻辑基本相同,唯一不同的是需要将触发IO事件的微线程的状态从IO等待状态变为runable状态,并加入到runable队列中。epoll等待的大致流程是:1)获取最早超时定时器的时间;2)调用epoll_wait等待IO事件;3)处理发生的IO事件和超时的定时器。epoll等待逻辑代码如下:
// 调用epoll等待IO事件的函数
void _st_epoll_dispatch(void)
{
st_utime_t min_timeout;
_st_clist_t *q;
_st_pollq_t *pq;
struct pollfd *pds, *epds;
struct epoll_event ev;
int timeout, nfd, i, osfd, notify;
int events, op;
short revents;
// sleep队列为空时,设置永不超时,否则设置为sleep队列中最早超时微线程的超时时间
if (_ST_SLEEPQ == NULL) {
timeout = -1;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 :
(_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout = (int)(min_timeout / 1000);
}
// 调用epoll_wait函数等待IO事件发生 或者 超时 发生
nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,
_st_epoll_data->evtlist_size, timeout);
if (nfd > 0) {
// 遍历发生的事件,把发生的IO事件设置到fd对应的_epoll_fd_data_t.revents中
for (i = 0; i < nfd; i++) {
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
/* Also set I/O bits on error */
_ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
}
}
// 遍历IO等待队列_st_vp_t.io_q中的每个元素
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
// 关注fd数组的结束位置
epds = pq->pds + pq->npds;
// 遍历关注的fd数组,查看对应fd上是否有IO事件发生
for (pds = pq->pds; pds < epds; pds++) {
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
continue;
}
osfd = pds->fd;
events = pds->events;
revents = 0;
// 关注读事件,并检查是否发生了读事件
if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
// 关注写事件,并检查是否发生了写事件
if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT))
revents |= POLLOUT;
// 关注pri事件,并检查是否发生了pri事件
if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI))
revents |= POLLPRI;
// 关注错误事件,并检查是否发生了错误事件
if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR)
revents |= POLLERR;
// 始终检查是否发生了hup事件
if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP)
revents |= POLLHUP;
pds->revents = revents;
if (revents) {
notify = 1;
}
}
if (notify) {
// 如果有IO事件发生,就从IO等待队列中移除
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);
// 如果同时在sleep队列,就从sleep队列移除
if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
// 修改微线程状态 并 加入到runable队列,等待调度运行
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
}
}
for (i = 0; i < nfd; i++) {
/* Delete/modify descriptors that fired */
osfd = _st_epoll_data->evtlist[i].data.fd;
// 重置revents为空
_ST_EPOLL_REVENTS(osfd) = 0;
events = _ST_EPOLL_EVENTS(osfd);
op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
ev.events = events;
ev.data.fd = osfd;
// 修改fd在epoll中关注的IO事件集合
if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 &&
op == EPOLL_CTL_DEL) {
_st_epoll_data->evtlist_cnt--;
}
}
}
}
用户调用IO操作接口时,需要调用框架封装好的接口。本文只列出了st_read从tcp链接套接字读数据的实现,st_write等接口的实现类似,不再介绍。st_read函数的代码实现如下:
// 从tcp连接套接字中读数据
ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout)
{
ssize_t n;
// 先尝试读,如果没有读到数据,才调用st_netfd_poll等待套接字变为可读
while ((n = read(fd->osfd, buf, nbyte)) < 0) {
if (SocketErrno == PORT_IO_EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return -1;
if (st_netfd_poll(fd, POLLIN, timeout) < 0)
return -1;
}
return n;
}
// 等待单个套接字fd上的IO事件
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
struct pollfd pd;
int n;
pd.fd = fd->osfd;
pd.events = (short)how;
pd.revents = 0;
if ((n = st_poll(&pd, 1, timeout)) < 0)
return -1;
if (n == 0) {
/* Timed out */
errno = ETIME;
return -1;
}
if (pd.revents & POLLNVAL) {
errno = EBADF;
return -1;
}
return 0;
}
// 允许同时等待多个套接字的IO事件
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
_st_pollq_t pq;
_st_thread_t *me = _ST_CURRENT_THREAD();
int n;
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
// 将数组中fd关注的IO事件添加到epoll中
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
// 将调用微线程添加到IO等待队列中
_ST_ADD_IOQ(pq);
// 如果设置了超时时间,就加入到sleep队列中
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
// 交出执行权,并调度其他微线程运行,等到关注的任何一个fd上发生了关注的IO事件时,
// 会从这里返回
_ST_SWITCH_CONTEXT(me);
n = 0;
if (pq.on_ioq) {
// 超时情况,从IO等待队列中就移除
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
} else {
// 计算发生IO事件的fd数,这里没有把发生IO事件的fd组织成一个数组
for (pd = pds; pd < epd; pd++) {
if (pd->revents)
n++;
}
}
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
return n;
}
框架中很多地方需要使用定时器,如调用st_read等函数执行网络请求并指定了超时时间、调用st_sleep函数让微线程等待指定时间、调用st_cond_timedwait函数指定等待条件变量的超时时间等等。框架中组织定时器的数据结构也是通常网络架构中会使用到的最小堆,但并不是基于数组实现的,而是基于指针(st_thread_t结构中保存了指向叶子节点的left和right指针)实现的完全二叉树,和最小堆的排序性质相同。
添加定时器节点的函数是heap_insert(),添加节点的比较顺序和基于数组实现的最小堆的比较顺序是相反的。由于框架中定时器的组织结构是使用的基于指向叶子节点指针实现的完全二叉树,所以不能像基于数组的最小堆那样,将新加入节点放到数组尾,并执行和该叶子节点的所有父母节点进行比较的HeapUp操作。以下图为例,假设新插入节点的值为0,那么基于数组实现的最小堆的比较顺序为5->2->1,为框架中heap_insert()函数的比较路径为1->2->5。
删除定时器节点的函数是heap_delete(),删除定时器节点的逻辑和最小堆的处理逻辑类似。处理逻辑为:(1)从根节点开始遍历,找到最后一个叶子节点;(2)使用最后一个叶子节点替换将被删除节点;(3)对最后一个叶子节点的子树执行HeapDown过程,将值小的节点上移。以下图为例,删除的节点值为7,首先使用最后一个值为11的叶子节点替换7节点,然后从11节点开始,递归向下调整,将值较小子节点和父节点交换位置,直到没有交换为止,即HeapDown的过程。
由于所有的微线程在同一个进程中运行,所以微线程的运行是并发而不是并行的。所以互斥锁的使用场景并不是很多,除非一个临界区在一个微线程的单次调度中可能不完整,并且要求临界区完整前其他微线程不能操作,这种情况才需要使用到互斥锁。_st_mutex_t中owner表示哪个微线程持有互斥锁,wait_q保存了等待在互斥锁上的微线程链表。结构如下图所示:
加锁流程如下:
1.如果没有任何微线程持有互斥锁,就直接设置owner为加锁微线程并返回
2.否则,设置微线程状态为_ST_ST_LOCK_WAIT,并加到mutex的wait_q中
3.调用switch_context切换出去,调度其他微线程
4.switch回来后,从mutex的wait_q队列移除
解锁流程如下:
1.遍历mutex的wait_q链表,设置链表第一个等待的微线程的状态为_ST_ST_RUNNABLE,并加入到run队列。
2.如果没有等待的微线程,就设置ower为NULL。
框架中的条件变量函数定义如下代码所示,框架中条件变量相关函数不需要传互斥锁,这是由于微线程是在单进程内运行,微线程的运行时并发而非并行,故不用使用mutex互斥锁加锁。
// StateThreads条件变量相关函数
int st_cond_wait(_st_cond_t *cvar);
int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout);
int st_cond_signal(st_cond_t cvar);
// linux条件变量相关系统调用
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, struct timespec *timeout);
条件变量的数据结构比较简单,只保存了等待在条件变量的链表wait_q。
等待条件变量流程如下:
1.设置状态为_ST_ST_COND_WAIT,加到condition的wait_q队列中。
2.如果设置了超时时间,就加入到框架的sleep_q队列中
3.调用switch_context切换出去,调度其他微线程
4.switch回来后,从condition的wait_q队列移除
通知条件变量流程如下:
1.如果是broadcast模式,针对每一个等待的微线程,否则只处理wait_q队列的第一个微线程。
2.如果微线程在框架的sleep_q队列中,就从sleep_q队列中移除。
3.将微线程的状态设置为_ST_ST_RUNNABLE,并加入到框架的run_q队列中
本文从源码的角度分析了StateThreads微线程框架的实现,希望对读者认识微线程的运行机制有所帮助。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。