如果裸写一个goroutine pool

引言

在上文中,我说到golang的原生http server处理client的connection的时候,每个connection起一个goroutine,这是一个相当粗暴的方法。为了感受更深一点,我们来看一下go的源码。先定义一个最简单的http server如下。

func myHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello there!\n")
}
func main(){
    http.HandleFunc("/", myHandler)     //  设置访问路由
    log.Fatal(http.ListenAndServe(":8080", nil))
}
func myHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello there!\n")
}
func main(){
    http.HandleFunc("/", myHandler)     //  设置访问路由
    log.Fatal(http.ListenAndServe(":8080", nil))
}
从入口http.ListenAndServe函数跟进去。
// file: net/http/server.go
func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})        
}
func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    ...
    for {
        rw, e := l.Accept()
        if e != nil {
            // error handle
            return e
        }
        tempDelay = 0
        c, err := srv.newConn(rw)
        if err != nil {
            continue
        }
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve()
    }
}
// file: net/http/server.go
func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})        
}
func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    ...
    for {
        rw, e := l.Accept()
        if e != nil {
            // error handle
            return e
        }
        tempDelay = 0
        c, err := srv.newConn(rw)
        if err != nil {
            continue
        }
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve()
    }
}

首先net.Listen负责监听网络端口,rw, e := l.Accept()则从网络端口中取出TCP连接,然后go c.server()则对每一个TCP连接起一个goroutine来处理。我还说到fasthttp这个网络框架性能要比原生的net/http性能要好,其中一个原因就是因为使用了goroutine pool。那么问题来了,如果要我们自己去实现一个goroutine pool,该怎么去实现呢?我们先来实现一个最简单的。

弱鸡版

golang中的goroutine通过go来启动,goroutine资源和临时对象池不一样,不能放回去再取出来。所以goroutine应该是一直运行着的。需要的时候就运行,不需要的时候就阻塞,这样对其他的goroutine的调度影响也不是很大。而goroutine的任务可以通过channel来传递就ok了。很简单的弱鸡版本就出来了,如下。

func Gopool() {
    start := time.Now()
    wg := new(sync.WaitGroup)
    data := make(chan int, 100)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for _ = range data {
                fmt.Println("goroutine:", n, i)
            }
        }(i)
    }
    for i := 0; i < 10000; i++ {
        data <- i
    }
    close(data)
    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}
func Gopool() {
    start := time.Now()
    wg := new(sync.WaitGroup)
    data := make(chan int, 100)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for _ = range data {
                fmt.Println("goroutine:", n, i)
            }
        }(i)
    }
    for i := 0; i < 10000; i++ {
        data <- i
    }
    close(data)
    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

上面的代码中还做了程序运行时间统计。作为对比,下面是一个没有使用pool的版本。

func Nopool() {
    start := time.Now()
    wg := new(sync.WaitGroup)
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            //fmt.Println("goroutine", n)
        }(i)
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}
func Nopool() {
    start := time.Now()
    wg := new(sync.WaitGroup)
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            //fmt.Println("goroutine", n)
        }(i)
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

最后运行时间对比,使用了goroutine pool的代码运行时间约为没有使用pool的代码的2/3。当然这么测试还是略显粗糙了。我们下面使用reflect那篇文章里面介绍的go benchmark testing的方式测试,测试代码如下(去掉了很多无关代码)。

package pool
import (
    "sync"
    "testing"
)
func Gopool() {
    wg := new(sync.WaitGroup)
    data := make(chan int, 100)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for _ = range data {
            }
        }(i)
    }
    for i := 0; i < 10000; i++ {
        data <- i
    }
    close(data)
    wg.Wait()
}
func Nopool() {
    wg := new(sync.WaitGroup)
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
        }(i)
    }
    wg.Wait()
}
func BenchmarkGopool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        Gopool()
    }
}
func BenchmarkNopool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        Nopool()
    }
}
package pool
import (
    "sync"
    "testing"
)
func Gopool() {
    wg := new(sync.WaitGroup)
    data := make(chan int, 100)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for _ = range data {
            }
        }(i)
    }
    for i := 0; i < 10000; i++ {
        data <- i
    }
    close(data)
    wg.Wait()
}
func Nopool() {
    wg := new(sync.WaitGroup)
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
        }(i)
    }
    wg.Wait()
}
func BenchmarkGopool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        Gopool()
    }
}
func BenchmarkNopool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        Nopool()
    }
}

最终的测试结果如下,使用了goroutine pool的代码执行时间确实更短。

$ go test -bench='.' gopool_test.go

BenchmarkGopool-8 500 2596750 ns/op

BenchmarkNopool-8 500 3604035 ns/op

PASS

$ go test -bench='.' gopool_test.go

BenchmarkGopool-8 500 2596750 ns/op

BenchmarkNopool-8 500 3604035 ns/op

PASS

升级版

对于一个好的线程池,我们往往有更多的需求,一个最迫切的需求是能自定义goroutine运行的函数。函数无非就是函数地址和函数参数。如果要传入的函数形式不一样(形参或者返回值不一样)怎么办?一个比较简单的方法是引入反射。

type worker struct {
    Func interface{}
    Args []reflect.Value
}
func main() {
    var wg sync.WaitGroup
    channels := make(chan worker, 10)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ch := range channels {
                reflect.ValueOf(ch.Func).Call(ch.Args)
            }
        }()
    }
    for i := 0; i < 100; i++ {
        wk := worker{
            Func: func(x, y int) {
                fmt.Println(x + y)
            },
            Args: []reflect.Value{reflect.ValueOf(i), reflect.ValueOf(i)},
        }
        channels <- wk
    }
    close(channels)
    wg.Wait()
}
type worker struct {
    Func interface{}
    Args []reflect.Value
}
func main() {
    var wg sync.WaitGroup
    channels := make(chan worker, 10)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ch := range channels {
                reflect.ValueOf(ch.Func).Call(ch.Args)
            }
        }()
    }
    for i := 0; i < 100; i++ {
        wk := worker{
            Func: func(x, y int) {
                fmt.Println(x + y)
            },
            Args: []reflect.Value{reflect.ValueOf(i), reflect.ValueOf(i)},
        }
        channels <- wk
    }
    close(channels)
    wg.Wait()
}

但是引入反射又会引入性能问题。本来goroutine pool就是为了解决性能问题,然而现在又引入了新的性能问题。那么怎么办呢?闭包。

type worker struct {
    Func func()
}
func main() {
    var wg sync.WaitGroup
    channels := make(chan worker, 10)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ch := range channels {
                //reflect.ValueOf(ch.Func).Call(ch.Args)
                ch.Func()
            }
        }()
    }
    for i := 0; i < 100; i++ {
        j := i
        wk := worker{
            Func: func() {
                fmt.Println(j + j)
            },
        }
        channels <- wk
    }
    close(channels)
    wg.Wait()
}
type worker struct {
    Func func()
}
func main() {
    var wg sync.WaitGroup
    channels := make(chan worker, 10)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ch := range channels {
                //reflect.ValueOf(ch.Func).Call(ch.Args)
                ch.Func()
            }
        }()
    }
    for i := 0; i < 100; i++ {
        j := i
        wk := worker{
            Func: func() {
                fmt.Println(j + j)
            },
        }
        channels <- wk
    }
    close(channels)
    wg.Wait()
}

这里值得注意的一点是golang的闭包用不好容易把自己代入坑,而理解闭包一个很关键的点就是对对象的引用而不是复制。这里只是goroutine pool 实现的一个精简版,真正实现的时候还有很多细节需要考虑,比如设置一个stop channel用来停止pool,但是goroutine pool的核心就在于这个地方。

goroutine池和CPU核的关系

那么goroutine池里面goroutine数目和核数有没有关系呢?这个其实要分情况讨论。

1.goroutine池跑不满

这也就意味着channel data里面一有数据就会被goroutine拿走,这样的话当然只能你CPU能调度的过来就行,也就是池子里的goroutine数目和CPU核数是最优的。经测试,确实是这样。

2.channel data有数据阻塞

这意思是说goroutine是不够用的,如果goroutine的运行任务不是CPU密集型的(大部分情况都不是),而只是IO阻塞,这个时候一般goroutine数目在一定范围内是越多越好,当然范围在什么地方就要具体情况具体分析了。

原文发布于微信公众号 - Golang语言社区(Golangweb)

原文发表时间:2016-12-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java成神之路

Java企业微信开发_07_JSSDK多图上传

 所有的JS接口只能在企业微信应用的可信域名下调用(包括子域名),可在企业微信的管理后台“我的应用”里设置应用可信域名。这个域名必须要通过ICP备案,不然jss...

21320
来自专栏更流畅、简洁的软件开发方式

好大一棵树,新春的祝福(二):功能节点的数据结构和页面展示

1、数据结构      在原有的基础上,把noteID改成FunctionID,去掉code字段,增加三个字段。 NoteLevel :表示第几级的节点,可...

22750
来自专栏闻道于事

Java实现word文档在线预览,读取office(word,excel,ppt)文件

10.7K70
来自专栏C#

免费高效实用的.NET操作Excel组件NPOI(.NET组件介绍之六)

  很多的软件项目几乎都包含着对文档的操作,前面已经介绍过两款操作文档的组件,现在介绍一款文档操作的组件NPOI。   NPOI可以生成没有安装在您的服务器上的...

42550
来自专栏菩提树下的杨过

Silverlight:获取ContentTemplate中的命名控件

项目开发中遇到一个要求,需要将ComboBox右侧中的小三角箭头给去掉,通过Blend工具“编辑ComboBox的模板副本”得知,这是一个名为"BtnArrow...

22480
来自专栏xingoo, 一个梦想做发明家的程序员

【插件开发】—— 9 编辑器代码分块着色-高亮显示!

前文回顾: 1 插件学习篇 2 简单的建立插件工程以及模型文件分析 3 利用扩展点,开发透视图 4 SWT编程须知 5 SWT简单控件的使用与布局搭...

29560
来自专栏菩提树下的杨过

silverlight动态读取txt文件/解析json数据/调用wcf示例

终于开始正式学习silverlight,虽然有点晚,但总算开始了,今天看了一下sdk,主要是想看下silverlight中如何动态调用数据,对于数据库的访问,s...

247100
来自专栏青蛙要fly的专栏

项目需求讨论-Vlayout来快速构建及扩展复杂界面

大家好,今天又带来了项目中具体遇到的需求。做一个首界面,该首界面有很多功能块,同时这些功能块是动态的,因为登录的人的权限的不同,会显示不同的功能块,因为功能模块...

25520
来自专栏Urahara Blog

Apache Struts2 Remote Code Execution (S2-045)

45530
来自专栏hbbliyong

WPF备忘录(3)如何从 Datagrid 中获得单元格的内容与 使用值转换器进行绑定数据的转换IValueConverter

一、如何从 Datagrid 中获得单元格的内容    DataGrid 属于一种 ItemsControl, 因此,它有 Items 属性并且用ItemCon...

39370

扫码关注云+社区

领取腾讯云代金券