上一篇文章我们介绍了write是如何实现tcp写的,现在我们来看下read是如何实现tcp读的。
// fs/read_write.c
SYSCALL_DEFINE3(read, unsigned int, fd, char __user *, buf, size_t, count)
{
struct fd f = fdget_pos(fd);
...
if (f.file) {
...
ret = vfs_read(f.file, buf, count, &pos);
...
}
return ret;
}
该方法先根据fd找到对应的file,再调用vfs_read方法继续对file执行read逻辑。
// fs/read_write.c
ssize_t vfs_read(struct file *file, char __user *buf, size_t count, loff_t *pos)
{
...
if (!ret) {
...
ret = __vfs_read(file, buf, count, pos);
...
}
return ret;
}
EXPORT_SYMBOL_GPL(vfs_read);
该方法又调用了__vfs_read方法。
// fs/read_write.c
ssize_t __vfs_read(struct file *file, char __user *buf, size_t count,
loff_t *pos)
{
if (file->f_op->read)
return file->f_op->read(file, buf, count, pos);
else if (file->f_op->read_iter)
return new_sync_read(file, buf, count, pos);
else
return -EINVAL;
}
由上一篇文章可知,file->f_op指向的实例中只有read_iter字段,没有read字段,所以该方法最终会调用new_sync_read方法。
// fs/read_write.c
static ssize_t new_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
{
struct iovec iov = { .iov_base = buf, .iov_len = len };
struct kiocb kiocb;
struct iov_iter iter;
ssize_t ret;
init_sync_kiocb(&kiocb, filp);
...
iov_iter_init(&iter, READ, &iov, 1, len);
ret = call_read_iter(filp, &kiocb, &iter);
...
return ret;
}
该方法先将参数转化成其他类型,最终使得kiocb引用filp,即要读的文件,iter引用iov,iov又引用buf和len,即读取的数据被被拷贝到的区域。
之后又调用call_read_iter方法,传入这些新的参数,继续执行read逻辑
// include/linux/fs.h
static inline ssize_t call_read_iter(struct file *file, struct kiocb *kio,
struct iov_iter *iter)
{
return file->f_op->read_iter(kio, iter);
}
该方法又调用了file->f_op->read_iter指向的方法,由上一篇文章我们可以知道,该方法是sock_read_iter。
// net/socket.c
static ssize_t sock_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
struct file *file = iocb->ki_filp;
struct socket *sock = file->private_data;
struct msghdr msg = {.msg_iter = *to,
.msg_iocb = iocb};
ssize_t res;
if (file->f_flags & O_NONBLOCK)
msg.msg_flags = MSG_DONTWAIT;
...
res = sock_recvmsg(sock, &msg, msg.msg_flags);
...
return res;
}
该方法先将参数包装成struct msghdr类型的变量,再调用sock_recvmsg方法,传入这个新参数,继续执行read逻辑。
// net/socket.c
int sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags)
{
...
return err ?: sock_recvmsg_nosec(sock, msg, flags);
}
EXPORT_SYMBOL(sock_recvmsg);
该方法又调用了sock_recvmsg_nosec方法。
// net/socket.c
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
int flags)
{
return sock->ops->recvmsg(sock, msg, msg_data_left(msg), flags);
}
该方法又调用了sock->ops->recvmsg指向的方法,由第一篇文章我们可以知道,这个方法是inet_recvmsg。
// net/ipv4/af_inet.c
int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int flags)
{
struct sock *sk = sock->sk;
...
err = sk->sk_prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, &addr_len);
...
return err;
}
EXPORT_SYMBOL(inet_recvmsg);
该方法又调用了sk->sk_prot->recvmsg指向的方法,由第一篇文章可以知道,这个方法是tcp_recvmsg。
// net/ipv4/tcp.c
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
int flags, int *addr_len)
{
struct tcp_sock *tp = tcp_sk(sk);
int copied = 0;
...
u32 *seq;
unsigned long used;
int err;
int target; /* Read at least this many bytes */
long timeo;
struct sk_buff *skb, *last;
...
timeo = sock_rcvtimeo(sk, nonblock);
...
seq = &tp->copied_seq;
...
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
do {
u32 offset;
...
skb_queue_walk(&sk->sk_receive_queue, skb) {
...
offset = *seq - TCP_SKB_CB(skb)->seq;
...
if (offset < skb->len)
goto found_ok_skb;
...
}
/* Well, if we have backlog, try to process it now yet. */
if (copied >= target && !sk->sk_backlog.tail)
break;
if (copied) {
if (sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
!timeo ||
signal_pending(current))
break;
} else {
if (sock_flag(sk, SOCK_DONE))
break;
if (sk->sk_err) {
copied = sock_error(sk);
break;
}
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
if (sk->sk_state == TCP_CLOSE) {
...
break;
}
if (!timeo) {
copied = -EAGAIN;
break;
}
if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}
...
if (copied >= target) {
/* Do not sleep, just process backlog. */
release_sock(sk);
lock_sock(sk);
} else {
sk_wait_data(sk, &timeo, last);
}
...
continue;
found_ok_skb:
/* Ok so how much can we use? */
used = skb->len - offset;
if (len < used)
used = len;
...
if (!(flags & MSG_TRUNC)) {
err = skb_copy_datagram_msg(skb, offset, msg, used);
...
}
*seq += used;
copied += used;
len -= used;
...
if (used + offset < skb->len)
continue;
...
if (!(flags & MSG_PEEK))
sk_eat_skb(sk, skb);
continue;
...
} while (len > 0);
...
return copied;
...
}
EXPORT_SYMBOL(tcp_recvmsg);
这个方法就是真正实现read逻辑的地方了,我们来好好看下。
方法描述
1. 设置变量copied的值为0,该变量用于记录拷贝tcp数据到用户提供的buf的字节数,最终返回给用户。
2. 根据socket是否是nonblocking设置timeo的值,如果是,timeo为0,如果不是,timeo大于0。
3. 设置变量seq的值,使其持有tp->copied_seq字段的地址,tp->copied_seq字段用于表示下一个要拷贝给用户的数据的seq是多少。
4. 设置target的值,用于表示一次read至少要读多少字节,该值默认为1,可通过setsockopt方法修改。
5. 进入while循环,该循环继续的条件为len>0,即剩余要拷贝给用户的数据的字节数大于0。
6. 遍历sk->sk_receive_queue队列,根据seq的值,即tp->copied_seq的值,找到我们应该从哪个struct sk_buff的哪个位置开始继续拷贝,将该位置赋值给offset,当offset小于skb包含数据的总长度时,说明我们已经找到。
当tcp层收到数据后,会将数据放到sk->sk_receive_queue队列中,等待用户读取,该部分逻辑的详细分析,我们以后会另开文章详细讲解。
7. 如果有这样的skb,则跳转到found_ok_skb标签指向的逻辑。该逻辑会首先根据skb剩余可读字节数及当前len的值,修正used的值,即会拷贝skb中的多少字节给用户,之后调用skb_copy_datagram_msg方法将数据拷贝到用户提供的内存区域,再之后会修改seq的值,即下一个要拷贝字节的位置,copied的值,即已经拷贝的字节数,len的值,即剩余要拷贝的字节数,最后,根据情况决定是否要将skb从sk->sk_receive_queue队列中移除。
8. 如果sk->sk_receive_queue队列中已经没有可读数据了,则看copied变量,即现在已经拷贝给用户的字节数,是否大于等于target,即一次read最少要读的字节数,同时还要看sk->sk_backlog.tail字段是否为null,即ip层传给tcp层的数据是否都已处理了,如果两种情况都满足,则跳出循环,返回copied变量的值给用户。
9. 当条件不满足时,则看当前sock是否有异常发生,比如error或close等,如果有异常,则看copied值是否大于0,如果大于0,则先不管当前异常,跳出while循环,返回当前copied的值。
10. 如果有异常,且copied的值为0,则根据异常情况,设置相应的错误码,然后跳出循环,返回错误码给用户。
11. 如果没有发生异常,则表明当前只是没有可读数据了,先看下sk->sk_backlog中是否有未处理的tcp segment,如果有,先把这些处理了,该处理逻辑是由release_sock方法触发,如果处理了这些数据,还没有达到跳出while循环的要求,则调用sk_wait_data方法,阻塞等待新数据的到来。
完。
本文分享自 Linux内核及JVM底层相关技术研究 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!