本文基于字节开源的NetPoll版本进行讲解,对应官方文档链接为: Netpoll对应官方文档链接
netPoll底层有一个非常核心的数据结构叫LinkBuffer , 本文作为netPoll正式源码分析的前导篇 , 主要来看看netPoll底层使用到的LinkBuffer的源码实现。
我们先来看一段官方对NetPoll的定义:
NetPoll对标的其实就是java中的Netty框架 , 而对于这一类多路IO复用框架来说,他们底层实现都依赖于epoll,kqueue等底层操作系统向上提供的多路复用API ; 在多路复用模型设计中,底层epoll等API的事件触发方式会影响I/O和buffer的设计,这也是netpoll推出LinkBuffer的原因。
Linux提供的epoll有两种触发方式:
go原生网络库采用边沿触发(ET)模式,而netpoll采用水平触发(LT)模式,LT模式实效性更好,主动I/O可以集中内存使用和管理,并且还可以像netpoll这样提供nocpoy操作同时还能减少GC 。
目前一些热门开源网络库同样也是采用的LT模式,如easygo,evio和gnet等
这里主动IO是指由netpoll提供一个缓冲区,当监听到fd上的读事件时,就主动将数据读取到该缓冲区中,至于什么时候从netpoll提供的缓冲区读出数据,则是用户的事情了。
主动I/O需要网络库自身提供一个数据缓冲区,这会引入上层代码并发操作buffer的问题,同时网络库自身也需要对该缓冲区进行I/O读写,因此为了保证数据正确性,同时又避免加锁带来的低性能,目前开源的网络库通常都会采取同步处理buffer (easygo , evio) 或将 buffer copy (gent) 一份提供给上层代码的方式来实现。
已有的实现方式不适合大流量环境下的业务处理或存在copy开销,同时,常见的bytes , bufio , ringbuffer 等 buffer 库 ,均存在扩容需要拷贝原数组数据,以及只能扩容无法缩容导致占用大量内存等问题。因此,LinkBuffer的提出就是为了解决上面提出的两个问题!
相比于常见的Buffer库,LinkBuffer的优势有以下几点:
LinkBuffer 的设计思路如下图所示:
(如: arr[1:10])
, 同时对每个Node都有引用计数(切片多少次就标记多少次) , 当所有的切片均使用完释放后,用完的Node会自动回收到Node内存池。(把一个链表挂接到另一个链表的末尾)
LinkBuffer 中的 LinkBufferNode 节点结构如下:
type linkBufferNode struct {
buf []byte // 字节缓冲区
off int // 读偏移量
malloc int // 写偏移量
refer int32 // 引用计数
readonly bool // 只读节点,表示底层buf中的内存是不是自己控制的,为真表示不是,自己不能释放
origin *linkBufferNode // 当我们从某个slice中切分出其中一部分返回时,此时会用origin指针记录其原本的切片对象
next *linkBufferNode // next指针
}
LinkBufferNode的构造函数如下:
var linkedPool = sync.Pool{
New: func() interface{} {
return &linkBufferNode{
refer: 1, // 自带 1 引用
}
},
}
func newLinkBufferNode(size int) *linkBufferNode {
// 从缓冲池中拿到一个空闲的node节点
var node = linkedPool.Get().(*linkBufferNode)
// 重置节点的读写偏移量,引用计数和只读属性
node.off, node.malloc, node.refer, node.readonly = 0, 0, 1, false
// 节点大小小于等于0,表示为只读节点
if size <= 0 {
node.readonly = true
return node
}
// LinkBufferCap表示每个node节点的最小的大小
if size < LinkBufferCap {
size = LinkBufferCap
}
// 分配len(slice)=0 , len(cap)=size大小的切片
node.buf = malloc(0, size)
return node
}
// malloc 底层调用的是字节开源的mache库
func malloc(size, capacity int) []byte {
if capacity > mallocMax {
return make([]byte, size, capacity)
}
return mcache.Malloc(size, capacity)
}
LinkBufferNode 中最重要的属性便是buf了,buf 是整个网络读写最终的存储变量,这段内存是单独管理的,且大小不固定,与buf相关的操作有如下几种:
buf 内存分配有以下三种情况:
buf可读数据范围: readable = buf[off:len(buf)] (off 读指针)
buf可写数据范围: writeable = buf[len(buf):malloc]
如果node的readonly属性为true,表示底层buf中的内存不是自己控制的,不能去主动释放;Node对象readonly属性为true,有以下两种情况:
这里简单看看LinkBufferNode提供的一些常用的API实现:
// Len 剩余可读数据量
func (node *linkBufferNode) Len() (l int) {
return len(node.buf) - node.off
}
// IsEmpty 当前节点可读数据量是否为空
func (node *linkBufferNode) IsEmpty() (ok bool) {
return node.off == len(node.buf)
}
// Reset 重置节点状态
func (node *linkBufferNode) Reset() {
// 如果当前节点拥有的切片是个子切片或者当前切片的引用计数不等于1,说明当前节点不能重置
if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 {
return
}
// 重置读写指针
node.off, node.malloc = 0, 0
// 重置缓冲区len大小,cap不变
node.buf = node.buf[:0]
return
}
// Next 往后读取n个字节数据,并移动读指针
// 调用方需要检查传入的长度n,确保其不超过malloc-off ,如果超过了,可能会读到buf重用产生的脏数据
func (node *linkBufferNode) Next(n int) (p []byte) {
off := node.off
node.off += n
return node.buf[off:node.off]
}
// Peek 不移动读指针,只是预览数据
func (node *linkBufferNode) Peek(n int) (p []byte) {
return node.buf[node.off : node.off+n]
}
// Malloc 申请一段内存来写入数据,在没有flush(buf:=buf[:malloc])前,不会读到这段内存
// 注意,Node上的Malloc不会真正去申请内存,Node的内存在buf创建时就已经申请好了
func (node *linkBufferNode) Malloc(n int) (buf []byte) {
malloc := node.malloc
node.malloc += n
return node.buf[malloc:node.malloc]
}
// Refer 返回一个新的Node对象,并设置origin父对象,此处指向的origin是根origin --> linkBufferNode为两级结构
// 将 [read,read+n]范围的切片切分出来,由一个新的node节点引用,同时增加当前节点的引用计数
func (node *linkBufferNode) Refer(n int) (p *linkBufferNode) {
// 创建一个只读节点
p = newLinkBufferNode(0)
// 当前节点p指向[read,read+n]范围的切片
p.buf = node.Next(n)
// 如果当前节点本身指向的也是一个子切片,这边不会形成一个树状结构,而是指向根节点
if node.origin != nil {
p.origin = node.origin
} else {
p.origin = node
}
// 增加根节点的引用计数
atomic.AddInt32(&p.origin.refer, 1)
return p
}
// Release 如果有原始节点,先释放原始节点
// 如果当前节点不存在其他引用了,重置node各属性,放回节点池等待重用
func (node *linkBufferNode) Release() (err error) {
// 如果当前节点指向的是子切片,先释放父切片
if node.origin != nil {
node.origin.Release()
}
// release self
// 递减根节点引用计数 (计数只会在根节点上递增,所以这里只关心根节点上的递减即可)
if atomic.AddInt32(&node.refer, -1) == 0 {
// readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache.
// 释放根节点占用的buf空间
if !node.readonly {
free(node.buf)
}
// 将相关属性设置为null
node.buf, node.origin, node.next = nil, nil, nil
// 将node重新放回节点池中
linkedPool.Put(node)
}
return nil
}
LinkBuffer 抽象来看属于一个二维切片,如果使用传统的read/write系统调用,仅支持传入一维切片,需要反复调用才能处理完整个二维切片的数据,所以LinkBuffer这里对外提供readv/writev系统调用,用来一次性传输多个数组的数据:
// writev 包装 writev 系统调用
// writev以顺序iov[0]、iov[1]至iov[iovcnt-1]从各缓冲区中聚集输出数据到fd
func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
// 将ivs[i].base 指向 bs[i] , 也就是将bs作为写缓冲区数据来源
iovLen := iovecs(bs, ivs)
if iovLen == 0 {
return 0, nil
}
// 执行writev系统调用,将ivs[i].base指针指向的缓冲区数据写入fd代表的文件中
r, _, e := syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen))
// 清空ivs和bs缓冲区数据
resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
}
// 返回成功写入的字节数量
return int(r), nil
}
// readv 包装readv系统调用 , 返回 0 或 nil 表示数据读完了
// readv则将从fd读入的数据按同样的顺序散布到各缓冲区中,readv总是先填满一个缓冲区,然后再填下一个
func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
// 将ivs[i].base 指向 bs[i] , 也就是将bs作为最终接收数据的缓冲区
iovLen := iovecs(bs, ivs)
if iovLen == 0 {
return 0, nil
}
// 执行readv系统调用,将数据读取到ivs[i].base指针指向的缓冲区中
r, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&ivs[0])), uintptr(iovLen))
// 官方代码此时又执行一遍清空缓冲区操作,笔者认为这里有点小问题,也提出了相应的pr
// resetIovecs(bs, ivs[:iovLen])
if e != 0 {
return int(r), syscall.Errno(e)
}
// 返回成功读取到的字节数量
return int(r), nil
}
此处使用到了Linux相关的IO系统调用: Unix/Linux编程:分散输入和集中输出------readv() 、 writev()
关于readv函数实现bug的pr链接:
LinkBuffer 具体的数据结构如下所示:
// LinkBuffer implements ReadWriter.
type LinkBuffer struct {
length int64 // 可读数据量
mallocSize int // 已写数据量
head *linkBufferNode // release head 头结点
read *linkBufferNode // read head 读指针
flush *linkBufferNode // malloc head 写开始指针
write *linkBufferNode // malloc tail 写结束指针
caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
}
这里只对Next和Slice方法展开进行讲解,其他读API,大家自行阅读源码学习即可,实现思路大同小异。
Next 函数存在两种实现场景:
// Next implements Reader.
func (b *LinkBuffer) Next(n int) (p []byte, err error) {
...
// 递减总的可读数据量
b.recalLen(-n)
// 是否需要跨节点读取
if b.isSingleNode(n) {
// 读取当前read指向节点的可读数据,同时推进当前节点上的read指针
return b.read.Next(n), nil
}
// 跨节点读取
var pIdx int
if block1k < n && n <= mallocMax {
// 要在release的时候释放
p = malloc(n, n)
b.caches = append(b.caches, p)
} else {
p = make([]byte, n)
}
var l int
for ack := n; ack > 0; ack = ack - l {
l = b.read.Len()
if l >= ack {
pIdx += copy(p[pIdx:], b.read.Next(ack))
break
} else if l > 0 {
pIdx += copy(p[pIdx:], b.read.Next(l))
}
b.read = b.read.next
}
_ = pIdx
return p, nil
}
const mallocMax = block8k * block1k
func malloc(size, capacity int) []byte {
if capacity > mallocMax {
return make([]byte, size, capacity)
}
return mcache.Malloc(size, capacity)
}
// 增加或减少b.length大小
func (b *LinkBuffer) recalLen(delta int) (length int) {
return int(atomic.AddInt64(&b.length, int64(delta)))
}
此处必须返回一维切片是因为协议层反序列化时需要组装出定义的结构体字段。
如果都是小读取,那只有小概率会触发到跨节点读取,对于大读取,还是优先考虑Slice;与Next的区别是,Slice会返回一个新的LinkBuffer,无论大小都是zero-copy,缺点是用户需要手动管理Buffer :
func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
// 递减剩余可读取数据量
b.recalLen(-n)
// 创建一个新的LinkBuffer
p := &LinkBuffer{
length: int64(n),
}
defer func() {
p.flush = p.flush.next
p.write = p.flush
}()
// 如果是单节点读取,那正好zero-copy
if b.isSingleNode(n) {
// 从 Slice() 返回的 LinkBuffer 是只读的
node := b.read.Refer(n)
p.head, p.read, p.flush = node, node, node
return p, nil
}
// 如果是跨节点读取
// 先基于当前读节点给新 LinkBuffer 赋予第一个头节点
var l = b.read.Len()
node := b.read.Refer(l)
// 读指针前进一个节点
b.read = b.read.next
p.head, p.read, p.flush = node, node, node
for ack := n - l; ack > 0; ack = ack - l {
l = b.read.Len()
// 表示是新 LinkBuffer 的最后一个 Node
// 从当前读节点引用出一个需要长度的 Node
if l >= ack {
p.flush.next = b.read.Refer(ack)
p.flush = p.flush.next
break
} else if l > 0 {
// 表示需要创建一个完整大小的 Node,flush 指针前进
p.flush.next = b.read.Refer(l)
p.flush = p.flush.next
}
b.read = b.read.next
}
// b.Release() 只会 release 已读的内容,即返回的 slice 的内容
// 由于有引用计数的存在,所以底部内存并不会被回收
return p, b.Release()
}
// Malloc 预先分配一块内存,这块内存不可读,直到我们调用了Flush
func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {
if n <= 0 {
return
}
// 累加写入数据量计数
b.mallocSize += n
// 如果当前节点剩余空间不足,则进行扩容,也就是创建一个新节点挂载到链表尾部
b.growth(n)
// 分配n大小的切片空间返回
return b.write.Malloc(n), nil
}
// MallocAck 缩容操作,保留malloc api预分配的前n个字节数据,丢弃剩余的数据
func (b *LinkBuffer) MallocAck(n int) (err error) {
if n < 0 {
return fmt.Errorf("link buffer malloc ack[%d] invalid", n)
}
// 将已分配数量缩小到n
b.mallocSize = n
// 从flush节点开始定位n个byte,丢弃剩余byte
b.write = b.flush
var l int // l 代表当前节点剩余的已分配数据量
for ack := n; ack > 0; ack = ack - l {
// 计算当前节点已分配数据量
// len(b.write.buf) 表示当前node已经flush的数据量大小
l = b.write.malloc - len(b.write.buf)
// 如果当前节点已经分配出去的数据量比当前ack大,则丢弃分配的多余空间
if l >= ack {
b.write.malloc = ack + len(b.write.buf)
break
}
b.write = b.write.next
}
// 将多分配的空间全部回收
for node := b.write.next; node != nil; node = node.next {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
}
return nil
}
// Flush 默认认为当前malloc的内容都为有效数据 , 调用该函数前,用户需要确保已经写入了Malloc的所有数据
func (b *LinkBuffer) Flush() (err error) {
b.mallocSize = 0
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if cap(b.write.buf) > pagesize {
b.write.next = newLinkBufferNode(0)
b.write = b.write.next
}
var n int
// 从flush指针指向的节点遍历到write指针指向的节点
for node := b.flush; node != b.write.next; node = node.next {
// 计算当前节点已分配数据量
delta := node.malloc - len(node.buf)
if delta > 0 {
// 累加已分配数据量计数
n += delta
// 更新buf的len大小,[0,len]区间代表当前node节点上已经flush的数据范围
node.buf = node.buf[:node.malloc]
}
}
// 移动flush指针到当前write指针指向的节点
b.flush = b.write
// n 代表总的已经malloc出去的数据量,此处让所有数据都对外可见
b.recalLen(n)
return nil
}
func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
// 计算当前写入节点剩余空间还有多少
l := cap(b.write.buf) - b.write.malloc
// 没有空间了,那么新创建一个LinkBufferNode , 挂载到链表尾部
if l == 0 {
l = maxSize
b.write.next = newLinkBufferNode(maxSize)
b.write = b.write.next
}
// 当前节点,剩余空间比当前需要的空间还大
if l > bookSize {
l = bookSize
}
// 分配l大小的空间
return b.write.Malloc(l)
}
与malloc区别: book 用来支持 readv/writev 这类二维切片参数的API , 此外与Malloc相比也不存在内存浪费的情况。
// bookAck 保留book预留的前n个字符,丢弃多余的book空间
func (b *LinkBuffer) bookAck(n int) (length int, err error) {
// 缩小malloc大小
b.write.malloc = n + len(b.write.buf)
// 更新len(buf) = malloc --> 更新后,数据将可以被读取到
// 和mallocAck不同的一点在于,bookAck会更新len(buf)大小,相当于调用了一次flush
b.write.buf = b.write.buf[:b.write.malloc]
b.flush = b.write
// 增加可读数量
length = b.recalLen(n)
return length, nil
}
本文带领大家详细研究了一下netpoll底层使用的LinkBuffer实现,其中还有诸多细节由于时间关系不能一一到来,这些内容大家可以自行阅读源码进行学习。
LinkBuffer 底层还使用到了字节开源的Mcache和GoPool实现,感兴趣的同学可以去了解一下;如果本篇文章有讲的错误之处,也欢迎在评论区指出或私信与我讨论。