前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >libuv源码分析之stream第一篇

libuv源码分析之stream第一篇

作者头像
theanarkh
发布2020-05-07 16:05:10
8450
发布2020-05-07 16:05:10
举报
文章被收录于专栏:原创分享原创分享

流的实现在libuv里占了很大篇幅,今天分析一下流的实现。首先看数据结构。流在libuv里用uv_stream_s表示,他属于handle族。继承于uv_handle_s。

代码语言:c++
复制
struct uv_stream_s {
  // uv_handle_s的字段
  void* data;        
  // 所属事件循环   
  uv_loop_t* loop;  
  // handle类型    
  uv_handle_type type;  
  // 关闭handle时的回调
  uv_close_cb close_cb; 
  // 用于插入事件循环的handle队列
  void* handle_queue[2];
  union {               
    int fd;             
    void* reserved[4];  
  } u;      
  // 用于插入事件循环的closing阶段对应的队列
  uv_handle_t* next_closing; 
  // 各种标记 
  unsigned int flags;
  // 流拓展的字段
  // 用户写入流的字节大小,流缓存用户的输入,然后等到可写的时候才做真正的写
  size_t write_queue_size; 
  // 分配内存的函数,内存由用户定义,主要用来保存读取的数据               
  uv_alloc_cb alloc_cb;  
  // 读取数据的回调                 
  uv_read_cb read_cb; 
  // 连接成功后,执行connect_req的回调(connect_req在uv__xxx_connect中赋值)
  uv_connect_t *connect_req; 
  // 关闭写端的时候,发送完缓存的数据,执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值)    
  uv_shutdown_t *shutdown_req;
  // 流对应的io观察者,即文件描述符+一个文件描述符事件触发时执行的回调   
  uv__io_t io_watcher;  
  // 流缓存下来的,待写的数据         
  void* write_queue[2];       
  // 已经完成了数据写入的队列   
  void* write_completed_queue[2];
  // 完成三次握手后,执行的回调
  uv_connection_cb connection_cb;
  // 操作流时出错码
  int delayed_error;  
  // accept返回的通信socket对应的文件描述符           
  int accepted_fd;    
  // 同上,用于缓存更多的通信socket对应的文件描述符           
  void* queued_fds;
}

流的实现中,最核心的字段是io观察者,其余的字段是和流的性质相关的。io观察者封装了流对应的文件描述符和文件描述符事件触发时的回调。比如读一个流,写一个流,关闭一个流,连接一个流,监听一个流,在uv_stream_s中都有对应的字段去支持。但是本质上是靠io观察者去驱动的。 1 读一个流,就是io观察者中的文件描述符。可读事件触发时,执行用户的读回调。 2 写一个流,先把数据写到流中,然后io观察者中的文件描述符。可写事件触发时,执行最后的写入,并执行用户的写完成回调。 3 关闭一个流,就是io观察者中的文件描述符。可写事件触发时,如果待写的数据已经写完(比如发送完),然后执行关闭流的写端。接着执行用户的回调。 4 连接一个流,比如作为客户端去连接服务器。就是io观察者中的文件描述符。可读事件触发时(建立三次握手成功),执行用户的回调。 5 监听一个流,就是io观察者中的文件描述符。可读事件触发时(有完成三次握手的连接),执行用户的回调。 今天我们具体分析一下流读写操作的实现。首先我们看一下如何初始化一个流。

代码语言:c++
复制
// 初始化流
void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;
  // 记录handle的类型
  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;
  // 这个逻辑看起来是为了拿到一个备用的文件描述符,如果以后触发UV_EMFILE错误(打开的文件太多)时,使用这个备用的fd
  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }
  // 初始化io观察者,把文件描述符(这里还没有,所以是-1)和回调uv__stream_io记录在io_watcher上
  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

我们看到流的初始化没有太多逻辑。主要是初始化一些字段。接着我们看一下如何打开(激活)一个流。

代码语言:c++
复制
// 关闭nagle,开启长连接,保存fd 
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {

  // 还没有设置fd或者设置的同一个fd则继续,否则返回busy
  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  // 设置流的标记
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    // 关闭nagle算法
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    // 开启SO_KEEPALIVE,使用tcp长连接,一定时间后没有收到数据包会发送心跳包
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }
   // 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符
  stream->io_watcher.fd = fd;

  return 0;
}

打开一个流,本质上就是给这个流关联一个文件描述符。还有一些属性的设置。有了文件描述符,后续就可以操作这个流了。下面我们逐个操作分析。 1 读 我们在一个流上执行uv_read_start。流的数据(如果有的话)就会源源不断地流向调用方。

代码语言:c++
复制
int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);
  // 流已经关闭,不能读
  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;
  // 流不可读,说明可能是只写流
  if (!(stream->flags & UV_HANDLE_READABLE))
    return -ENOTCONN;
  // 标记正在读
  stream->flags |= UV_HANDLE_READING;
  // 记录读回调,有数据的时候会执行这个回调
  stream->read_cb = read_cb;
  // 分配内存函数,用于存储读取的数据
  stream->alloc_cb = alloc_cb;
  // 注册读事件
  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  // 激活handle,有激活的handle,事件循环不会退出
  uv__handle_start(stream);

  return 0;
}

执行uv_read_start本质上是给流对应的文件描述符在epoll中注册了一个可读事件。并且给一些字段赋值,比如读回调函数,分配内存的函数。打上正在做读取操作的标记。然后在可读事件触发的时候,读回调就会被执行,这个逻辑我们后面分析。除了开始读取数据,还有一个读操作就是停止读取。对应的函数是uv_read_stop。

代码语言:c++
复制
int uv_read_stop(uv_stream_t* stream) {
  // 是否正在执行读取操作,如果不是,则没有必要停止
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;
  // 清除 正在读取 的标记
  stream->flags &= ~UV_HANDLE_READING;
  // 撤销 等待读事件
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  // 对写事件也不感兴趣,停掉handle。允许事件循环退出
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}

和start相反,start是注册等待可读事件和打上正在读取这个标记,stop就是撤销等待可读事件和清除这个标记。另外还有一个辅助函数,判断流是否设置了可读属性。

代码语言:c++
复制
int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}

2 写 我们在流上执行uv_write就可以往流中写入数据。

代码语言:c++
复制
int uv_write(
       // 一个写请求,记录了需要写入的数据和信息。数据来自下面的const uv_buf_t bufs[]
         uv_write_t* req,
         // 往哪个流写
       uv_stream_t* handle,
       // 需要写入的数据
       const uv_buf_t bufs[],
       // 个数
       unsigned int nbufs,
       // 写完后执行的回调
       uv_write_cb cb
) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}

uv_write是直接调用uv_write2。第四个参数是NULL。代表是一般的写数据,不传递文件描述符。

代码语言:c++
复制
int uv_write2(
    uv_write_t* req,
    uv_stream_t* stream,
    const uv_buf_t bufs[],
    unsigned int nbufs,
    // 需要传递的文件描述符所在的流,这里不分析,在分析unix的时候再分析
    uv_stream_t* send_handle,
    uv_write_cb cb
) 
{
  int empty_queue;
  // 是不可写流
  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return -EPIPE;
  // 流中缓存的数据大小是否为0
  empty_queue = (stream->write_queue_size == 0);

  // 初始化一个写请求
  uv__req_init(stream->loop, req, UV_WRITE);
  // 写完后执行的回调
  req->cb = cb;
  // 往哪个流写
  req->handle = stream;
  // 写出错的错误码,初始化为0
  req->error = 0;
  QUEUE_INIT(&req->queue);
  // 默认buf
  req->bufs = req->bufsml;
  // 不够则扩容
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
  // 把需要写入的数据填充到req中
  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  // 需要写入的buf个数
  req->nbufs = nbufs;
  // 目前写入的buf个数,初始化是0
  req->write_index = 0;
  // 更新流中待写数据的总长度,就是每个buf的数据大小加起来
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  // 插入待写队列
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
  /*
   stream->connect_req非空说明是作为客户端,并且正在建立三次握手,建立成功会置connect_req为NULL。
   这里非空说明还没有建立成功或者不是作为客户端(不是连接流)。即没有用到connect_req这个字段。
  */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) { 
    // 待写队列为空,则直接触发写动作,即操作文件描述符
    uv__write(stream);
  }
  else {
    /*
        队列非空,说明往底层写,uv__write中不一样会注册等待可写事件,所以这里注册一下
        给流注册等待可写事件,触发的时候,把数据消费掉
    */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  }

  return 0;
}

uv_write2的主要逻辑就是封装一个写请求,插入到流的待写队列。然后根据当前流的情况。看是直接写入还是等待会再写入。架构大致如下。

我们继续看真正的写操作。

代码语言:c++
复制
static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:
  // 待写队列为空,没得写
  if (QUEUE_EMPTY(&stream->write_queue))
    return;
  // 遍历待写队列,把每个节点的数据写入底层
  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  /*
    struct iovec {
        ptr_t iov_base; // 数据首地址
        size_t iov_len; // 数据长度
    };  
    iovec和bufs结构体的定义一样
  */
  // 转成iovec格式发送
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  // 待写的buf个数,nbufs是总数,write_index是当前已写的个数
  iovcnt = req->nbufs - req->write_index;
  // 最多能写几个
  iovmax = uv__getiovmax();

  // 取最小值
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  // 有需要传递的描述符
  if (req->send_handle) {
    // 需要传递文件描述符的逻辑,分析unix域的时候再分析
  } else { // 单纯发送数据,则直接写
    do {
      if (iovcnt == 1) {
        n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
      } else {
        n = writev(uv__stream_fd(stream), iov, iovcnt);
      }
    } while (n == -1 && errno == EINTR);
  }
  // 发送出错
  if (n < 0) {
    // 发送失败的逻辑,我们不具体分析
  } else {
    // 写成功,n是写成功的字节数
    while (n >= 0) {
      // 本次待写数据的首地址
      uv_buf_t* buf = &(req->bufs[req->write_index]);
      // 某个buf的数据长度
      size_t len = buf->len;
      // len如果大于n说明本buf的数据部分被写入
      if ((size_t)n < len) {
        // 更新指针,指向下次待发送的数据首地址
        buf->base += n;
        // 更新待发送数据的长度
        buf->len -= n;
        // 更新待写数据的总长度
        stream->write_queue_size -= n;
        n = 0;
        // 设置了一直写标记,则继续写
        if (stream->flags & UV_HANDLE_BLOCKING_WRITES) {
          goto start;
        } else {
          // 否则等待可写事件触发的时候再写
          break;
        }
      } else {
        // 本buf的数据完成被写入,更新下一个待写入的buf位置
        req->write_index++;
        n -= len;
        // 更新待写数据总长度
        stream->write_queue_size -= len;
        // 如果写完了全部buf,触发回调
        if (req->write_index == req->nbufs) {
          // 写完了本请求的数据,做后续处理
          uv__write_req_finish(req);
          return;
        }
      }
    }
  }
  // 到这说明数据还没有完全被写入,注册等待可写事件,等待继续写
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  return;

error:
  // 写出错
  req->error = err;
  uv__write_req_finish(req);
  // 撤销等待可写事件
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  // 没有注册了等待可读事件,则停掉流
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
}

我们看一下写完一个请求后,libuv如何处理他。逻辑在uv__write_req_finish函数。

代码语言:c++
复制
// 把buf的数据写入完成或写出错后触发的回调
static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;
  // 移出队列
  QUEUE_REMOVE(&req->queue);
  // 写入成功了
  if (req->error == 0) {
    /*
      bufsml是默认的buf数,如果不够,则bufs指向新的内存,
      然后再储存数据。两者不等说明申请了额外的内存,需要free掉
    */ 
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }
  // 插入写完成队列
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  // 插入pending队列,在pending阶段执行回调
  uv__io_feed(stream->loop, &stream->io_watcher);
}

uv__write_req_finish的逻辑比较简单,就是把节点从待写队列中移除。然后插入写完成队列。最后把io 观察者插入pending队列。在pending节点会知道io观察者的回调(uv__stream_io)。流模块的逻辑比较多,今天先分析到这里。后续继续分析其他操作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档