state thread协程小谈

为了打造高性能的服务器,通常思路有两种:1.多cpu利用,并发执行,如多进程和多线程 2、提高每个cpu的使用率,让每个cpu高效起来。

一、多cpu利用

1.多进程模式

 传统的多进程方案,一个新的客户端,单独交给子进程处理,如:apache,伪代码
 
 while(new_requst)
 {
    if(fork() != 0)
    {
       ...
       exit(0);
    }
 }

i/o挂起和时间片轮询促使进程释放cpu,切换上下文进程,提高cpu的效率,随着客户端连接的不断增加,进程数相应增加,进程切换的代价导致系统的可用性降低。

2. 多线程

多线程是多进程的改进版本,线程切换的代价比进程切换低很多,线程的共享全局数据区,共享数据的临界区访问,需要同步机制,同步机制降低性能,线程之间的共享数据区,一个线程会影响整个进程的运行。

注:进程和线程的调度切换可以参考 https://www.jianshu.com/p/91c8600cb2ae

二、提高单CPU的利用率

多进程和多线程都是依赖操作系统的调用,当io出现阻塞的时候,将进程(线程)切换,另一个思路,出现i/o阻塞,进程内部自己去管理所有的任务,将阻塞任务挂起,调用可运行的任务。异步i/o和协程是两个比较通用的方案。

1.异步i/o

 异步i/o通过状态回调事件,当i/o事件触发,通过注册的处理函数处理对应的事件,以epoll代码为例:
 n = epoll_wait();
 for (i = 0; i < nfd; i++) 
 {
        if(writable(epoll_data[i]))
        {
            write_handle(epoll_data[i]);
        }
        
        if(readable(epoll_data[i]))
        {
            read_handle(epoll_data[i]);
        }
        
        if(error(epoll_data[i]))
        {
            error_handle(epoll_data[i]);
        }
 }

调度的核心在epoll事件,epoll事件注册回调,epoll事件的触发,调用回调信息,每个阶段处理的epoll注册的epoll回调不一样,类似状态处理机,需要记录当前处理状态(处理的函数),处理完成后切换到的下一个状态(另一个处理函数)。状态的管理比较复杂,代码的可读性不高。

2.协程

协程有两种通用的实现方式:

1. glibc 实现 ucontext

2. C 语言的 setjmp 和 longjmp, state thread(st)采用jmp实现

两种实现的本质是一样的,协程通过保存当前运行的寄存器和栈,切换后恢复寄存器和栈,st和ucontext的不同之处st忽略了信号量的处理,st内部不会直接处理信号量信息,st建议通过pipe方式实现信号量的处理。(参考:http://state-threads.sourceforge.net/docs/notes.html#signals)

三. st的实现

st是保障cpu的能一直跑任务,任务挂起后,调度下一个任务。如果没有可运行的任务,阻塞直到有新的i/o事件产生,触发新的任务。如cpu管理进程的思路,协程主要将任务分为4种状态:

1. running —— 正在执行的协程

2. runnable —— 可以执行的协程,等待被调度

3. sleep —— 需要等待i/o事件,在等待队列中

4. zombie —— 已经执行完毕的协程

st的状态调度如图:

st任务状态转换

3.1 st的状态转换实现

dispatch的st的主调度模块,主要负责将io事件和超时事件转换为runnalbe task,源码:

void *_st_idle_thread_start(void *arg)
{
  _st_thread_t *me = _ST_CURRENT_THREAD();

  while (_st_active_count > 0) {
    /* Idle vp till I/O is ready or the smallest timeout expired */
    _ST_VP_IDLE();

    /* Check sleep queue for expired threads */
    _st_vp_check_clock();

    me->state = _ST_ST_RUNNABLE;
    _ST_SWITCH_CONTEXT(me);
  }

  /* No more threads */
  exit(0);

  /* NOTREACHED */
  return NULL;
}

     

st_vp_idle是i/o事件,st可以选择select、poll、kqueue模型,本文主要介绍下epoll的实现,epoll的实现有几个重要环节:

1.timeout的获取,通过等待队列最小堆方式排序,获取堆顶元素,如果堆为空,则阻塞等待

2. wait i/o事件

3. 找到文件句柄对应的处理协程,将协程放入可执行队列内

 
ST_HIDDEN void _st_epoll_dispatch(void)
{
    ...
    if (_ST_SLEEPQ == NULL) {
        timeout = -1;
    } else {
        min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 :
            (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
        timeout = (int) (min_timeout / 1000);
    }
    /* Check for I/O operations */
    nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,
                     _st_epoll_data->evtlist_size, timeout);
    if (nfd > 0) {
        for (i = 0; i < nfd; i++) {
            osfd = _st_epoll_data->evtlist[i].data.fd;
            _ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
            if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
                /* Also set I/O bits on error */
                _ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
            }
        }

        for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
            pq = _ST_POLLQUEUE_PTR(q);
            ...
            if (notify) {
                ST_REMOVE_LINK(&pq->links);
                pq->on_ioq = 0;
                /*
                 * Here we will only delete/modify descriptors that
                 * didn't fire (see comments in _st_epoll_pollset_del()).
                 */
                _st_epoll_pollset_del(pq->pds, pq->npds);

                if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
                    _ST_DEL_SLEEPQ(pq->thread);
                pq->thread->state = _ST_ST_RUNNABLE;
                _ST_ADD_RUNQ(pq->thread);
            }
        }

        for (i = 0; i < nfd; i++) {
            /* Delete/modify descriptors that fired */
            osfd = _st_epoll_data->evtlist[i].data.fd;
            _ST_EPOLL_REVENTS(osfd) = 0;
            events = _ST_EPOLL_EVENTS(osfd);
            op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            ev.events = events;
            ev.data.fd = osfd;
            if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 &&
                op == EPOLL_CTL_DEL) {
                _st_epoll_data->evtlist_cnt--;
            }
        }
    }
}

注:st的epoll实现效率不高,需要遍历所有的_ST_IOQ这个实现,这个逻辑违背了epoll的实现,退化为select模型 _st_vp_check_clock的实现比较巧妙,通过最小堆形式保存超时等待的任务,最早超时的在顶部,所以每次加入和删除超时队列,只需要做对应的heap调整,避免了全部排序,当出现超时将协程状态变为超时,加入可执行队列状态。源码:

void _st_vp_check_clock(void)
{
  _st_thread_t *thread;
  st_utime_t elapsed, now;
 
  now = st_utime();
  elapsed = now - _ST_LAST_CLOCK;
  _ST_LAST_CLOCK = now;

  if (_st_curr_time && now - _st_last_tset > 999000) {
    _st_curr_time = time(NULL);
    _st_last_tset = now;
  }

  while (_ST_SLEEPQ != NULL) {
    thread = _ST_SLEEPQ;
    ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);
    if (thread->due > now)
      break;
    _ST_DEL_SLEEPQ(thread);

    /* If thread is waiting on condition variable, set the time out flag */
    if (thread->state == _ST_ST_COND_WAIT)
      thread->flags |= _ST_FL_TIMEDOUT;

    /* Make thread runnable */
    ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));
    thread->state = _ST_ST_RUNNABLE;
    _ST_ADD_RUNQ(thread);
  }
}

3.2 协程的初始化

初始化最重要的两个点

1. 协程栈的初始化;协程栈通过mmap分配

2. 设置setjimp,当dispatch调度任务后进入协程入库函数

st协程的初始化源码:

_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
    _st_thread_t *trd;
    _st_stack_t *stack;
    void **ptds;
    char *sp;
    
    /* Adjust stack size */
    if (stk_size == 0) {
        stk_size = ST_DEFAULT_STACK_SIZE;
    }
    stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
    stack = _st_stack_new(stk_size);
    if (!stack) {
        return NULL;
    }
    
    /* Allocate thread object and per-thread data off the stack */
    sp = stack->stk_top;
    /*
    * The stack segment is split in the middle. The upper half is used
    * as backing store for the register stack which grows upward.
    * The lower half is used for the traditional memory stack which
    * grows downward. Both stacks start in the middle and grow outward
    * from each other.
    */
    /**
    The below comments is by winlin:
    The Stack public structure:
        +--------------------------------------------------------------+
        |                         stack                                |
        +--------------------------------------------------------------+
       bottom                                                         top
    The code bellow use the stack as:
        +-----------------+-----------------+-------------+------------+
        | stack of thread |pad+align(128B+) |thread(336B) | keys(128B) |
        +-----------------+-----------------+-------------+------------+
       bottom            sp                trd           ptds         top
               (context[0].__jmpbuf.sp)             (private_data)
    */
    sp = sp - (ST_KEYS_MAX * sizeof(void *));
    ptds = (void **) sp;
    sp = sp - sizeof(_st_thread_t);
    trd = (_st_thread_t *) sp;
    
    /* Make stack 64-byte aligned */
    if ((unsigned long)sp & 0x3f) {
        sp = sp - ((unsigned long)sp & 0x3f);
    }
    stack->sp = sp - _ST_STACK_PAD_SIZE;
    
    memset(trd, 0, sizeof(_st_thread_t));
    memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
    
    /* Initialize thread */
    trd->private_data = ptds;
    trd->stack = stack;
    trd->start = start;
    trd->arg = arg;


    if (MD_SETJMP((trd)->context)) {
        _st_thread_main();
    }
    MD_GET_SP(trd) = (long) (stack->sp);
    
    /* If thread is joinable, allocate a termination condition variable */
    if (joinable) {
        trd->term = st_cond_new();
        if (trd->term == NULL) {
            _st_stack_free(trd->stack);
            return NULL;
        }
    }
    
    /* Make thread runnable */
    trd->state = _ST_ST_RUNNABLE;
    _st_active_count[THREAD_INDEX]++;
    _ST_ADD_RUNQ(trd);
    
    return trd;
}

3.3 协程的上下文切换

协程的上下文切换通过调度切换,当出现i/o或者协程退出后发生上下文切换,切换逻辑,遍历runnable列表

1.有可执行任务,直接恢复可执行任务的上下文

2. 没有有可执行任务,调用dispatch,知道出现了新的可执行的事件

切换源码如下:

// 切换阻塞协程
#define _ST_SWITCH_CONTEXT(_thread)       \
    ST_BEGIN_MACRO                        \
    ST_SWITCH_OUT_CB(_thread);            \
    if (!MD_SETJMP((_thread)->context)) { \
      _st_vp_schedule();                  \
    }                                     \
    ST_DEBUG_ITERATE_THREADS();           \
    ST_SWITCH_IN_CB(_thread);             \
    ST_END_MACRO

// 恢复协程上下文    
#define _ST_RESTORE_CONTEXT(_thread)   \
    ST_BEGIN_MACRO                     \
    _ST_SET_CURRENT_THREAD(_thread);   \
    MD_LONGJMP((_thread)->context, 1); \
    ST_END_MACRO
    
void _st_vp_schedule(void)
{
  _st_thread_t *thread;
  unsigned int cost_time;
  static __thread char buf[80];
  if (_ST_RUNQ.next != &_ST_RUNQ) {
    /* Pull thread off of the run queue */
    thread = _ST_THREAD_PTR(_ST_RUNQ.next);
    _ST_DEL_RUNQ(thread);

  } else {
    /* If there are no threads to run, switch to the idle thread */
    thread = _st_this_vp.idle_thread;
  }
  ST_ASSERT(thread->state == _ST_ST_RUNNABLE);

  /* Resume the thread */
  thread->state = _ST_ST_RUNNING;
  _ST_RESTORE_CONTEXT(thread);
}

四、st的总结

st巧妙的利用了jump由用户程序自发的调用cpu,相比线程有私有栈的跳转切换小了很多,整个调度过程是FIFO,没有抢占模式。st源码vp等全局变量使得,st只支持单线程(多进程数据独立,可以分别使用)。由于团队业务的一些特殊性,需要st支持多线程,将st的全局变量进行响应的封装,可以将st改造成多线程版本。

五、st相关的知识

1. 主页源码:https://sourceforge.net/projects/state-threads/files/

2. 进程、线程、协程参考:http://kexianda.info/2017/05/19/进程-线程-协程的实现原理/

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

协程

1 篇文章1 人订阅

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏时序数据库专栏

Elasticsearch 底层系列之分片恢复解析

    我们是基础架构部,腾讯云 CES/CTSDB 产品后台服务的支持团队,我们拥有专业的ES开发运维能力,为大家提供稳定、高性能的服务,欢迎有需求的童鞋接入...

19850
来自专栏noteless

springmvc 项目完整示例04 整合mybatis mybatis所需要的jar包 mybatis配置文件 sql语句 mybatis应用

MyBatis 本是apache的一个开源项目iBatis, 2010年这个项目由apache software foundation 迁移到了google c...

10320
来自专栏PhpZendo

深入浅出 Laravel 路由执行原理

可以说几乎所有的框架都会涉及到「路由」的处理,简单一点讲就将用户请求的 url 分配到对应的处理程序。

28030
来自专栏祝威廉

ElasticSearch Rest/RPC 接口解析

早先ES的HTTP协议支持还是依赖Jetty的,现在不管是Rest还是RPC都是直接基于Netty了。

17840
来自专栏散尽浮华

linux运维中的命令梳理(四)

----------管理命令---------- ps命令:查看进程 要对系统中进程进行监测控制,查看状态,内存,CPU的使用情况,使用命令:/bin/ps (...

94580
来自专栏程序猿DD

分布式消息队列 RocketMQ 源码分析 —— Message 顺序发送与消费

? 本文主要基于 RocketMQ 4.0.x 正式版 1. 概述 2. Producer 顺序发送 3. Consumer 严格顺序消费 3.1 获得(锁定...

76980
来自专栏程序员八阿哥

王老板Python面试(9):整理的最全 python常见面试题(基本必考)

1)迭代器是一个更抽象的概念,任何对象,如果它的类有next方法和iter方法返回自己本身。对于string、list、dict、tuple等这类容器对象,使用...

22010
来自专栏向治洪

Android热插拔事件处理详解

一、Android热插拔事件处理流程图 Android热插拔事件处理流程如下图所示: ? 二、组成 1. NetlinkManager:       ...

85670
来自专栏魏琼东

基于DotNet构件技术的企业级敏捷软件开发平台 - AgileEAS.NET平台开发指南 - 实现插件

插件契约介绍          我们知道,要基于平台(容器)加插件的这种模式进行开发,我们必须定义一组契约,用于约束模块插件开发,也就是说,模块插件需要遵守一定...

19980
来自专栏前端杂谈

vue使用Axios做ajax请求

510120

扫码关注云+社区

领取腾讯云代金券