目前,高性能的网络服务都是采用异步IO和事件驱动。此方式采用epoll接收所有网络数据包,然后根据fd进行上下文切换,然后异步回调事件处理函数。GO引入了goroutine,将异步回调编程了多协程同步的编程方式。
GO原生支持协程,并且服务器上可以支持上万的协程goroutine。所以在网络编程方面,一般都采用一个连接开启一个协程的模式。
下面我们看下GO网络库netpoll的实现
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
type FD struct {
fdmu fdMutex
Sysfd int
pd pollDesc
......
}
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
lock mutex // protects the following fields
fd uintptr
closing bool
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}
GO封装的网络描述符。主要维护网络读写操作。其中成员最重要的成员pfd,它是一个底层netpoll的文件描述符。
GO底层netpoll的文件描述符。主要有两个成员。Sysfd:操作系统文件描述符。pd: I/O 轮询器.
GO底层netpoll的轮询器。维护了此fd和goroutine相关信息。主要有以下重要的成员:fd:底层描述符,rg:读就绪信号。wg:写就绪信号
rg和wg有三种情况:pdReady:IO就绪;pdWait:IO等待,准备挂起当前协程;G waiting:当前协程挂起状态,此时该变量为指向goroutine的指针
以下函数所在文件netpoll_epoll.go,它是GO语言在Linux操作系统中使用epoll作为网络IO复用的代码实现。其中包括epoll的创建,epoll中添加、移除fd,epoll wait等操作
netpollinit函数,调用epollcreate1创建一个多路复用的实例,创建失败则调用epollcreate函数。epfd为一个全局变量,并且只运行一次,也就是在一个进程中只维护一个多路复用的实例。
var (
epfd int32 = -1 // epoll descriptor
)
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
......
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
netpollopen函数,将fd添加到轮询器中。并且每个fd都监听了读写事件,并且使用ET模式(边缘触发)。除此外还有个_EPOLLRDHUP,此事件为对端关闭连接。Linux在2.6.7版本内核中增加EPOLLRDHUP事件,此特性增加之前,对端关闭连接,会触发EPOLLIN事件,此时上层读取数据,会读取到EOF。
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
netpoll函数,等待epoll事件,根据网络就绪fd拿到pollDesc,调用netpollready函数,将读写信号置为ready。并且将goroutine放到可执行队列中。等待调度器调度。
func netpoll(block bool) gList {
......
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}
结合上述netpoll底层函数,我们看下GO网路库net的实现。
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
......
for {
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
net网络库的fd都是以netFD结构出现,因此我们查看netFd的Read函数。它底层调用syscall.Read()函数,从上述代码可以看出,此函数是非阻塞模式,当IO缓冲区没有数据时候,返回EAGAIN。然后调用fd.pd.waitRead函数等待读取。waitRead底层调用poll_runtime_pollWait轮询等待。最终通过netpollblock函数确定IO是否就绪。
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "aix" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
poll_runtime_pollWait函数循环调用了netpollblock,返回true表示IO已准备好,返回false表示超时或者发生错误。在netpollblock函数中调用了gopark函数,此函数汇编实现,具体功能是将当前的goroutine修改为等待状态,然后执行其他的goroutine。等待IO就绪后,epoll将此goroutine修改为就绪状态。等待调度器调度。
总结:GO的网络操作底层全部为非阻塞IO,具体情况为:go协程从网络读取数据,读取失败并且返回syscall.EAGAIN时,依次调用waitRead->runtime_pollWait->poll_runtime_pollWait->netpollblock->gopark将当前协程挂起,模拟出阻塞IO。等待该fd有数据后,重新唤醒该协程。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。