前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >libuv源码分析之stream第二篇

libuv源码分析之stream第二篇

作者头像
theanarkh
发布2020-05-07 16:07:25
6510
发布2020-05-07 16:07:25
举报
文章被收录于专栏:原创分享原创分享

上一篇分析了流的基础知识和读写操作的实现。今天继续分析。

1 关闭流的写端

代码语言:c++
复制
// 关闭流的写端
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {

  // 流是可写的,并且还没关闭写端,也不是处于正在关闭状态
  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  // 初始化一个关闭请求,关联的handle是stream
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  // 关闭后执行的回调
  req->cb = cb;
  stream->shutdown_req = req;
  // 设置正在关闭的标记
  stream->flags |= UV_HANDLE_SHUTTING;
  // 注册等待可写事件
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  return 0;
}

关闭流的写端就是相当于给流发送一个关闭请求,把请求挂载到流中,然后注册等待可写事件,在可写事件触发的时候就会执行关闭操作。这个我们后面分析。

2 关闭流

代码语言:c++
复制
void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;
  // 从事件循环中删除io观察者,移出pending队列
  uv__io_close(handle->loop, &handle->io_watcher);
  // 停止读
  uv_read_stop(handle);
  // 停掉handle
  uv__handle_stop(handle);
  // 不可读、写
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  // 关闭非标准流的文件描述符
  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors.  Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }
  // 关闭通信socket对应的文件描述符
  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  // 同上,这是在排队等待处理的通信socket
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}

3 连接流

连接流是针对tcp的,连接即建立三次握手。所以我们首先介绍一下一些网络编程相关的内容。想要发起三次握手,首先我们先要有一个socket。我们看libuv中如何新建一个socket。

代码语言:c++
复制
/*
1 获取一个新的socket fd
2 把fd保存到handle里,并根据flag进行相关设置
3 绑定到本机随意的地址(如果设置了该标记的话)
*/
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage saddr;
  socklen_t slen;
  int sockfd;
  int err;
  // 获取一个socket
  err = uv__socket(domain, SOCK_STREAM, 0);
  if (err < 0)
    return err;
  // 申请的fd
  sockfd = err;
  // 设置选项和保存socket的文件描述符到io观察者中
  err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
  if (err) {
    uv__close(sockfd);
    return err;
  }
  // 设置了需要绑定标记UV_HANDLE_BOUND    
  if (flags & UV_HANDLE_BOUND) {
    slen = sizeof(saddr);
    memset(&saddr, 0, sizeof(saddr));
    // 获取fd对应的socket信息,比如ip,端口,可能没有
    if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen)) {
      uv__close(sockfd);
      return UV__ERR(errno);
    }
    // 绑定到socket中,如果没有则绑定到系统随机选择的地址
    if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen)) {
      uv__close(sockfd);
      return UV__ERR(errno);
    }
  }

  return 0;
}

上面的代码就是在libuv申请一个socket的逻辑,他还支持新建的socket,可以绑定到一个用户设置的,或者操作系统随机选择的地址。不过libuv并不直接使用这个函数。而是又封装了一层。

代码语言:c++
复制
// 如果流还没有对应的fd,则申请一个新的,如果有则修改流的配置
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage saddr;
  socklen_t slen;

  if (domain == AF_UNSPEC) {
    handle->flags |= flags;
    return 0;
  }
  // 已经有socket fd了
  if (uv__stream_fd(handle) != -1) {
    // 该流需要绑定到一个地址
    if (flags & UV_HANDLE_BOUND) {
      /*
          流是否已经绑定到一个地址了。handle的flag是在new_socket里设置的,
          如果有这个标记说明已经执行过绑定了,直接更新flags就行。
      */
      if (handle->flags & UV_HANDLE_BOUND) {
        handle->flags |= flags;
        return 0;
      }
      // 有socket fd,但是可能还没绑定到一个地址
      slen = sizeof(saddr);
      memset(&saddr, 0, sizeof(saddr));
      // 获取socket绑定到的地址
      if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen))
        return UV__ERR(errno);
      // 绑定过了socket地址,则更新flags就行
      if ((saddr.ss_family == AF_INET6 &&
          ((struct sockaddr_in6*) &saddr)->sin6_port != 0) ||
          (saddr.ss_family == AF_INET &&
          ((struct sockaddr_in*) &saddr)->sin_port != 0)) {
        /* Handle is already bound to a port. */
        handle->flags |= flags;
        return 0;
      }
      // 没绑定则绑定到随机地址,bind中实现
      if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen))
        return UV__ERR(errno);
    }

    handle->flags |= flags;
    return 0;
  }
  // 申请一个新的fd关联到流
  return new_socket(handle, domain, flags);
}

maybe_new_socket函数的逻辑分支很多 1 如果流还没有关联到fd,则申请一个新的fd关联到流上。如果设置了绑定标记,fd还会和一个地址进行绑定。 2 如果流已经关联了一个fd

  1. 如果流设置了绑定地址的标记,但是已经通过libuv绑定了一个地址(Libuv会设置UV_HANDLE_BOUND标记,用户也可能是直接调bind函数绑定了)。则不需要再次绑定,更新flags就行。
  2. 如果流设置了绑定地址的标记,但是还没有通过libuv绑定一个地址,这时候通过getsocketname判断用户是否自己通过bind函数绑定了一个地址,是的话则不需要再次执行绑定操作。否则随机绑定到一个地址。 以上两个函数的逻辑主要是申请一个socket和给socket绑定一个地址。下面我们开看一下连接流的实现。
代码语言:c++
复制
int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int err;
  int r;

  // 已经发起了connect了
  if (handle->connect_req != NULL)
    return UV_EALREADY;  
  // 申请一个socket和绑定一个地址,如果还没有的话
  err = maybe_new_socket(handle,
                         addr->sa_family,
                         UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  if (err)
    return err;

  handle->delayed_error = 0;

  do {
    // 清除全局错误变量的值
    errno = 0;
    // 发起三次握手
    r = connect(uv__stream_fd(handle), addr, addrlen);
  } while (r == -1 && errno == EINTR);

  if (r == -1 && errno != 0) {
    // 三次握手还没有完成
    if (errno == EINPROGRESS)
      ; /* not an error */
    else if (errno == ECONNREFUSED)
      // 对方拒绝建立连接,延迟报错
      handle->delayed_error = UV__ERR(errno);
    else
      // 直接报错
      return UV__ERR(errno);
  }
  // 初始化一个连接型request,并设置某些字段
  uv__req_init(handle->loop, req, UV_CONNECT);
  req->cb = cb;
  req->handle = (uv_stream_t*) handle;
  QUEUE_INIT(&req->queue);
  handle->connect_req = req;
  // 注册到libuv观察者队列
  uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  // 连接出错,插入pending队尾
  if (handle->delayed_error)
    uv__io_feed(handle->loop, &handle->io_watcher);

  return 0;
}

连接流的逻辑,大致如下 1 申请一个socket,绑定一个地址。 2 根据给定的服务器地址,发起三次握手,非阻塞的,会直接返回继续执行,不会等到三次握手完成。 3 往流上挂载一个connect型的请求。 4 设置io观察者感兴趣的事件为可写。然后把io观察者插入事件循环的io观察者队列。等待可写的时候时候(完成三次握手),就会执行cb回调。

4 监听流

代码语言:c++
复制
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
  static int single_accept = -1;
  unsigned long flags;
  int err;

  if (tcp->delayed_error)
    return tcp->delayed_error;
  // 是否设置了不连续accept。默认是连续accept。
  if (single_accept == -1) {
    const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
    single_accept = (val != NULL && atoi(val) != 0);  /* Off by default. */
  }
  // 设置不连续accept
  if (single_accept)
    tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;

  flags = 0;
  /*
    可能还没有用于listen的fd,socket地址等。
    这里申请一个socket和绑定到一个地址(如果调listen之前没有调bind则绑定到随机地址)
  */
  err = maybe_new_socket(tcp, AF_INET, flags);
  if (err)
    return err;
  // 设置fd为listen状态
  if (listen(tcp->io_watcher.fd, backlog))
    return UV__ERR(errno);
  // 建立连接后的业务回调
  tcp->connection_cb = cb;
  tcp->flags |= UV_HANDLE_BOUND;
  // 有连接到来时的libuv层回调
  tcp->io_watcher.cb = uv__server_io;
  // 注册读事件,等待连接到来
  uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);

  return 0;
}

监听流的逻辑看起来逻辑很多,但是主要的逻辑是把流对的fd改成listen状态,这样流就可以接收请求了。然后设置连接到来时执行的回调。最后注册io观察者到事件循环。等待连接到来。就会执行uv__server_io。uv__server_io再执行connection_cb。监听流和其他流的一个区别是,当io观察者的事件触发时,监听流执行的回调是uv__server_io函数。而其他流是在uv__stream_io里统一处理。

流的类型分析得差不多了,最后分析一下监听流的处理函数uv__server_io,统一处理其他流的函数是uv__stream_io,这个下次分析。

刚才已经说到有连接到来的时候,libuv会执行uv__server_io,下面看一下他做了什么事情。

代码语言:c++
复制
// 有tcp连接到来时执行该函数
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;
  // 拿到io观察者所在的流
  stream = container_of(w, uv_stream_t, io_watcher);
  // 继续注册事件,等待连接
  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    // 有连接到来,进行accept
    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      // 忽略出错处理
      // accept出错,触发回调
      stream->connection_cb(stream, err);
      continue;
    }
    // 保存通信socket对应的文件描述符
    stream->accepted_fd = err;
    /*
        有连接,执行上层回调,connection_cb一般会调用uv_accept消费accepted_fd。
        然后重新注册等待可读事件
    */
    stream->connection_cb(stream, 0);
    /*
        用户还没有消费accept_fd。先解除io的事件,
        等到用户调用uv_accept消费了accepted_fd再重新注册事件
    */
    if (stream->accepted_fd != -1) {
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }
    // 定时睡眠一会(可被信号唤醒),分点给别的进程accept
    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}

整个函数的逻辑如下 1 调用accept摘下一个完成了三次握手的节点。 2 然后执行上层回调。上层回调会调用uv_accept消费accept返回的fd。然后再次注册等待可读事件(当然也可以不消费)。 3 如果2没有消费调fd。则撤销等待可读事件,即处理完一个fd后,再accept下一个。如果2中消费了fd。再判断有没有设置UV_HANDLE_TCP_SINGLE_ACCEPT标记,如果有则休眠一会,分点给别的进程accept。否则继续accept。

总结:目前分析的是一些基础的概念和实现。后续会串起来再分析一下具体的过程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 关闭流的写端
  • 2 关闭流
  • 3 连接流
  • 4 监听流
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档