前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详解golang net之netpoll

详解golang net之netpoll

作者头像
charlieroro
发布2020-03-23 16:36:40
3.9K0
发布2020-03-23 16:36:40
举报
文章被收录于专栏:charlierorocharlieroro

golang版本1.12.9;操作系统:readhat 7.4

golang的底层使用epoll来实现IO复用。netPoll通过pollDesc结构体将文件描述符与底层进行了绑定。netpoll实现了用户层面的与底层网络IO相关的goroutine的阻塞/非阻塞管理。

对netpoll的介绍按照这篇文章的思路按照tcp建链中的listen/accept/read/write/close动作详解过程。

下面以TCP为例完整解析TCP的建链/断链以及读写过程

listen流程:

ListenTCP --> listenTCP --> internetSocket --> socket --> listenStream

unix的listen函数用于将一个socket转换为监听socket。golang中同时结合了创建socket的步骤。

代码语言:javascript
复制
// src/net/tcpsock.go
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
    switch network {
    //支持tcp协议为”tcp4“和“tcp6”,当使用"tcp"时可以通过地址格式进行判断
    case "tcp", "tcp4", "tcp6":
    default:
        return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: laddr.opAddr(), Err: UnknownNetworkError(network)}
    }
    //对laddr进行初始化(非nil),用于在socket函数中进入监听处理流程(见下文)
    if laddr == nil {
        laddr = &TCPAddr{}
    }
    sl := &sysListener{network: network, address: laddr.String()}
    ln, err := sl.listenTCP(context.Background(), laddr)
    if err != nil {
        return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: laddr.opAddr(), Err: err}
    }
    return ln, nil
}
代码语言:javascript
复制
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    //此处mode为"listen",可以表示一个tcp服务端;此外还有一个mode为"dial",表示连接的发起端,可以表示一个tcp客户端
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd}, nil
}
代码语言:javascript
复制
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    //此处判断mode为"dial"的场景。如果"dial" mode下的远端IP为通配符,则将远端IP转换为本地IP(127.0.0.1或::1),即默认连接本地server。
    if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() {
        raddr = raddr.toLocal(net)
    }
    //favoriteAddrFamily函数用于判断地址类型为IPv4还是IPv6,主要用于net为"tcp"时的场景。
    family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
代码语言:javascript
复制
//看下这个用于判断地址类型的函数
func favoriteAddrFamily(network string, laddr, raddr sockaddr, mode string) (family int, ipv6only bool) {
    //可以看到,如果直接写明"tcp4"或"tcp6"时会直接返回对应的地址类型
    switch network[len(network)-1] {
    case '4':
        return syscall.AF_INET, false
    case '6':
        return syscall.AF_INET6, true
    }

    //下面用于处理network为"tcp的场景",可以看出直接指定"tcp4"或"tcp6"时可以提高一些执行效率,但可能影响扩展性
    //如果使用监听模式,且本地没有指定监听地址,需要通过对系统本地地址进行测试来判定使用的IP类型
    if mode == "listen" && (laddr == nil || laddr.isWildcard()) {
        //supportsIPv4map函数用于测试系统是否支持ipv4MappedIPv6功能。如果系统支持该功能,或者不支持IPv4,则使用IPv6
        if supportsIPv4map() || !supportsIPv4() {
            return syscall.AF_INET6, false
        }
        //如果没有指定监听地址,同时不支持ipv4MappedIPv6功能,且支持IPv4,则使用IPv4
        if laddr == nil {
            return syscall.AF_INET, false
        }
        //通过判断IP地址(长度和格式)来判断所使用的IP类型。实现函数定义在src/net/tcpsock_posix.go中
        return laddr.family(), false
    }

    //如果未指定本端和远端地址或明确指定了本端和远端需要的地址类型为IPv4,则使用IPv4。可用于处理"listen"和"dial" mode。
    //"listen" mode下已经处理了laddr == nil的情况,如果laddr非nil,调用family()函数判断IP类型即可获得IP类型
    if (laddr == nil || laddr.family() == syscall.AF_INET) &&
        (raddr == nil || raddr.family() == syscall.AF_INET) {
        return syscall.AF_INET, false
    }
    //其他情况使用IPv6,如仅支持IPv6的场景
    return syscall.AF_INET6, false
}

从下面注释中可以看出socket用于返回一个网络描述符。listen场景下,需要使用socket->setsocketopt->bind->listen这几个系统调用来创建一个监听socket

代码语言:javascript
复制
 1 // socket returns a network file descriptor that is ready for
 2 // asynchronous I/O using the network poller. --此处应该是同步IO吧
 3 func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
 4     //创建一个socket
 5     s, err := sysSocket(family, sotype, proto)
 6     if err != nil {
 7         return nil, err
 8     }
 9     //设置socket选项,处理IPv6并设置允许广播
10     if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
11         poll.CloseFunc(s)
12         return nil, err
13     }
14     //初始化一个newFD,下面讲解
15     if fd, err = newFD(s, family, sotype, net); err != nil {
16         poll.CloseFunc(s)
17         return nil, err
18     }
19 
20     // 用于处理TCP或UDP服务端。服务端需要确保本地监听地址非nil(但可以为""),否则会被认为是一个客户端socket。ListenTCP中已经对laddr赋初值
21     if laddr != nil && raddr == nil {
22         switch sotype {
23         //处理流协议,TCP
24         case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
25             if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
26                 fd.Close()
27                 return nil, err
28             }
29             return fd, nil
30         //处理数据报协议,UDP
31         case syscall.SOCK_DGRAM:
32             if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
                 
33                 fd.Close()
34                 return nil, err
35             }
36             return fd, nil
37         }
38     }
39     //处理非监听socket场景,即客户端发起连接
40     if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
41         fd.Close()
42         return nil, err
43     }
44     return fd, nil
45 }

-----------------------------------------------------------------------------------------------------------------------------------------

golang的系统调用

以上述第5行代码中创建一个socket为例,看golang如何通过系统调用产生一个socket。sysSocket中过的调用链如下:

sysSocket->socketFunc->syscall.Socket->socket。socket函数内容如下,实际通过调用RawSyscall来运行系统调用,其系统调用ID为SYS_SOCKET,值为41

代码语言:javascript
复制
func socket(domain int, typ int, proto int) (fd int, err error) {
    r0, _, e1 := RawSyscall(SYS_SOCKET, uintptr(domain), uintptr(typ), uintptr(proto))
    fd = int(r0)
    if e1 != 0 {
        err = errnoErr(e1)
    }
    return
}

golang中的系统调用定义在src/syscall目录下,redhat系统对应的文件为zsysnum_linux_amd64.go。

操作系统中的系统调用ID定义在/usr/include/asm/unistd.h中(内容如下),其中32位系统使用<asm/unistd_32.h>,64位系统使用<asm/unistd_64.h>。

如golang中的SYS_SOCKET系统调用ID为41,对应redhat系统的定义为#define __NR_socket 41

代码语言:javascript
复制
//asm/unistd.h
#ifndef _ASM_X86_UNISTD_H
#define _ASM_X86_UNISTD_H

/* x32 syscall flag bit */
#define __X32_SYSCALL_BIT       0x40000000

# ifdef __i386__
#  include <asm/unistd_32.h>
# elif defined(__ILP32__)
#  include <asm/unistd_x32.h>
# else
#  include <asm/unistd_64.h>
# endif

#endif /* _ASM_X86_UNISTD_H */

golang可以通过如下4个函数来执行系统调用,后缀带"6"的表示有6个入参,不带"6"的表示有4个入参。(redhat系统的实现定义在src/syscall/asm_linux_amd64.s中)

golang的系统调用可以简单分为:阻塞系统调用,非阻塞系统调用和wrapped系统调用。wrapped系统调用就是自己封装的,如下文的epoll相关的系统调用就属于这一类。

代码语言:javascript
复制
func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)

Syscall与RawSyscall的有如下区别(以下内容来自 golang syscall原理),通常系统调用使用Syscall,防止阻塞同一个P中的其他goroutine的执行

代码语言:javascript
复制
Syscall在进入系统调用的时候,调用了runtime·entersyscall(SB)函数,在结束系统调用的时候调用了runtime·exitsyscall(SB)。做到进入和退出syscall的时候通知runtime。
这两个函数runtime·entersyscall和runtime·exitsyscall的实现在proc.go文件里面。其实在runtime·entersyscall函数里面,通知系统调用时候,是会将g的M的P解绑,P可以去继续获取M执行其余的g,这样提升效率。 所以如果用户代码使用了 RawSyscall 来做一些阻塞的系统调用,是有可能阻塞其它的 g 的。RawSyscall 只是为了在执行那些一定不会阻塞的系统调用时,能节省两次对 runtime 的函数调用消耗

RawSyscall的源码如下,第一个参数表示系统调用ID,其余为该系统调用所需的参数。整体上比较简单,3~8行将参数传到寄存器,10行执行系统调用,生成socket,后续就是用于处理错误码和返回值

代码语言:javascript
复制
 1 // func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2, err uintptr)
 2 TEXT ·RawSyscall(SB),NOSPLIT,$0-56
 3     MOVQ    a1+8(FP), DI
 4     MOVQ    a2+16(FP), SI
 5     MOVQ    a3+24(FP), DX
 6     MOVQ    $0, R10
 7     MOVQ    $0, R8
 8     MOVQ    $0, R9
 9     MOVQ    trap+0(FP), AX    // syscall entry
10     SYSCALL
11     CMPQ    AX, $0xfffffffffffff001
12     JLS    ok1
13     MOVQ    $-1, r1+32(FP)
14     MOVQ    $0, r2+40(FP)
15     NEGQ    AX
16     MOVQ    AX, err+48(FP)
17     RET
18 ok1:
19     MOVQ    AX, r1+32(FP)
20     MOVQ    DX, r2+40(FP)
21     MOVQ    $0, err+48(FP)
22     RET

socket代码就是调用了RawSyscall函数来执行SYS_SOCKET类型的系统调用,由于socket系统调用不会阻塞,因此可以使用RawSyscall。

代码语言:javascript
复制
-----------------------------------------------------------------------------------------------------------------------

继续listen流程,在创建完socket并设置socket选项后,进入流处理环节。需要注意的是,此处用到了newFD函数初始化的变量fd。

代码语言:javascript
复制
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    //此处设置了监听socket所使用的选项,允许地址socket重用,实际中并不推荐使用socket地址重用。
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    //构建一个实现了syscall.Sockaddr结构的结构体,如syscall.SockaddrInet4/syscall.SockaddrInet4/syscall.SockaddrUnix
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    //ctrlFn在bind前调用,可以用于设置socket选项。此处为nil。用法可以参考net/listen_test.go中的TestListenConfigControl函数
    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    // 为socket绑定地址
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    //使用系统调用SYS_LISTEN,第二个参数表示监听队列大小,来自"/proc/sys/net/core/somaxconn"或syscall.SOMAXCONN(参见src/net/sock_linux.go)
    //该函数等同于系统调用:
    //  #include<sys/socket.h>
    //  int listen(int sockfd, int backlog)
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    //fd.init中会初始化epoll,并注册文件描述符。用于在accept时有连接建立时上报连接事件通知。用法参见epoll+socket实现 socket并发 linux服务器
    if err = fd.init(); err != nil {
        return err
    }
    //获取socket信息,此处会使用系统调用getsockname来获得配置的socket信息,并设置到fd.laddr中。监听socket的fd.raddr为nil
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}
代码语言:javascript
复制
    listenStream失败后需要关闭fd.pd和fd.Sysfd
代码语言:javascript
复制
func (fd *netFD) Close() error {
    //与GC相关的设置
    runtime.SetFinalizer(fd, nil)
    return fd.pfd.Close()
}
代码语言:javascript
复制
func (fd *FD) Close() error {
    //将锁状态设置为mutexClosed,即mu.state的第一个bit位,后续将不能使用该锁进行lock操作,仅能unlock减少锁的引用计数。
    //读写对应底层全双工链路,读写操作不互斥。具体参见这篇文章
    if !fd.fdmu.increfAndClose() {
        return errClosing(fd.isFile)
    }

    // Unblock any I/O.  Once it all unblocks and returns,
    // so that it cannot be referring to fd.sysfd anymore,
    // the final decref will close fd.sysfd. This should happen
    // fairly quickly, since all the I/O is non-blocking, and any
    // attempts to block in the pollDesc will return errClosing(fd.isFile).
    //调用runtime_pollUnblock函数unblock pd中的读写goroutine。
    fd.pd.evict()

    //当文件锁的引用计数为0时才能做fd.destroy清理动作
    err := fd.decref()

    // Wait until the descriptor is closed. If this was the only
    // reference, it is already closed. Only wait if the file has
    // not been set to blocking mode, as otherwise any current I/O
    // may be blocking, and that would block the Close.
    // No need for an atomic read of isBlocking, increfAndClose means
    // we have exclusive access to fd.
    //此处用于在epoll场景下,阻塞等待释放文件锁,最终退出Close函数。destroy函数会释放锁。注意此处的锁和fd.fdum不是一个锁
    if fd.isBlocking == 0 {
        runtime_Semacquire(&fd.csema)
    }

    return err
}
代码语言:javascript
复制
func (fd *FD) decref() error {
    //此处减小文件锁的引用计数并判断引用计数是否为0,只有引用计数为0且文件锁关闭时才能进行destory工作
    if fd.fdmu.decref() {
        //除poll.decref外,在poll.readUnlock和poll.writeUnlock也会调用poll.desory()函数,主要用于退出读或写操作时。
        return fd.destroy()
    }
    return nil
}
代码语言:javascript
复制
func (fd *FD) destroy() error {
    //去注册epoll事件
    fd.pd.close()
    //关闭底层socket
    err := CloseFunc(fd.Sysfd)
    fd.Sysfd = -1
    //runtime_Semacquire和runtime_Semrelease对应P/V操作,。可以参见golang中的锁源码实现:Mutex
    //此处表示该文件锁的引用计数为0且已经关闭,释放文件锁,退出Close函数(其他地方可以进行如readUnlock/writeUnlock操作)
    runtime_Semrelease(&fd.csema)
    return err
}

accept会增加监听socket的引用计数

-----------------------------------------------------------------------------------------------------------------------

listen中netpoll的处理

FD的结构体如下,最主要的成员为fdmu和pd,前者为socket创建的文件描述符,后者为与epoll相关的底层结构体。golang中每一个连接都被抽象为一个FD。

代码语言:javascript
复制
//src/internal/poll/fd_unix.go
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache. 
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}
代码语言:javascript
复制
//src/runtime/netpoll.go
type pollDesc struct {
    link *pollDesc // 可以看作是pollcache链表的链表指针

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}

初始化FD

代码语言:javascript
复制
func (fd *netFD) init() error {
    //当文件描述符为网络时,需要将第二个参数置为true,表示使用poll机制
    return fd.pfd.Init(fd.net, true)
}
代码语言:javascript
复制
// Init initializes the FD. The Sysfd field should already be set.
// This can be called multiple times on a single FD.
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file".
// Set pollable to true if fd should be managed by runtime netpoll.
func (fd *FD) Init(net string, pollable bool) error {
    // We don't actually care about the various network types.
    if net == "file" {
        fd.isFile = true
    }
    //如果不使用poll机制,则fd置为blocking mode并返回。当net=="file"时不会使用poll
    if !pollable {
        fd.isBlocking = 1
        return nil
    }
    //初始化pollDesc。入参校验后执行poll相关的操作,实际执行如下两步系统调用,即创建一个epoll句柄,并注册fd到epoll句柄epfd中
    //int epoll_create(int size);
    //int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    err := fd.pd.init(fd)
    if err != nil {
        // If we could not initialize the runtime poller,
        // assume we are using blocking mode.
        fd.isBlocking = 1
    }
    return err
}
代码语言:javascript
复制
//src/internal/poll/fd_poll_runtime.go文件中定义了poll相关的操作,如下:
//  func runtime_pollServerInit()
//  func runtime_pollOpen(fd uintptr) (uintptr, int)
//  func runtime_pollClose(ctx uintptr)
//  func runtime_pollWait(ctx uintptr, mode int) int
//  func runtime_pollWaitCanceled(ctx uintptr, mode int) int
//  func runtime_pollReset(ctx uintptr, mode int) int
//  func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
//  func runtime_pollUnblock(ctx uintptr)
//  func runtime_isPollServerDescriptor(fd uintptr) bool
//以上函数的具体实现在src/runtime/netpoll.go中

runtime_pollServerInit的实现如下。从注释上可以看到该函数使用编译器链接到internal/poll.runtime_pollServerInit,即上述使用的函数。

它调用netpollinit创建epoll句柄,并调用atomic.Store将netpollInited设置为1,表示已经初始化epoll文件句柄。

代码语言:javascript
复制
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollinit()
    atomic.Store(&netpollInited, 1)
}

netpollinit函数内容如下,首先调用epollcreate1创建epfd,如果失败则调用epollcreate创建。前者对应系统调用int epoll_create(int size);后者对应系统调用int epoll_create1(int flag);具体可以参见epoll函数深入讲解

代码语言:javascript
复制
func netpollinit() {
    //此处创建的epfd为全局变量
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        //调用fcntl给epfd设置FD_CLOEXEC,用于防止文件描述符泄露。参见使用FD_CLOEXEC实现close-on-exec,关闭子进程无用文件描述符
        closeonexec(epfd)
        return
    }
    println("runtime: epollcreate failed with", -epfd)
    throw("runtime: netpollinit failed")
}

epoll相关的实现位于src/runtime/sys_linux_amd64.s中(注意系统类型),可以看到其运行的系统调用为$SYS_epoll_create1,其值定义在同文件中。可以看到其实际值为291,在系统的/usr/include/asm/unistd_64.h中可以看到291的系统调用对应的内容为 #define __NR_epoll_create1 291

代码语言:javascript
复制
#define SYS_epoll_create    213
#define SYS_epoll_ctl        233
#define SYS_epoll_pwait        281
#define SYS_epoll_create1    291
代码语言:javascript
复制
TEXT runtime·epollcreate1(SB),NOSPLIT,$0
    MOVL    flags+0(FP), DI
    MOVL    $SYS_epoll_create1, AX
    SYSCALL
    MOVL    AX, ret+8(FP)
    RET

在创建完epoll之后,注册epoll事件

代码语言:javascript
复制
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    //获取一个pollDesc。pollcache为全局pd链表,可以为listen和accept过程提供pd。
    pd := pollcache.alloc()
    lock(&pd.lock)
    //pd全局链表中的节点都应该是可用或初始状态,0为初始化状态
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    //pollDesc中保存了文件描述符
    pd.fd = fd
    //此处将pd的状态初始化为false,表示该pd可用
    pd.closing = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    unlock(&pd.lock)

    var errno int32
    //注册文件描述符到epoll句柄中
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

poll节点的申请流程如下

代码语言:javascript
复制
func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        //首次会创建1024个节点
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        // Must be in non-GC memory because can be referenced
        // only from epoll/kqueue internals.
        //调用malloc申请内存
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        //通过指针移动来获取节点并将节点串成链表。需要注意的是,末节点的link指向nil
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    //此处有2个作用:如果c.first非空,则返回c.first指向的节点,并将c.first指向下一个可用节点;初始化链表后,c.first指向的是首节点,将其指向首节点之后
    //的节点并返回首节点使用。如果链表节点全部被使用,会重新创建1024个节点
    //当删除注册的event事件时,会回收该节点.
    pd := c.first
    c.first = pd.link
    unlock(&c.lock)
    return pd
}

netpollopen将文件描述符注册到epoll中,此处可以看到pollDesc的作用是作为epollctl的参数成员,即用户数据部分。runtime/netpoll_epoll.go的netpoll函数中会处理此处注册的事件和数据

代码语言:javascript
复制
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    //设置触发事件 对应描述符上有可读数据|对应描述符上有可写数据|描述符被挂起|设置为边缘触发模式(仅在状态变更时上报一次事件)
    //epoll的事件可以参考epoll文档  

runtime_pollUnblock的实现函数为poll_runtime_pollUnblock。获取pd读/写阻塞的goroutine并将其状态切换为runnable,poll_runtime_pollUnblock函数一般在关闭连接时使用。

pollDesc结构体中的rg和wg比较难理解,它们与netpoll相关,将底层缓存区的读写情况反映为当前读写对应的goroutine的状态。当读缓存区没有数据时,会导致rg阻塞(非pdReady),此时调用netpollunblock返回的为读操作所在的goroutine;而当执行write操作时,如果缓存区没有空间,此时会导致wg阻塞,此时调用netpollunblock返回的为写操作所在的goroutine。在非阻塞时,rg/wg表示当前读/写goroutine状态,pdReady表示可以进行读/写操作,pdWait表示当前goroutine将会被park(调用gopark)住。

注意:用户读写操作可以使用同一个goroutine。

代码语言:javascript
复制
func poll_runtime_pollUnblock(pd *pollDesc) {
    lock(&pd.lock)
    if pd.closing {
        throw("runtime: unblock on closing polldesc")
    }
    //将pd状态置为closing,poll_runtime_pollClose会停止并回收pd
    pd.closing = true
    pd.rseq++
    pd.wseq++
    var rg, wg *g
    atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
    //获取读写对应的goroutine。因此此处ioready设置为false,表示非底层IO的操作,底层epoll上报事件后,会通过runtime.netpollready调用
    //netpollunblock函数,此时netpollunblock的第三个参数会变为true,用于将对应事件的goroutine变为非阻塞,处理epoll的读写事件
    rg = netpollunblock(pd, 'r', false)
    wg = netpollunblock(pd, 'w', false)
    //pd.rt.f和pd.wt.f都是定时器超时后执行的函数,如果这些函数非空,则清除定时器并置为初始值nil(后续pd需要回收)。
    //定时器用于设置读写连接的deadline时间点,参见本文末的内容
    if pd.rt.f != nil {
        deltimer(&pd.rt)
        pd.rt.f = nil
    }
    if pd.wt.f != nil {
        deltimer(&pd.wt)
        pd.wt.f = nil
    }
    unlock(&pd.lock)
    //此处才是真正unblock阻塞的goroutine,netpollgoready会调用goready函数将阻塞的goroutine变为runnable状态,继续执行。
    //此处的unblock操作对应调用netpollblock函数gopark的goroutine,如等待Accept,等待Read等。当Accept有连接到达或Read有数据读取
    //时,这时需要unblock对应的goroutine继续处理(当然也包括处理错误场景)
    if rg != nil {
        netpollgoready(rg, 3)
    }
    if wg != nil {
        netpollgoready(wg, 3)
    }
}

netpollunblock等待返回读/写操作block的goroutine,如果读/写状态为pdReady(即非阻塞)或初始化状态则返回一个空指针,表示该goroutine没有被阻塞,可直接使用;反之返回一个阻塞的goroutine,golang调用netpollready函数可将其变为runnable。ioready表示是否由底层发起的调用,如果是则需要置为true。只有通过底层epoll事件通知的场景下才会置为true。

ps:这个函数的名字有点奇怪,它并不能主动unblock goroutine

代码语言:javascript
复制
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    //使用for循环用于执行atomic.Casuintptr原子操作。一个pd对于一条连接,而一条连接可能被多个goroutine操作。
    for {
        old := *gpp
        //如果gpp为pdReady,则对应的goroutine为unblock状态,返回即可
        if old == pdReady {
            return nil
        }
        //此处用于初始状态时的场景,此时并没有调用netpollblock阻塞goroutine,直接返回即可
        if old == 0 && !ioready {
            // Only set READY for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        //atomic.Casuintptr的实现在runtime/internal/atomic/asm_${plantform}.s中,如下处理逻辑为

runtime_pollClose实际调用的函数为poll_runtime_pollClose,用于删除注册的事件并回收pd节点

代码语言:javascript
复制
func poll_runtime_pollClose(pd *pollDesc) {
    if !pd.closing {
        throw("runtime: close polldesc w/o unblock")
    }
    //执行本函数前需要调用netpollunblock将goroutine变为非阻塞状态,以便回收pd节点
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on closing polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on closing polldesc")
    }
    //调用_EPOLL_CTL_DEL删除注册的epoll事件
    netpollclose(pd.fd)
    //回收fd节点
    pollcache.free(pd)
}

golang中使用epoll的方式比较巧妙,也比较奇怪。从上面流程可以看出,创建epoll和注册epoll事件时,通过对API层层调用可以看到其运行了系统调用runtime.netpollinit和runtime.netpollopen,但没有直接用到runtime.epollwait,对Accept,Read等的阻塞是通过poll_runtime_pollWait->netpollblock->gopark阻塞goroutine来实现的,即通过gopark阻塞对应的协程。

runtime.epollwait是在runtime.netpoll中调用的,而runtime.netpoll的是在单独的线程中运行的。

代码语言:javascript
复制
func netpoll(block bool) gList {
    if epfd == -1 {
        return gList{}
    }
    waitms := int32(-1)
    if !block {
        waitms = 0
    }
    var events [128]epollevent
retry:
    //这里运行系统调用阻塞等待epfd上发生的事件
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    var toRun gList
    //epoll可能一次性上报多个事件
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        //底层可读事件,其他事件(如_EPOLLHUP)同时涉及到读写,因此读写的goroutine都需要通知
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        //底层可读事件
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            //这里才是底层通知读写事件来unblock(Accept/Read等)协程的地方。netpollready会返回pd对应的读写goroutine链表(runtime.gList),
            //最终在函数退出后返回给runtime.findrunnable函数调度。此处golang runtime会将podllDesc.rg/wg设置为pdready
            netpollready(&toRun, pd, mode)
        }
    }
    if block && toRun.empty() {
        goto retry
    }
    return toRun
}

这也是为什么使用用户API执行的协程需要使用pdReady和pdWait来表示协程状态的原因,因为其无法直接获得epoll_wait的事件信息。

-----------------------------------------------------------------------------------------------------------------------

accept流程

上面基本已经分析了epoll的所有底层流程,后续的就比较简单了。

可以看到Accept使用了listen阶段生成的netFD(TCPListener),接收监听socket的TCP连接。accept返回一个TCP连接,就可以在该TCP上进行读写操作,连接类型为TCPConn。每accept一个连接会创建一个新的goroutine,并调用internal/poll.runtime_pollWait来等待读事件

代码语言:javascript
复制
type TCPConn struct {
    conn
}
代码语言:javascript
复制
type TCPListener struct {
    fd *netFD
}
代码语言:javascript
复制
func (l *TCPListener) Accept() (Conn, error) {
    //如果监听socket已经释放,则无法继续执行accept,返回错误
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

ln.fd.accept中会返回建立的TCP连接,后续可以在该连接上执行读写操作

代码语言:javascript
复制
func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    //封装并返回建立好的tcp连接。newTCPConn同时会设置TCP_NODELAY选项来禁止Nagle算法
    return newTCPConn(fd), nil
}
代码语言:javascript
复制
func (fd *netFD) accept() (netfd *netFD, err error) {
    //核心处理函数就是fd.pfd.Accept,返回tcp连接的文件描述符。具体见下文
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }
    //封装返回的tcp连接参数,netfd后续会被newTCPConn封装。ps:此处的返回值判断应该无用,newFD不会返回非nil的错误码
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    //初始化一个pollDesc并注册epoll事件通知,与listen不同,此处用于注册用户连接上的IO读写事件
    if err = netfd.init(); err != nil {
        //ps:这个地方应该close accept的连接,提了个issue,官方已经更新
        fd.Close()
        return nil, err
    }
    //设置netfd中表示本/对端变量的地址
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}
代码语言:javascript
复制
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    //注意此处的fd.readLock(),该函数会增加fd.fdmu的引用计数。此处的fd即监听socket,如果监听socket关闭将无法进行readLock操作,
    //会直接返回错误,即无法创建新的连接。
    //在accept的连接上进行读操作会增加该连接的引用计数
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    //此处减少fd.fdmu的引用计数。accept建立后的连接并不受监听socket控制,即使close监听socket,已有的连接会不会被关闭。
    //此处的readUnlock还有一个作用,在accept失败后,会在引用计数为0且fd.fdmu状态为mutexClosed时会调用poll.destory()函数close该连接。
    //从代码层面看,只能通过poll.Close(net/fd_unix.go)将网络描述符的fd.fdmu设置为mutexClose,该函数为对外API.
    //如果fd.fdmu未关闭,则此处仅仅减少引用计数。
    defer fd.readUnlock()
    //初始化监听socket的pd,因为监听socket只有读操作,仅初始化pd.rg。
    //初始化的原因是当前goroutine没有被阻塞,清除pd上的标记,进入poll_runtime_pollWait等待epoll事件唤醒goroutine,
    //防止因为残留标记导致虚假唤醒。
    //需要注意,prepare类函数,如prepareRead/prepareWrite会在其prepare->runtime_pollReset->poll_runtime_pollReset->netpollcheckerr中
    //对deadline(参见下文)时间进行检验,如果deadline的时间早于当前时间会导致read/write/accept等操作无法阻塞,直接返回timeout error
    //  func netpollcheckerr(pd *pollDesc, mode int32) int {
    //      if pd.closing {
    //          return 1 // errClosing
    //      }
    //      if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
    //          return 2 // errTimeout
    //      }
    //      return 0
    //  }
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        //调用系统函数返回accept的连接
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        //如果没有连接到来且使用了epoll,则调用runtime_pollWaitd等待新的连接。
        case syscall.EAGAIN:         
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        //此处用于处理连接中客户端断开情况,建链过程中客户端发送RST报文
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

如上所述,runtime_pollWaitd并没有运行epollwait系统调用,它通过判断并循环等待goroutine变为pdReady。

代码语言:javascript
复制
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    //判断连接是否已经超时或关闭
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // As for now only Solaris and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        //在超时和关闭情况下无需等待,返回错误
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return 0
}

netpollblock与netpollunblock对应,前者调用gopark函数阻塞goroutine,后者结合goread函数unpark goroutine。netpollblock的返回值用于判断处理的goroutine是否为pdReady。从代码实现来看,netpollblock的目的是park一个非pdReady的goroutine,而非直接pack一个goroutine。park一个pdReady的goroutine是不合理的,有可能该goroutine正在进行读写操作。

netpollblock首先将pd中对应mode(读/写)的goroutine状态设置为pdWait,然后park该goroutine,用法与pdWait的定义一致

// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;

代码语言:javascript
复制
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    //设置gpp状态为pdWait
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // need to recheck error states after setting gpp to WAIT
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
    //waitio为true可用于等待ioReady
    if waitio || netpollcheckerr(pd, mode) == 0 {
        //此处调用gopark阻塞goroutine。gopark返回可能是goroutine变为非阻塞,也可能由于其他原因(如close,timeout等)返回
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent READY notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

read流程

有了上面的基础,read和write就非常简单了。

代码语言:javascript
复制
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    //用户需要处理返回错误的情况,如关闭连接
    return n, err
}
代码语言:javascript
复制
func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    //与GC相关
    runtime.KeepAlive(fd)
    return n, wrapSyscallError("read", err)
}

可以看到read和accept的代码逻辑基本一致

代码语言:javascript
复制
func (fd *FD) Read(p []byte) (int, error) {
    //增加fd的引用计数
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    //与accept类似,此处通常仅用于减少fd的引用计数,在read失败后需要手动close连接
    defer fd.readUnlock()
    if len(p) == 0 {
        // If the caller wanted a zero byte read, return immediately
        // without trying (but after acquiring the readLock).
        // Otherwise syscall.Read returns 0, nil which looks like
        // io.EOF.
        // TODO(bradfitz): make it wait for readability? (Issue 15735)
        return 0, nil
    }
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        //如果读取到数据,则直接返回
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            //如果暂时没有数据,且使用epoll,则阻塞等待epoll的读事件通知
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }

            // On MacOS we can see EINTR here if the user
            // pressed ^Z.  See issue #22838.
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

write流程

write与read类似,在遇到IO阻塞时都需要调用runtime_pollWait等待epoll事件。不同点在于,read在有数据时会一次性读完,而write则需要判断底层是否有足够的空间来写入数据

代码语言:javascript
复制
func (c *conn) Write(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Write(b)
    if err != nil {
        err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}
代码语言:javascript
复制
func (fd *netFD) Write(p []byte) (nn int, err error) {
    nn, err = fd.pfd.Write(p)
    runtime.KeepAlive(fd)
    return nn, wrapSyscallError("write", err)
}
代码语言:javascript
复制
func (fd *FD) Write(p []byte) (int, error) {
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    if err := fd.pd.prepareWrite(fd.isFile); err != nil {
        return 0, err
    }
    var nn int
    //循环写入数据,发送数据的大小受发送缓存区限制,如果发送数据过大,则需要分多次发送
    for {
        max := len(p)
        //从maxRW的注释中可以看到Darwin和FreeBSD不允许一次性读写超过2G的数据。此处表示当写数据超过2G时,仅写入前2G数据
        //read函数中没有此限制,原因是在网络上读取数据时,socket读缓存区大小远远小于2G。
        if fd.IsStream && max-nn > maxRW {
            max = nn + maxRW
        }
        //返回发送成功的字节数
        n, err := syscall.Write(fd.Sysfd, p[nn:max])
        if n > 0 {
            nn += n
        }
        //只有发送失败或所有数据发送成功才算write结束
        if nn == len(p) {
            return nn, err
        }
        //遇到写缓存区满且使用epoll时,等待epoll上报缓存区有空间事件
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitWrite(fd.isFile); err == nil {
                continue
            }
        }
        if err != nil {
            return nn, err
        }
        if n == 0 {
            return nn, io.ErrUnexpectedEOF
        }
    }
}

设置连接的deadline

可以通过如下参数设置不同的连接终止时间,底层都调用了poll.setDeadlineImpl函数。设置t为0表示永不超时。具体用法可以参见net.Conn(net/net.go)接口注释。

代码语言:javascript
复制
func (fd *FD) SetDeadline(t time.Time) error 
func (fd *FD) SetReadDeadline(t time.Time) error
func (fd *FD) SetWriteDeadline(t time.Time) error

以上函数调用setDeadlineImpl实现

代码语言:javascript
复制
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
    var d int64
    //如果设置了连接deadline时间,计算到deadline的时间差值。此处主要做一个预处理
    if !t.IsZero() {
        d = int64(time.Until(t))
        //这里表示deadline时间点为当前时间,则设置为-1
        if d == 0 {
            d = -1 // don't confuse deadline right now with no deadline
        }
    }
    if err := fd.incref(); err != nil {
        return err
    }
    defer fd.decref()
    if fd.pd.runtimeCtx == 0 {
        return ErrNoDeadline
    }
    //此处调用函数对定时器进行处理
    runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
    return nil
}

poll_runtime_pollSetDeadline中当到达deadline后执行如下函数,netpolldeadlineimpl实际执行的就是在deadline到期后运行netpollunblock+netpollgoready将阻塞的goroutine变为非阻塞,这会导致返回timeout io错误

func netpollDeadline(arg interface{}, seq uintptr) { netpolldeadlineimpl(arg.(*pollDesc), seq, true, true) } func netpollReadDeadline(arg interface{}, seq uintptr) { netpolldeadlineimpl(arg.(*pollDesc), seq, true, false) } func netpollWriteDeadline(arg interface{}, seq uintptr) { netpolldeadlineimpl(arg.(*pollDesc), seq, false, true) }

代码语言:javascript
复制
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    lock(&pd.lock)
    if pd.closing {
        unlock(&pd.lock)
        return
    }
    rd0, wd0 := pd.rd, pd.wd
    combo0 := rd0 > 0 && rd0 == wd0
    if d > 0 {
        //获取deadline时间
        d += nanotime()
        //从注释看,这种情况表示deadline时间小于等于当前时间。将deadline时间设置为64bit的最大值
        if d <= 0 {
            // If the user has a deadline in the future, but the delay calculation
            // overflows, then set the deadline to the maximum possible value.
            d = 1<<63 - 1
        }
    }
    //按照不同mode设置读写对应的deadline时间
    if mode == 'r' || mode == 'r'+'w' {
        pd.rd = d
    }
    if mode == 'w' || mode == 'r'+'w' {
        pd.wd = d
    }
    //combo用于表示仅设置了读deadline还是同时设置了读写deadline 
    combo := pd.rd > 0 && pd.rd == pd.wd
    //读场景下deadline时间点执行的函数
    rtf := netpollReadDeadline
    //读场景下deadline时间点执行的函数
    if combo {
        rtf = netpollDeadline
    }
    //如果没有设置读deadline时间点运行的函数,则使用默认的函数
    if pd.rt.f == nil {
        if pd.rd > 0 {
            //设置deadline时间点,以及到时后运行的函数,函数参数等
            pd.rt.f = rtf
            pd.rt.when = pd.rd
            // Copy current seq into the timer arg.
            // Timer func will check the seq against current descriptor seq,
            // if they differ the descriptor was reused or timers were reset.
            pd.rt.arg = pd
            pd.rt.seq = pd.rseq
            //添加时间并启动定时任务
            addtimer(&pd.rt)
        }
    //此处用于处理仅读deadline的情况
    } else if pd.rd != rd0 || combo != combo0 {
        pd.rseq++ // invalidate current timers
        if pd.rd > 0 {
            //由于使用了自定义的deadline处理函数,此处调用modtimer重新赋值并启动定时任务
            modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
        } else {
            //此处表示入参t为0的情况,即永不超时,删除定时器和定时器函数
            deltimer(&pd.rt)
            pd.rt.f = nil
        }
    }
    //此处处理写deadline相关的情况,与读类似
    if pd.wt.f == nil {
        if pd.wd > 0 && !combo {
            pd.wt.f = netpollWriteDeadline
            pd.wt.when = pd.wd
            pd.wt.arg = pd
            pd.wt.seq = pd.wseq
            addtimer(&pd.wt)
        }
    } else if pd.wd != wd0 || combo != combo0 {
        pd.wseq++ // invalidate current timers
        if pd.wd > 0 && !combo {
            modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd, pd.wseq)
        } else {
            deltimer(&pd.wt)
            pd.wt.f = nil
        }
    }
    // If we set the new deadline in the past, unblock currently pending IO if any.
    var rg, wg *g
    //如果deadline时间点早于当前时间,则unblock pd的所有IO,返回timeout IO错误
    if pd.rd < 0 || pd.wd < 0 {
        atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
        if pd.rd < 0 {
            rg = netpollunblock(pd, 'r', false)
        }
        if pd.wd < 0 {
            wg = netpollunblock(pd, 'w', false)
        }
    }
    unlock(&pd.lock)
    if rg != nil {
        netpollgoready(rg, 3)
    }
    if wg != nil {
        netpollgoready(wg, 3)
    }
}

setDeadLine主要是防止服务端阻塞等待导致的大量冗余连接(长连接),参见Go net/http 超时机制完全手册

TIPS:

  • epoll的用法可以参见这里

参考:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档