专栏首页纸上得来终觉浅libuv源码阅读(17)--queue-cancel
原创

libuv源码阅读(17)--queue-cancel

先看下用例源码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <uv.h>

#define FIB_UNTIL 25
uv_loop_t *loop;
uv_work_t fib_reqs[FIB_UNTIL];

long fib_(long t) {
    if (t == 0 || t == 1)
        return 1;
    else
        return fib_(t-1) + fib_(t-2);
}

void fib(uv_work_t *req) {
    int n = *(int *) req->data;
    if (random() % 2)
        sleep(1);
    else
        sleep(3);
    long fib = fib_(n);
    fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
}

void after_fib(uv_work_t *req, int status) {
    if (status == UV_ECANCELED)
        fprintf(stderr, "Calculation of %d cancelled.\n", *(int *) req->data);
}

void signal_handler(uv_signal_t *req, int signum)
{
    printf("Signal received!\n");
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        // 取消在这里调用
        uv_cancel((uv_req_t*) &fib_reqs[i]);
    }
    uv_signal_stop(req);
}

int main() {
    loop = uv_default_loop();

    int data[FIB_UNTIL];
    int i;
    // 依次提交任务到线程池的工作队列中
    for (i = 0; i < FIB_UNTIL; i++) {
        data[i] = i;
        fib_reqs[i].data = (void *) &data[i];
        uv_queue_work(loop, &fib_reqs[i], fib, after_fib);
    }

    uv_signal_t sig;
    uv_signal_init(loop, &sig);
    // 接收到终止信号后把没有结束的任务取消掉
    uv_signal_start(&sig, signal_handler, SIGINT);

    return uv_run(loop, UV_RUN_DEFAULT);
}

看下呗插入到线程池的任务如何取消:

int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop =  ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop =  ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop =  ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }
  // 根据类型转换指针类型
  return uv__work_cancel(loop, req, wreq);
}

static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);
  
  // 先看是否支持取消 被插入队列中且work函数尚未执行
  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  if (cancelled)
    QUEUE_REMOVE(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  // 替换work函数 插入loop的就绪wq中 通知loop执行它 
  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}

// 终止线程
static void uv__cancelled(struct uv__work* w) {
  abort();
}

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    // 这里就是上文决定是否是取消的任务
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}

总结:cancel把会work从待处理的队列中移动到loop的wq就绪队列中,然后让loop的异步任务handler来处理它

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • libuv源码阅读(14)--queue-work

    其实就是调用 uv__work_submit 提交一个任务给任务队列,跟loop的io事件多线程异步化一样的处理逻辑,这里2个cb分别对应work和done函数...

    wanyicheng
  • libuv源码阅读(2)--queue.h

    这是一种双向队列的实现,假设现在有2个strcut BASE 和 A 要通过双向队列组织起来,BASE作为队列头结点的持有者,A作为队列元素插入:

    wanyicheng
  • libuv源码阅读(6)--helloworld

    每一种都是一种hanlder类型或者request类型,代表某种资源类型或者请求操作的包装结构体,里面的属性字段是为了支持它可以正常工作的而设置的:

    wanyicheng
  • libuv源码阅读(9)--interfaces

    简单来说就是调用linux系统的:getifaddrs 和 freeifaddrs,读取系统的网卡接口信息,然后拷贝到用户的缓冲区中,然后再释放函数内部的内存。...

    wanyicheng
  • libuv源码阅读(11)--lock

    2. 创建2个读者 1个写者 根据读写锁 被系统调度分配执行时机 输出对应自己的读到或者写后的值

    wanyicheng
  • libuv源码阅读(12)--change

    可以看到 fs_event_s 也是由基础的handler和一个path 以及 它独有的字段组成

    wanyicheng
  • libuv源码阅读(13)--plugin

    功能很简单:载入启动参数中对应的插件动态库,调用它们的 initialize 方法

    wanyicheng
  • libuv源码阅读(16)--signal

    总结:信号处理handler是被插入到红黑树中,按照一定规则排序插入的,信号越小,不带oneshot等规则。信号处理函数统一触发信号管道可读,然后loop从信号...

    wanyicheng
  • libuv源码阅读(18)--progress

    总结:用户自己初始化的async handler 也可以被插入到异步handler队列中,当管道[0]可读的时候,代表某个异步handler可以处理了,这时候遍...

    wanyicheng

扫码关注云+社区

领取腾讯云代金券