专栏首页纸上得来终觉浅libuv源码阅读(18)--progress
原创

libuv源码阅读(18)--progress

先看下用例源码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <uv.h>

uv_loop_t *loop;
uv_async_t async;

double percentage;

void fake_download(uv_work_t *req) {
    int size = *((int*) req->data);
    int downloaded = 0;
    while (downloaded < size) {
        percentage = downloaded*100.0/size;
        async.data = (void*) &percentage;
        // work 随机完成一些模拟的下载任务 然后告诉loop可以打印此时的进度了
        uv_async_send(&async);

        sleep(1);
        downloaded += (200+random())%1000; // can only download max 1000bytes/sec,
                                           // but at least a 200;
    }
}

// 任务结束了
void after(uv_work_t *req, int status) {
    fprintf(stderr, "Download complete\n");
    uv_close((uv_handle_t*) &async, NULL);
}

// 异步handler的回调
void print_progress(uv_async_t *handle) {
    double percentage = *((double*) handle->data);
    fprintf(stderr, "Downloaded %.2f%%\n", percentage);
}

int main() {
    loop = uv_default_loop();

    uv_work_t req;
    int size = 10240;
    req.data = (void*) &size;
    
    // 初始化一个异步handler
    uv_async_init(loop, &async, print_progress);
    // 提交一个work
    uv_queue_work(loop, &req, fake_download, after);

    return uv_run(loop, UV_RUN_DEFAULT);
}

看下一些函数的细节:

// 常规初始化操作
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  // 插入指定队列中
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

// loop初始化的时候已经创建好了一个异步handler
static int uv__async_start(uv_loop_t* loop) {
  int pipefd[2];
  int err;
  
  // 所以这里会直接返回的
  if (loop->async_io_watcher.fd != -1)
    return 0;

#ifdef __linux__
  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (err < 0)
    return UV__ERR(errno);

  pipefd[0] = err;
  pipefd[1] = -1;
#else
  err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
  if (err < 0)
    return err;
#endif
  // 每个异步任务结束后都会写监听fd 导致下面的回调触发
  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  for (;;) {
    // 有事件触发了
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  // 遍历指定队列 如果是处于pending状态的handler就可以处理了
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);
    
    // pending为 1 2 时可以处理
    if (0 == uv__async_spin(h))
      continue;  /* Not pending. */

    if (h->async_cb == NULL)
      continue;

    h->async_cb(h);
  }
}

// 多个工作线程可能会操作一个handler 导致pending的值引入竞争态 需要增加同步操作
/* Only call this from the event loop thread. */
static int uv__async_spin(uv_async_t* handle) {
  int i;
  int rc;

  for (;;) {
    /* 997 is not completely chosen at random. It's a prime number, acyclical
     * by nature, and should therefore hopefully dampen sympathetic resonance.
     */
    for (i = 0; i < 997; i++) {
      /* rc=0 -- handle is not pending.
       * rc=1 -- handle is pending, other thread is still working with it.
       * rc=2 -- handle is pending, other thread is done.
       */
      rc = cmpxchgi(&handle->pending, 2, 0);

      if (rc != 1)
        return rc;

      /* Other thread is busy with this handle, spin until it's done. */
      cpu_relax();
    }

    /* Yield the CPU. We may have preempted the other thread while it's
     * inside the critical section and if it's running on the same CPU
     * as us, we'll just burn CPU cycles until the end of our time slice.
     */
    sched_yield();
  }
}

//  如果 ptr 值和 old相等, 则将new赋值给ptr且返回old, 否则返回new。
UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)) {
#if defined(__i386__) || defined(__x86_64__)
  int out;
  __asm__ __volatile__ ("lock; cmpxchg %2, %1;"
                        : "=a" (out), "+m" (*(volatile int*) ptr)
                        : "r" (newval), "0" (oldval)
                        : "memory");
  return out;
#elif defined(__MVS__)
  unsigned int op4;
  if (__plo_CSST(ptr, (unsigned int*) &oldval, newval,
                (unsigned int*) ptr, *ptr, &op4))
    return oldval;
  else
    return op4;
#elif defined(__SUNPRO_C) || defined(__SUNPRO_CC)
  return atomic_cas_uint((uint_t *)ptr, (uint_t)oldval, (uint_t)newval);
#else
  return __sync_val_compare_and_swap(ptr, oldval, newval);
#endif
}

// 提升 cpu 在等待 自旋锁释放时候的性能
UV_UNUSED(static void cpu_relax(void)) {
#if defined(__i386__) || defined(__x86_64__)
  __asm__ __volatile__ ("rep; nop");  /* a.k.a. PAUSE */
#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__)
  __asm__ volatile("yield");
#endif
}

// 发送信号 pending置位
int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;

  /* Tell the other thread we're busy with the handle. */
  if (cmpxchgi(&handle->pending, 0, 1) != 0)
    return 0;

  /* Wake up the other thread's event loop. */
  uv__async_send(handle->loop);

  /* Tell the other thread we're done. */
  if (cmpxchgi(&handle->pending, 1, 2) != 1)
    abort();

  return 0;
}

// 触发异步任务io监听fd可读事件
static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

总结:用户自己初始化的async handler 也可以被插入到异步handler队列中,当管道[0]可读的时候,代表某个异步handler可以处理了,这时候遍历队列,处理pengding状态的handler。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • libuv源码阅读(2)--queue.h

    这是一种双向队列的实现,假设现在有2个strcut BASE 和 A 要通过双向队列组织起来,BASE作为队列头结点的持有者,A作为队列元素插入:

    wanyicheng
  • libuv源码阅读(6)--helloworld

    每一种都是一种hanlder类型或者request类型,代表某种资源类型或者请求操作的包装结构体,里面的属性字段是为了支持它可以正常工作的而设置的:

    wanyicheng
  • libuv源码阅读(9)--interfaces

    简单来说就是调用linux系统的:getifaddrs 和 freeifaddrs,读取系统的网卡接口信息,然后拷贝到用户的缓冲区中,然后再释放函数内部的内存。...

    wanyicheng
  • libuv源码阅读(11)--lock

    2. 创建2个读者 1个写者 根据读写锁 被系统调度分配执行时机 输出对应自己的读到或者写后的值

    wanyicheng
  • libuv源码阅读(12)--change

    可以看到 fs_event_s 也是由基础的handler和一个path 以及 它独有的字段组成

    wanyicheng
  • libuv源码阅读(13)--plugin

    功能很简单:载入启动参数中对应的插件动态库,调用它们的 initialize 方法

    wanyicheng
  • libuv源码阅读(16)--signal

    总结:信号处理handler是被插入到红黑树中,按照一定规则排序插入的,信号越小,不带oneshot等规则。信号处理函数统一触发信号管道可读,然后loop从信号...

    wanyicheng
  • libuv源码阅读(19)--vustop

    wanyicheng
  • libuv源码阅读(20)--uvcat

    wanyicheng

扫码关注云+社区

领取腾讯云代金券