前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >libuv源码分析之unix域

libuv源码分析之unix域

作者头像
theanarkh
发布2020-05-24 10:47:20
7790
发布2020-05-24 10:47:20
举报
文章被收录于专栏:原创分享原创分享

unix域是一种基于单主机的进程间通信方式。实现模式类似tcp通信。今天先分析他的实现,后续会分析他的使用。在libuv中,unix域用uv_pipe_t表示。

代码语言:javascript
复制
struct uv_pipe_s {
  // uv_handle_s的字段
  void* data;        
  // 所属事件循环   
  uv_loop_t* loop;  
  // handle类型    
  uv_handle_type type;  
  // 关闭handle时的回调
  uv_close_cb close_cb; 
  // 用于插入事件循环的handle队列
  void* handle_queue[2];
  union {               
    int fd;             
    void* reserved[4];  
  } u;      
  // 用于插入事件循环的closing阶段对应的队列
  uv_handle_t* next_closing; 
  // 各种标记 
  unsigned int flags;
  // 流拓展的字段
  // 用户写入流的字节大小,流缓存用户的输入,然后等到可写的时候才做真正的写
  size_t write_queue_size; 
  // 分配内存的函数,内存由用户定义,主要用来保存读取的数据               
  uv_alloc_cb alloc_cb;  
  // 读取数据的回调                 
  uv_read_cb read_cb; 
  // 连接成功后,执行connect_req的回调(connect_req在uv__xxx_connect中赋值)
  uv_connect_t *connect_req; 
  // 关闭写端的时候,发送完缓存的数据,执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值)    
  uv_shutdown_t *shutdown_req;
  // 流对应的io观察者,即文件描述符+一个文件描述符事件触发时执行的回调   
  uv__io_t io_watcher;  
  // 流缓存下来的,待写的数据         
  void* write_queue[2];       
  // 已经完成了数据写入的队列   
  void* write_completed_queue[2];
  // 完成三次握手后,执行的回调
  uv_connection_cb connection_cb;
  // 操作流时出错码
  int delayed_error;  
  // accept返回的通信socket对应的文件描述符           
  int accepted_fd;    
  // 同上,用于缓存更多的通信socket对应的文件描述符           
  void* queued_fds;
  // 标记管道是否能在进程间传递
  int ipc; 
  // 用于unix域通信的文件路径
  const char* pipe_fname; 
}

unix域继承域handle和stream。下面看一下他的具体实现逻辑。

代码语言:javascript
复制
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->shutdown_req = NULL;
  handle->connect_req = NULL;
  handle->pipe_fname = NULL;
  handle->ipc = ipc;
  return 0;
}

uv_pipe_init逻辑很简单,就是初始化uv_pipe_t结构体。刚才已经见过uv_pipe_t继承于stream,uv__stream_init就是初始化stream(父类)的字段。文章开头说过,unix域的实现类似tcp的实现。遵循网络socket编程那一套。服务端使用bind,listen等函数启动服务。

代码语言:javascript
复制
// name是unix域的文件路径
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
  struct sockaddr_un saddr;
  const char* pipe_fname;
  int sockfd;
  int err;

  pipe_fname = NULL;

  pipe_fname = uv__strdup(name);
  name = NULL;
  // unix域套接字
  sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0);
  memset(&saddr, 0, sizeof saddr);
  strncpy(saddr.sun_path, pipe_fname, sizeof(saddr.sun_path) - 1);
  saddr.sun_path[sizeof(saddr.sun_path) - 1] = '\0';
  saddr.sun_family = AF_UNIX;
  // 绑定到路径,tcp是绑定到ip和端口
  if (bind(sockfd, (struct sockaddr*)&saddr, sizeof saddr)) {
   // ...
  }

  // 已经绑定
  handle->flags |= UV_HANDLE_BOUND;
  handle->pipe_fname = pipe_fname; 
  // 保存socket fd,用于后面监听
  handle->io_watcher.fd = sockfd;
  return 0;
}

uv_pipe_bind函数的逻辑也比较简单,就是类似tcp的bind行为。 1 申请一个socket套接字。 2 绑定unix域路径到socket中。 绑定了路径后,就可以调用listen函数开始监听。

代码语言:javascript
复制
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
  if (uv__stream_fd(handle) == -1)
    return UV_EINVAL;
  // uv__stream_fd(handle)得到bind函数中获取的socket
  if (listen(uv__stream_fd(handle), backlog))
    return UV__ERR(errno);
  // 保存回调,有进程调用connect的时候时触发,由uv__server_io函数触发
  handle->connection_cb = cb;
  // io观察者的回调,有进程调用connect的时候时触发(io观察者的fd在init函数里设置了)
  handle->io_watcher.cb = uv__server_io;
  // 注册io观察者到libuv,等待连接,即读事件到来
  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  return 0;
}

uv_pipe_listen执行listen函数使得socket成为监听型的套接字。然后把socket对应的文件描述符和回调封装成io观察者。注册到libuv。等到有读事件到来(有连接到来)。就会执行uv__server_io函数,摘下对应的客户端节点。最后执行connection_cb回调。

这时候,使用unix域成功启动了一个服务。接下来就是看客户端的逻辑。

代码语言:javascript
复制
void uv_pipe_connect(uv_connect_t* req,
                    uv_pipe_t* handle,
                    const char* name,
                    uv_connect_cb cb) {
  struct sockaddr_un saddr;
  int new_sock;
  int err;
  int r;
  // 判断是否已经有socket了,没有的话需要申请一个,见下面
  new_sock = (uv__stream_fd(handle) == -1);
  // 客户端还没有对应的socket fd
  if (new_sock) {
    err = uv__socket(AF_UNIX, SOCK_STREAM, 0);
    if (err < 0)
      goto out;
    // 保存socket对应的文件描述符到io观察者
    handle->io_watcher.fd = err;
  }
  // 需要连接的服务器信息。主要是unix域路径信息
  memset(&saddr, 0, sizeof saddr);
  strncpy(saddr.sun_path, name, sizeof(saddr.sun_path) - 1);
  saddr.sun_path[sizeof(saddr.sun_path) - 1] = '\0';
  saddr.sun_family = AF_UNIX;
  // 连接服务器,unix域路径是name
  do {
    r = connect(uv__stream_fd(handle),(struct sockaddr*)&saddr, sizeof saddr);
  }
  while (r == -1 && errno == EINTR);
  // 忽略错误处理逻辑
  err = 0;
  // 设置socket的可读写属性
  if (new_sock) {
    err = uv__stream_open((uv_stream_t*)handle,
                          uv__stream_fd(handle),
                          UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  }
  // 把io观察者注册到libuv,等到连接成功或者可以发送请求
  if (err == 0)
    uv__io_start(handle->loop, &handle->io_watcher, POLLIN | POLLOUT);

out:
  // 记录错误码,如果有的话
  handle->delayed_error = err;
  // 连接成功时的回调
  handle->connect_req = req;

  uv__req_init(handle->loop, req, UV_CONNECT);
  req->handle = (uv_stream_t*)handle;
  req->cb = cb;
  QUEUE_INIT(&req->queue);

  // 如果连接出错,在pending节点会执行req对应的回调。错误码是delayed_error
  if (err)
    uv__io_feed(handle->loop, &handle->io_watcher);
}

本文大致分析了unix域在libuv中是如何封装的。大致的流程和网络编程一样。分为服务端和客户端两面。libuv在操作系统提供的api的基础上。和libuv的异步非阻塞结合。在libuv中为进程间提供了一种通信方式。后续会继续分析本文提到的内容。unix域在操作系统的实现可以参考unix域源码解析

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档