前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从网络库浅析GO协程切换

从网络库浅析GO协程切换

原创
作者头像
1011689
修改2019-07-15 11:05:59
1.7K0
修改2019-07-15 11:05:59
举报
文章被收录于专栏:MM

目前,高性能的网络服务都是采用异步IO和事件驱动。此方式采用epoll接收所有网络数据包,然后根据fd进行上下文切换,然后异步回调事件处理函数。GO引入了goroutine,将异步回调编程了多协程同步的编程方式。

GO原生支持协程,并且服务器上可以支持上万的协程goroutine。所以在网络编程方面,一般都采用一个连接开启一个协程的模式。

下面我们看下GO网络库netpoll的实现

1、数据结构

代码语言:javascript
复制
type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}

type FD struct {
    fdmu fdMutex
    Sysfd int
    pd pollDesc
    ......
}

type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}
  • netFD

GO封装的网络描述符。主要维护网络读写操作。其中成员最重要的成员pfd,它是一个底层netpoll的文件描述符。

  • poll.FD

GO底层netpoll的文件描述符。主要有两个成员。Sysfd:操作系统文件描述符。pd: I/O 轮询器.

  • pollDesc

GO底层netpoll的轮询器。维护了此fd和goroutine相关信息。主要有以下重要的成员:fd:底层描述符,rg:读就绪信号。wg:写就绪信号

rg和wg有三种情况:pdReady:IO就绪;pdWait:IO等待,准备挂起当前协程;G waiting:当前协程挂起状态,此时该变量为指向goroutine的指针

2、netpoll相关函数

以下函数所在文件netpoll_epoll.go,它是GO语言在Linux操作系统中使用epoll作为网络IO复用的代码实现。其中包括epoll的创建,epoll中添加、移除fd,epoll wait等操作

2.1 netpollinit

netpollinit函数,调用epollcreate1创建一个多路复用的实例,创建失败则调用epollcreate函数。epfd为一个全局变量,并且只运行一次,也就是在一个进程中只维护一个多路复用的实例。

代码语言:javascript
复制
var (
    epfd int32 = -1 // epoll descriptor
)
func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        closeonexec(epfd)
        return
    }
    println("runtime: epollcreate failed with", -epfd)
    throw("runtime: netpollinit failed")
}

......
var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		if ctx != 0 {
			runtime_pollUnblock(ctx)
			runtime_pollClose(ctx)
		}
		return syscall.Errno(errno)
	}
	pd.runtimeCtx = ctx
	return nil
}

2.2 netpollopen

netpollopen函数,将fd添加到轮询器中。并且每个fd都监听了读写事件,并且使用ET模式(边缘触发)。除此外还有个_EPOLLRDHUP,此事件为对端关闭连接。Linux在2.6.7版本内核中增加EPOLLRDHUP事件,此特性增加之前,对端关闭连接,会触发EPOLLIN事件,此时上层读取数据,会读取到EOF。

代码语言:javascript
复制
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

3.3 netpoll

netpoll函数,等待epoll事件,根据网络就绪fd拿到pollDesc,调用netpollready函数,将读写信号置为ready。并且将goroutine放到可执行队列中。等待调度器调度。

代码语言:go
复制
func netpoll(block bool) gList {
    ......
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            netpollready(&toRun, pd, mode)
        }
    }
    if block && toRun.empty() {
        goto retry
    }
    return toRun
}

3、go上层网络库net的实现

结合上述netpoll底层函数,我们看下GO网路库net的实现。

3.1 net.Read函数的实现:

代码语言:javascript
复制
func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    runtime.KeepAlive(fd)
    return n, wrapSyscallError("read", err)
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    ......
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

net网络库的fd都是以netFD结构出现,因此我们查看netFd的Read函数。它底层调用syscall.Read()函数,从上述代码可以看出,此函数是非阻塞模式,当IO缓冲区没有数据时候,返回EAGAIN。然后调用fd.pd.waitRead函数等待读取。waitRead底层调用poll_runtime_pollWait轮询等待。最终通过netpollblock函数确定IO是否就绪。

代码语言:javascript
复制
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// As for now only Solaris and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // set the gpp semaphore to WAIT
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent READY notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait函数循环调用了netpollblock,返回true表示IO已准备好,返回false表示超时或者发生错误。在netpollblock函数中调用了gopark函数,此函数汇编实现,具体功能是将当前的goroutine修改为等待状态,然后执行其他的goroutine。等待IO就绪后,epoll将此goroutine修改为就绪状态。等待调度器调度。

总结:GO的网络操作底层全部为非阻塞IO,具体情况为:go协程从网络读取数据,读取失败并且返回syscall.EAGAIN时,依次调用waitRead->runtime_pollWait->poll_runtime_pollWait->netpollblock->gopark将当前协程挂起,模拟出阻塞IO。等待该fd有数据后,重新唤醒该协程。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、数据结构
  • 2、netpoll相关函数
    • 2.1 netpollinit
      • 2.2 netpollopen
        • 3.3 netpoll
        • 3、go上层网络库net的实现
          • 3.1 net.Read函数的实现:
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档