前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >libuv小册之线程池篇

libuv小册之线程池篇

作者头像
theanarkh
发布2020-05-07 16:09:28
1.1K0
发布2020-05-07 16:09:28
举报
文章被收录于专栏:原创分享原创分享原创分享

前言:最近开始写小册子,一篇篇来,写完了再整理总结到一起。循序渐进,重点是优先分析libuv的原理。其他的有时间再写,也希望大家一起。

Libuv是基于事件驱动的异步io库,他本身是一个单进程单线程的。但是难免会有耗时的操作。如果在Libuv的主循环里执行的话,就会阻塞后面的任务执行。所以Libuv里维护了一个线程池。他负责处理Libuv中耗时的操作,比如文件io、dns、用户自定义的耗时任务。文件io因为存在跨平台兼容的问题。无法很好地在事件驱动模块实现异步io。下面分析一下线程池的实现。 我们先看线程池的初始化然后再看他的使用。

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;
  // 默认线程数4个,static uv_thread_t default_threads[4];
  nthreads = ARRAY_SIZE(default_threads);
  // 判断用户是否在环境变量中设置了线程数,是的话取用户定义的
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  // #define MAX_THREADPOOL_SIZE 128最多128个线程
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  // 超过默认大小,重新分配内存
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    // 分配内存失败,回退到默认
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }
  // 初始化条件变量
  if (uv_cond_init(&cond))
    abort();
  // 初始化互斥变量
  if (uv_mutex_init(&mutex))
    abort();

  // 初始化三个队列
  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  // 初始化信号量变量,值为0
  if (uv_sem_init(&sem, 0))
    abort();
  // 创建多个线程,工作函数为worker,sem为worker入参
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();
  //为0则阻塞,非0则减一,这里等待所有线程启动成功再往下执行
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

线程池的初始化主要是初始化一些数据结构,然后创建多个线程。接着在每个线程里执行worker函数。worker是消费者,在分析消费者之前,我们先看一下生产者的逻辑。

static void init_once(void) {
  init_threads();
}

// 给线程池提交一个任务
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  // 保证已经初始化线程,并只执行一次,所以线程池是在提交第一个任务的时候才被初始化
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}

这里把业务相关的函数和任务完成后的回调函数封装到uv__work结构体中。uv__work结构定义如下。

struct uv__work {
  void (*work)(struct uv__work *w);
  void (*done)(struct uv__work *w, int status);
  struct uv_loop_s* loop;
  void* wq[2];
};

然后调post往线程池的队列中加入一个新的任务。Libuv把任务分为三种类型,慢io(dns解析)、快io(文件操作)、cpu密集型等,kind就是说明任务的类型的。我们接着看post函数。

// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {
  // 加锁访问任务队列,因为这个队列是线程池共享的
  uv_mutex_lock(&mutex);
  // 类型是慢IO
  if (kind == UV__WORK_SLOW_IO) {
    /* 
        插入慢IO对应的队列,llibuv这个版本把任务分为几种类型,
        对于慢io类型的任务,libuv是往任务队列里面插入一个特殊的节点
        run_slow_work_message,然后用slow_io_pending_wq维护了一个慢io任务的队列,
        当处理到run_slow_work_message这个节点的时候,libuv会从slow_io_pending_wq
        队列里逐个取出任务节点来执行。 
    */
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    /*
      有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message,
      说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。
      需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入
      主队列。如果run_slow_work_message非空,说明已经插入线程池的任务队列了。
    解锁然后直接返回。
    */
    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      uv_mutex_unlock(&mutex);
      return;
    }
    // 说明run_slow_work_message还没有插入队列,准备插入队列
    q = &run_slow_work_message;
  }
  // 把节点插入主队列,可能是慢IO消息节点或者一般任务
  QUEUE_INSERT_TAIL(&wq, q);
  // 有空闲线程则唤醒他,如果大家都在忙,则等到他忙完后就会重新判断是否还有新任务
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

这就是libuv中线程池的生产者逻辑。架构如下。

除了上面提到的,libuv还提供了另外一种生产者。即uv_queue_work函数。他只针对cpu密集型的。从实现来看,他和第一种生产模式的区别是,通过uv_queue_work提交的任务,是对应一个request的。如果该request对应的任务没有执行完,则事件循环不会退出。而通过uv__work_submit方式提交的任务就算没有执行完,也不会影响事件循环的退出。下面我们看uv_queue_work的实现。

int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}

uv_queue_work函数其实也没有太多的逻辑,他保存用户的工作函数和回调到request中。然后提交任务,然后把uv__queue_work和uv__queue_done封装到uv__work中,接着提交任务。所以当这个任务被执行的时候。他会执行工作函数uv__queue_work。

static void uv__queue_work(struct uv__work* w) {
  // 通过结构体某字段拿到结构体地址
  uv_work_t* req = container_of(w, uv_work_t, work_req);
  req->work_cb(req);
}

我们看到uv__queue_work其实就是对用户定义的任务函数进行了封装。这时候我们可以猜到,uv__queue_done也只是对用户回调的简单封装,即他会执行用户的回调。至此,我们分析完了libuv中,线程池的两种生产任务的方式。下面我们开始分析消费者。消费者由worker函数实现。

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;
  // 线程启动成功
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
  // 加锁互斥访问任务队列
  uv_mutex_lock(&mutex);
  for (;;) {
    /*
      1 队列为空,
      2 队列不为空,但是队列里只有慢IO任务且正在执行的慢IO任务个数达到阈值
      则空闲线程加一,防止慢IO占用过多线程,导致其他快的任务无法得到执行
    */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      // 阻塞,等待唤醒
      uv_cond_wait(&cond, &mutex);
      // 被唤醒,开始干活,空闲线程数减一 
      idle_threads -= 1;
    }
    // 取出头结点,头指点可能是退出消息、慢IO,一般请求
    q = QUEUE_HEAD(&wq);
    // 如果头结点是退出消息,则结束线程
    if (q == &exit_message) {
      // 唤醒其他因为没有任务正阻塞等待任务的线程,告诉他们准备退出
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }
    // 移除节点 
    QUEUE_REMOVE(q);
    // 重置前后指针
    QUEUE_INIT(q);  
    is_slow_work = 0;
    /* 
        如果当前节点等于慢IO节点,上面的while只判断了是不是只有慢io任务且达到
        阈值,这里是任务队列里肯定有非慢io任务,可能有慢io,如果有慢io并且正在  执行的个数达到阈值,则先不处理该慢io任务,继续判断是否还有非慢io任务可
        执行。
    */
    if (q == &run_slow_work_message) { 
      // 遇到阈值,重新入队 
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }
      // 没有慢IO任务则继续
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;
      // 有慢io,开始处理慢IO任务
      is_slow_work = 1;
      // 正在处理慢IO任务的个数累加,用于其他线程判断慢IO任务个数是否达到阈值
      slow_io_work_running++;
      // 摘下一个慢io任务
      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);
       /*
         取出一个任务后,如果还有慢IO任务则把慢IO标记节点重新入队,
        表示还有慢IO任务,因为上面把该标记节点出队了 
       */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        // 有空闲线程则唤醒他,因为还有任务处理
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }
    // 不需要操作队列了,尽快释放锁
    uv_mutex_unlock(&mutex);
    // q是慢IO或者一般任务
    w = QUEUE_DATA(q, struct uv__work, wq);
    // 执行业务的任务函数,该函数一般会阻塞
    w->work(w);
    // 准备操作loop的任务完成队列,加锁
    uv_mutex_lock(&w->loop->wq_mutex);
    // 置空说明指向完了,不能被取消了,见cancel逻辑
    w->work = NULL;  
    // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 通知loop的wq_async节点
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
    // 为下一轮操作任务队列加锁
    uv_mutex_lock(&mutex);
    // 执行完慢IO任务,记录正在执行的慢IO个数变量减1,上面加锁保证了互斥访问这个变量
    if (is_slow_work) {
      slow_io_work_running--;
    }
  }
}

我们看到消费者的逻辑似乎比较复杂,主要是把任务分为三种。并且对于慢io类型的任务,还限制了线程数。其余的逻辑和一般的线程池类型,就是互斥访问任务队列,然后取出节点执行,最后执行回调。不过libuv这里不是直接回调用户的函数。而是通过uv_async_send(&w->loop->wq_async)通知主进程有任务完成了。然后线程继续执行任务。我们看一下这个函数的实现。

// 通知主线程有任务完成
int uv_async_send(uv_async_t* handle) {
  /*
    1 pending是0,则设置为1,返回0,
    2 pending是1则返回1,
    所以同一个async如果多次调用该函数是会被合并的。只有pending等于
    0的时候才会执行uv__async_send
*/
  if (cmpxchgi(&handle->pending, 0, 1) == 0)
    uv__async_send(handle->loop);

  return 0;
}

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  // 用于异步通信的管道的写端
  fd = loop->async_wfd;

#if defined(__linux__)
  // fd等于1说明用的是eventfd而不是管道,管道才有两端
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    // 见uv__async_start
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif
  // 通知读端
  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);
    // 省略部分代码
}

uv__async_send通过网eventfd中写入一些数据,触发了对应io观察者的事件。之前在分析async机制的时候讲过。该io观察者的回调是uv__work_done函数。那么我们就看看这个函数的逻辑。

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;
  // 通过结构体字段获得结构体首地址
  loop = container_of(handle, uv_loop_t, wq_async);
  // 准备处理队列,加锁
  uv_mutex_lock(&loop->wq_mutex);
  // 把loop->wq队列的节点全部移到wp变量中,这样一来可以尽快释放锁
  QUEUE_MOVE(&loop->wq, &wq);
  // 不需要使用了,解锁
  uv_mutex_unlock(&loop->wq_mutex);
  // wq队列的节点来源是在线程的worker里插入
  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    // 执行回调
    w->done(w, err);
  }
}

逐个处理已完成的任务节点,执行回调。这就是整个消费者的逻辑。最后顺带提一下w->work == uv__cancelled。这个处理的用处是为了支持取消一个任务。Libuv提供了uv__work_cancel函数支持用户取消提交的任务。我们看一下他的逻辑。

static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;
  // 加锁,为了把节点移出队列
  uv_mutex_lock(&mutex);
  // 加锁,为了判断w->wq是否为空
  uv_mutex_lock(&w->loop->wq_mutex);
  /*
    w在在任务队列中并且任务函数work不为空,则可取消,
    在work函数中,如果执行完了任务,会把work置NULL,
    所以一个任务可以取消的前提是他还没执行完。或者说还没执行过
  */
  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  // 从任务队列中删除该节点
  if (cancelled)
    QUEUE_REMOVE(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);
  // 不能取消
  if (!cancelled)
    return UV_EBUSY;
  // 重置回调函数
  w->work = uv__cancelled;

  uv_mutex_lock(&loop->wq_mutex);
   /*
     插入loop的wq队列,对于取消的动作,libuv认为是任务执行完了。
     所以插入已完成的队列,不过他的回调是uv__cancelled函数,
     而不是用户设置的回调
   */
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  // 通知主线程有任务完成
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}

最后我们举一个使用线程池的例子。这里以文件操作为例子,因为nodejs中文件读写是以线程池实现的。这里直接从uv_fs_open开始(因为js层到c++层主要是一些封装。最后会调到uv_fs_open)。直接看一下uv_fs_open的代码。

// 下面代码是宏展开后的效果
int uv_fs_open(
    uv_loop_t* loop,
         uv_fs_t* req,
         const char* path,
         int flags,
         int mode,
         uv_fs_cb cb
) {                                                                 
    // 初始化一些字段
    UV_REQ_INIT(req, UV_FS);                                                  
    req->fs_type = UV_FS_ ## subtype;                                         
    req->result = 0;                                                          
    req->ptr = NULL;                                                          
    req->loop = loop;                                                         
    req->path = NULL;                                                         
    req->new_path = NULL;                                                     
    req->bufs = NULL;                                                         
    req->cb = cb;    
    // 同步                                                
    if (cb == NULL) {                                                         
      req->path = path;                                                       
    } else {                                                                  
      req->path = uv__strdup(path);                               
    } 
    req->flags = flags;
    req->mode = mode;                                                                     
    if (cb != NULL) {                                                         
      uv__req_register(loop, req);  
      /* 异步*/                                           
      uv__work_submit(
         loop,                                                   
         &req->work_req,                                         
         UV__WORK_FAST_IO,                                       
         uv__fs_work,                                            
         uv__fs_done    
      );                                           
      return 0;                                                               
    }                                                                         
    else {                 
       /* 同步 */                                                    
      uv__fs_work(&req->work_req);                                            
      return req->result;                                                     
    }   

我们从上往下看,没有太多的逻辑,函数的最后一个参数cb是nodejs的c++层设置的,c++层会再回调js层。然后open(大部分的文件操作)分为同步和异步两种模式(即fs.open和openSync)。同步直接导致nodejs阻塞,不涉及到线程池,这里只看异步模式。我们看到异步模式下是调用uv__work_submit函数给线程池提交一个任务。设置的工作函数和回调函数分别是uv__fs_work,uv__fs_done。所以我们看一下这两函数。uv__fs_work函数主要是调用操作系统提供的函数。比如open。他会引起线程的阻塞,等到执行完后,他会把返回结果保存到request结构体中。接着执行就是遵从线程池的处理流程。执行回调uv__fs_done。

static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);
  // 取消了
  if (status == UV_ECANCELED) {
    req->result = UV_ECANCELED;
  }
  // 执行用户设置的回调,比如nodejs
  req->cb(req);
}

没有太多逻辑,直接执行回调,顺便提一下,nodejs里则是执行c++层函数AfterInteger(代码在node_file.cc的Open函数)。

void AfterInteger(uv_fs_t* req) {
  FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
  FSReqAfterScope after(req_wrap, req);

  if (after.Proceed())
    req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}

void FSReqWrap::Resolve(Local<Value> value) {
  Local<Value> argv[2] {
    Null(env()->isolate()),
    value
  };
  MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}

执行resolve,然后执行js层的oncomplete回调,即用户执行open函数时传入的函数。至此,线程池分析完成。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档