上一篇分析了流的基础知识和读写操作的实现。今天继续分析。
// 关闭流的写端
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
// 流是可写的,并且还没关闭写端,也不是处于正在关闭状态
if (!(stream->flags & UV_HANDLE_WRITABLE) ||
stream->flags & UV_HANDLE_SHUT ||
stream->flags & UV_HANDLE_SHUTTING ||
uv__is_closing(stream)) {
return UV_ENOTCONN;
}
// 初始化一个关闭请求,关联的handle是stream
uv__req_init(stream->loop, req, UV_SHUTDOWN);
req->handle = stream;
// 关闭后执行的回调
req->cb = cb;
stream->shutdown_req = req;
// 设置正在关闭的标记
stream->flags |= UV_HANDLE_SHUTTING;
// 注册等待可写事件
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
return 0;
}
关闭流的写端就是相当于给流发送一个关闭请求,把请求挂载到流中,然后注册等待可写事件,在可写事件触发的时候就会执行关闭操作。这个我们后面分析。
void uv__stream_close(uv_stream_t* handle) {
unsigned int i;
uv__stream_queued_fds_t* queued_fds;
// 从事件循环中删除io观察者,移出pending队列
uv__io_close(handle->loop, &handle->io_watcher);
// 停止读
uv_read_stop(handle);
// 停掉handle
uv__handle_stop(handle);
// 不可读、写
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
// 关闭非标准流的文件描述符
if (handle->io_watcher.fd != -1) {
/* Don't close stdio file descriptors. Nothing good comes from it. */
if (handle->io_watcher.fd > STDERR_FILENO)
uv__close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;
}
// 关闭通信socket对应的文件描述符
if (handle->accepted_fd != -1) {
uv__close(handle->accepted_fd);
handle->accepted_fd = -1;
}
/* Close all queued fds */
// 同上,这是在排队等待处理的通信socket
if (handle->queued_fds != NULL) {
queued_fds = handle->queued_fds;
for (i = 0; i < queued_fds->offset; i++)
uv__close(queued_fds->fds[i]);
uv__free(handle->queued_fds);
handle->queued_fds = NULL;
}
assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}
连接流是针对tcp的,连接即建立三次握手。所以我们首先介绍一下一些网络编程相关的内容。想要发起三次握手,首先我们先要有一个socket。我们看libuv中如何新建一个socket。
/*
1 获取一个新的socket fd
2 把fd保存到handle里,并根据flag进行相关设置
3 绑定到本机随意的地址(如果设置了该标记的话)
*/
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
int sockfd;
int err;
// 获取一个socket
err = uv__socket(domain, SOCK_STREAM, 0);
if (err < 0)
return err;
// 申请的fd
sockfd = err;
// 设置选项和保存socket的文件描述符到io观察者中
err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
if (err) {
uv__close(sockfd);
return err;
}
// 设置了需要绑定标记UV_HANDLE_BOUND
if (flags & UV_HANDLE_BOUND) {
slen = sizeof(saddr);
memset(&saddr, 0, sizeof(saddr));
// 获取fd对应的socket信息,比如ip,端口,可能没有
if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen)) {
uv__close(sockfd);
return UV__ERR(errno);
}
// 绑定到socket中,如果没有则绑定到系统随机选择的地址
if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen)) {
uv__close(sockfd);
return UV__ERR(errno);
}
}
return 0;
}
上面的代码就是在libuv申请一个socket的逻辑,他还支持新建的socket,可以绑定到一个用户设置的,或者操作系统随机选择的地址。不过libuv并不直接使用这个函数。而是又封装了一层。
// 如果流还没有对应的fd,则申请一个新的,如果有则修改流的配置
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
if (domain == AF_UNSPEC) {
handle->flags |= flags;
return 0;
}
// 已经有socket fd了
if (uv__stream_fd(handle) != -1) {
// 该流需要绑定到一个地址
if (flags & UV_HANDLE_BOUND) {
/*
流是否已经绑定到一个地址了。handle的flag是在new_socket里设置的,
如果有这个标记说明已经执行过绑定了,直接更新flags就行。
*/
if (handle->flags & UV_HANDLE_BOUND) {
handle->flags |= flags;
return 0;
}
// 有socket fd,但是可能还没绑定到一个地址
slen = sizeof(saddr);
memset(&saddr, 0, sizeof(saddr));
// 获取socket绑定到的地址
if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen))
return UV__ERR(errno);
// 绑定过了socket地址,则更新flags就行
if ((saddr.ss_family == AF_INET6 &&
((struct sockaddr_in6*) &saddr)->sin6_port != 0) ||
(saddr.ss_family == AF_INET &&
((struct sockaddr_in*) &saddr)->sin_port != 0)) {
/* Handle is already bound to a port. */
handle->flags |= flags;
return 0;
}
// 没绑定则绑定到随机地址,bind中实现
if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen))
return UV__ERR(errno);
}
handle->flags |= flags;
return 0;
}
// 申请一个新的fd关联到流
return new_socket(handle, domain, flags);
}
maybe_new_socket函数的逻辑分支很多 1 如果流还没有关联到fd,则申请一个新的fd关联到流上。如果设置了绑定标记,fd还会和一个地址进行绑定。 2 如果流已经关联了一个fd
int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
int err;
int r;
// 已经发起了connect了
if (handle->connect_req != NULL)
return UV_EALREADY;
// 申请一个socket和绑定一个地址,如果还没有的话
err = maybe_new_socket(handle,
addr->sa_family,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (err)
return err;
handle->delayed_error = 0;
do {
// 清除全局错误变量的值
errno = 0;
// 发起三次握手
r = connect(uv__stream_fd(handle), addr, addrlen);
} while (r == -1 && errno == EINTR);
if (r == -1 && errno != 0) {
// 三次握手还没有完成
if (errno == EINPROGRESS)
; /* not an error */
else if (errno == ECONNREFUSED)
// 对方拒绝建立连接,延迟报错
handle->delayed_error = UV__ERR(errno);
else
// 直接报错
return UV__ERR(errno);
}
// 初始化一个连接型request,并设置某些字段
uv__req_init(handle->loop, req, UV_CONNECT);
req->cb = cb;
req->handle = (uv_stream_t*) handle;
QUEUE_INIT(&req->queue);
handle->connect_req = req;
// 注册到libuv观察者队列
uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
// 连接出错,插入pending队尾
if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->io_watcher);
return 0;
}
连接流的逻辑,大致如下 1 申请一个socket,绑定一个地址。 2 根据给定的服务器地址,发起三次握手,非阻塞的,会直接返回继续执行,不会等到三次握手完成。 3 往流上挂载一个connect型的请求。 4 设置io观察者感兴趣的事件为可写。然后把io观察者插入事件循环的io观察者队列。等待可写的时候时候(完成三次握手),就会执行cb回调。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
static int single_accept = -1;
unsigned long flags;
int err;
if (tcp->delayed_error)
return tcp->delayed_error;
// 是否设置了不连续accept。默认是连续accept。
if (single_accept == -1) {
const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
single_accept = (val != NULL && atoi(val) != 0); /* Off by default. */
}
// 设置不连续accept
if (single_accept)
tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
flags = 0;
/*
可能还没有用于listen的fd,socket地址等。
这里申请一个socket和绑定到一个地址(如果调listen之前没有调bind则绑定到随机地址)
*/
err = maybe_new_socket(tcp, AF_INET, flags);
if (err)
return err;
// 设置fd为listen状态
if (listen(tcp->io_watcher.fd, backlog))
return UV__ERR(errno);
// 建立连接后的业务回调
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
// 有连接到来时的libuv层回调
tcp->io_watcher.cb = uv__server_io;
// 注册读事件,等待连接到来
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}
监听流的逻辑看起来逻辑很多,但是主要的逻辑是把流对的fd改成listen状态,这样流就可以接收请求了。然后设置连接到来时执行的回调。最后注册io观察者到事件循环。等待连接到来。就会执行uv__server_io。uv__server_io再执行connection_cb。监听流和其他流的一个区别是,当io观察者的事件触发时,监听流执行的回调是uv__server_io函数。而其他流是在uv__stream_io里统一处理。
流的类型分析得差不多了,最后分析一下监听流的处理函数uv__server_io,统一处理其他流的函数是uv__stream_io,这个下次分析。
刚才已经说到有连接到来的时候,libuv会执行uv__server_io,下面看一下他做了什么事情。
// 有tcp连接到来时执行该函数
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
// 拿到io观察者所在的流
stream = container_of(w, uv_stream_t, io_watcher);
// 继续注册事件,等待连接
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
/* connection_cb can close the server socket while we're
* in the loop so check it on each iteration.
*/
while (uv__stream_fd(stream) != -1) {
// 有连接到来,进行accept
err = uv__accept(uv__stream_fd(stream));
if (err < 0) {
// 忽略出错处理
// accept出错,触发回调
stream->connection_cb(stream, err);
continue;
}
// 保存通信socket对应的文件描述符
stream->accepted_fd = err;
/*
有连接,执行上层回调,connection_cb一般会调用uv_accept消费accepted_fd。
然后重新注册等待可读事件
*/
stream->connection_cb(stream, 0);
/*
用户还没有消费accept_fd。先解除io的事件,
等到用户调用uv_accept消费了accepted_fd再重新注册事件
*/
if (stream->accepted_fd != -1) {
uv__io_stop(loop, &stream->io_watcher, POLLIN);
return;
}
// 定时睡眠一会(可被信号唤醒),分点给别的进程accept
if (stream->type == UV_TCP &&
(stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
}
}
}
整个函数的逻辑如下 1 调用accept摘下一个完成了三次握手的节点。 2 然后执行上层回调。上层回调会调用uv_accept消费accept返回的fd。然后再次注册等待可读事件(当然也可以不消费)。 3 如果2没有消费调fd。则撤销等待可读事件,即处理完一个fd后,再accept下一个。如果2中消费了fd。再判断有没有设置UV_HANDLE_TCP_SINGLE_ACCEPT标记,如果有则休眠一会,分点给别的进程accept。否则继续accept。
总结:目前分析的是一些基础的概念和实现。后续会串起来再分析一下具体的过程。