前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go语言源码笔记 --- netpoller

Go语言源码笔记 --- netpoller

原创
作者头像
叫你不戴帽子
修改2018-09-02 12:22:16
3.9K0
修改2018-09-02 12:22:16
举报

源码参考: Go1.11

总览:Go中网络交互采用多路复用的技术,具体到各个平台,即Kqueue、Epoll、Select、Poll等,下面以Linux下的Epoll实现为例进行分析。

netpoller代码分析

所谓的netpoller,其实是Golang实利用了OS提供的非阻塞IO访问模式,并配合epll/kqueue等IO事件监控机制;为了弥合OS的异步机制与Golang接口的差异,而在runtime上做的一层封装。以此来实现网络IO优化。

实际的实现(epoll/kqueue)必须定义以下函数:

代码语言:txt
复制
func netpollinit() // 初始化轮询器
func netpollopen(fd uintptr, pd *pollDesc) int32 // 为fd和pd启动边缘触发通知

当一个goroutine进行io阻塞时,会去被放到等待队列。这里面就关键的就是建立起文件描述符和goroutine之间的关联。 pollDesc结构体就是完成这个任务的。代码参见src/runtime/netpoll.go

代码语言:txt
复制
type pollDesc struct { // Poller对象
    link *pollDesc // 链表
    lock mutex // 保护下面字段
    fd uintptr // fd是底层网络io文件描述符,整个生命期内,不能改变值
    closing bool
    seq uintptr // protect from stale(过时) timers and ready notifications
    rg uintptr // reader goroutine addr
    rt timer
    rd int64
    wg uintptr // writer goroutine addr
    wt timer
    wd int64
    user int32 // user-set cookie用户自定义数据
}

type pollCache struct { // 全局Poller链表
    lock mutex // 保护Poller链表
    first *pollDesc
}

func poll_runtime_pollServerInit() // 调用netpollinit()
func poll_runtime_pollOpen() // 调用netpollopen()
func poll_runtime_pollClose() // 调用netpollclose()
func poll_runtime_pollReset(pd, mode) // 先check(netpollcheckerr(pd, mode))是否有err发生,没有的话重置pd对应字段
func poll_runtime_pollWait(pd, mode) // 先chekerr,再调用netpollblock(pd, mode, false)
func poll_runtime_pollWaitCanceled(pd, mode) // windows下专用
func poll_runtime_pollSetDeadline(pd, deadline, mode)
//1. 重置定时器,并seq++
//2. 设置超时函数netpollDeadline(或者netpollReadDeadline、netpollWriteDeadline)
//3. 如果已经过期,调用netpollunblock和netpollgoready
func poll_runtime_pollUnblock(pd) // netpollUnblock、netpollgoready

/*------------------部分实现------------------*/
func netpollcheckerr(pd, mode) // 检查是否超时或正在关闭
func netpollblockcommit(gp *g, gpp unsafe.Pointer)
func netpollready(gpp *guintptr, pd, mode) // 调用netpollunblock,更新g的schedlink
func netpollgoready(gp *g, traceskip) // 更新统计数据,调用goready --- 通知调度器协程g从parked变为ready
func netpollblock(pd, mode, waitio) // Set rg/wg = pdWait,调用gopark挂起pd对应的g。
func netpollunblock(pd, mode, ioready)
func netpoll(Write/Read)Deadline(arg, seq)

pollCache是pollDesc链表入口,加锁保护链表安全。

pollDesc中,rg、wg有些特殊,它可能有如下3种状态:

  1. pdReady == 1: 网络io就绪通知,goroutine消费完后应置为nil
  2. pdWait == 2: goroutine等待被挂起,后续可能有3种情况:
    1. goroutine被调度器挂起,置为goroutine地址
    2. 收到io通知,置为pdReady
    3. 超时或者被关闭,置为nil
  3. Goroutine地址: 被挂起的goroutine的地址,当io就绪时、或者超时、被关闭时,此goroutine将被唤醒,同时将状态改为pdReady或者nil。

另外,由于wg、rg是goroutine的地址,因此当GC发生后,如果goroutine被回收(在heap区),代码就崩溃了(指针无效)。所以,进行网络IO的goroutine不能在heap区分配内存。

lock锁对象保护了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。而这些操作又完全包含了对seq, rt, tw变量。fd在PollDesc整个生命过程中都是一个常量。处理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就绪通知)不需要用到锁,所以closing, rg, rd, wg和wd的所有操作都是一个无锁的操作。

多路复用三部曲

初始化PollServer

初始化在下面注册fd监听时顺便处理了,调用runtime_pollServerInit(),并使用sync.Once()机制保证只会被初始化一次。全局使用同一个EpollServer(同一个Epfd)。

代码语言:txt
复制
func poll_runtime_ServerInit() {
    netpollinit() // 具现化到Linux下,调用epoll_create
    ...
}

注册监听fd

所有Unix文件在初始化时,如果支持Poll,都会加入到PollServer的监听中。源码下搜索runtime_pollOpen,即见分晓。

代码语言:txt
复制
/*****************internal/poll/fd_unix.go*******************/
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex
    // System file descriptor. Immutable until Close.
    Sysfd int
    // I/O poller.
    pd pollDesc
    ...
}
func(fd *FD) Init(net string, pollable bool) error {
    ...
    err := fd.pd.init(fd) // 初始化pd
    ...
}
...
/*****************internal/poll/fd_poll_runtime.go*****************/
type pollDesc struct {
    runtimeCtx uintptr
}
func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit) // 初始化PollServer(sync.Once)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    runtimeCtx = ctx
    return nil
}
...
/*****************runtime/netpoll.go*****************/
func poll_runtime_pollOpen(fd uintptr) (*epDesc, int32) {
    ...
    errno := netpollopen(fd, pd) // 具现化到Linux下,调用epoll_ctl
    ...
}

取消fd的监听与此流程类似,最终调用epoll_ctl.

定期Poll

结合上述实现,必然有处逻辑定期执行epoll_wait来检测fd状态。在代码中搜索下netpoll(,即可发现是在sysmon、startTheWorldWithSema、pollWork、findrunnable中调用的,以sysmon为例:

代码语言:txt
复制
// runtime/proc.go
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
// 如果10ms内没有poll过,则poll。(1ms=1000000ns)
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
    atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
    gp := netpoll(false) // netpoll在Linux具现为epoll_wait
    if gp != nil {
       injectglist(gp) //把g放到sched中去执行,底层仍然是调用的之前在goroutine里面提到的startm函数。
	}
}
...

以读等待挂起为例

加入监听

golang中客户端与服务端进行通讯时,常用如下方法:

代码语言:txt
复制
conn, err := net.Dial("tcp", "localhost:1208")
...

从net.Dial看进去,最终会调用net/net_posix.go中的socket函数,大致流程如下:

代码语言:txt
复制
func socket(...) ... {
	/*
	1. 调用sysSocket创建原生socket
	2. 调用同名包下netFd(),初始化网络文件描述符netFd
	3. 调用fd.dial(),其中最终有调用runtime_pollOpen()加入监听列表
	*/
}

至此,文件描述符已经加入pollServer监听列表。

读等待

主要是挂起goroutine,并建立gorotine和fd之间的关联。

当从netFd读取数据时,调用system call,循环从fd.sysfd读取数据:

代码语言:txt
复制
func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

读取的时候只处理EAGAIN类型的错误,其他错误一律返回给调用者,因为对于非阻塞的网络连接的文件描述符,如果错误是EAGAIN,说明Socket的缓冲区为空,未读取到任何数据,则调用fd.pd.WaitRead:

代码语言:txt
复制
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:

代码语言:txt
复制
func convertErr(res int, isFile bool) error {
    switch res {
    case 0:
        return nil
    case 1:
        return errClosing(isFile)
    case 2:
        return ErrTimeout
    }
    println("unreachable: ", res)
    panic("unreachable")
}

其中0表示io已经准备好了,1表示链接意见关闭,2表示io超时。再来看看pollWait的实现:

代码语言:txt
复制
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

调用netpollblock来判断IO是否准备好了:

代码语言:txt
复制
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
	    gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
    }
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

返回true说明IO已经准备好,返回false说明IO操作已经超时或者已经关闭。否则当waitio为false, 且io不出现错误或者超时才会挂起当前goroutine。最后的gopark函数,就是将当前的goroutine(调用者)设置为waiting状态。

就绪唤醒

如上所述,go在多种场景下都会调用netpoll检查文件描述符状态。寻找到IO就绪的socket文件描述符,并找到这些socket文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些socket文件描述符就绪的goroutine状态修改为Grunnable。执行完netpoll之后,会找到一个就绪的goroutine列表,接下来将就绪的goroutine加入到调度队列中,等待调度运行。

总结

总的来说,netpoller的最终的效果就是用户层阻塞,底层非阻塞。当goroutine读或写阻塞时会被放到等待队列,这个goroutine失去了运行权,但并不是真正的整个系统“阻塞”于系统调用。而通过后台的poller不停地poll,所有的文件描述符都被添加到了这个poller中的,当某个时刻一个文件描述符准备好了,poller就会唤醒之前因它而阻塞的goroutine,于是goroutine重新运行起来。

和使用Unix系统中的select或是poll方法不同地是,Golang的netpoller查询的是能被调度的goroutine而不是那些函数指针、包含了各种状态变量的struct等,这样你就不用管理这些状态,也不用重新检查函数指针等,这些都是你在传统Unix网络I/O需要操心的问题。


参考资料:

  1. 深入Golang之netpoller
  2. go1.11源码
  3. 使用 Go 进行 Socket 编程 | 始于珞尘

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • netpoller代码分析
  • 多路复用三部曲
    • 初始化PollServer
      • 注册监听fd
        • 定期Poll
        • 以读等待挂起为例
          • 加入监听
            • 读等待
              • 就绪唤醒
              • 总结
              相关产品与服务
              腾讯云代码分析
              腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档