前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang-ants协程池的使用和实现

golang-ants协程池的使用和实现

原创
作者头像
小许code
发布2023-02-10 09:37:55
3.4K0
发布2023-02-10 09:37:55
举报
文章被收录于专栏:小许code小许code

golang中goroutine由运行时管理,使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Memory错误。但是goroutine泄漏引发的血案,想必各位gopher都经历过,通过协程池限制goroutine数一个有效避免泄漏的手段,但是自己手动实现一个协程池,总是会兼顾不到各种场景,比如释放,处理panic,动态扩容等。那么ants是公认的优秀实现协程池。

ants简介

ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果

功能

  1. 自动调度海量的 goroutines,复用 goroutines
  2. 定期清理过期的 goroutines,进一步节省资源
  3. 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
  4. 优雅处理 panic,防止程序崩溃
  5. 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
  6. 非阻塞机制

1.ants库结构

学习一个库先从结构看起吧,pool、pool_func、ants初始化一个pool等操作都在这里

编辑切换为居中

ants库代码结构

  1. pool.go提供了ants.NewPool(创建协程池)、Submit(task func())提交任务
  2. pool_func.go使用NewPoolWithFunc(创建pool对象需要带具体的函数),并且使用Invoke(args interface{})进行调用,arg就是传给池函数func(interface{})的参数
  3. options.go使用函数选项模式进行参数配置
  4. ants.go给初始化默认协程池对象defaultAntsPool(默认的pool容量是math.MaxInt32)提供了公共函数

介绍完了主要的库文件后,我们进行逐个的了解,具体的使用,我们可以结合官方的使用案例进行了解,这里就不进行展开了。

2.ants中Pool创建对象

创建Pool对象需调用ants.NewPool(size, options)函数,返回一个pool的指针

先看Pool的接口,对我们创建的Pool先做个初步印象

编辑切换为居中

Pool结构

代码语言:javascript
复制
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
    opts := loadOptions(options...)

    if size <= 0 {
        size = -1
    }

    if expiry := opts.ExpiryDuration; expiry < 0 {
        return nil, ErrInvalidPoolExpiry
    } else if expiry == 0 {
        opts.ExpiryDuration = DefaultCleanIntervalTime
    }

    if opts.Logger == nil {
        opts.Logger = defaultLogger
    }

    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        p.workers = newWorkerArray(loopQueueType, size)
    } else {
        p.workers = newWorkerArray(stackType, 0)
    }

    p.cond = sync.NewCond(p.lock)

    // Start a goroutine to clean up expired workers periodically.
    go p.purgePeriodically()

    return p, nil
}

ants.NewPool创建Pool过程

  1. 接收size参数作为pool的容量,如果size<=0,那么不对池子容量进行限制
  2. loadOptions对Pool的配置,比如是否阻塞模式,
  3. workerCache这个sync.Pool对象的New方法,在调用sync.Pool的Get()方法时,如果为nil,则返回workerCache.New()的结果
  4. 是否初始化Pool是进行内存预分配(size > 0),来创建不同的worker(stack、loopQueue两种模式)
  5. 使用p.lock锁创建一个条件变量
  6. 开启一个协程定期清理过期的workers

3.ants中的PoolWithFunc

ants.PoolWithFunc创建PoolWithFunc和New.Pool整体的结构很像,多了个poolFunc func(interface{})字段,也就是提交到池子的函数,然后workers的类型不一样

4.理解worker

可以查看出pool中的worker在整个流程起着很重要的作用,也就是ants中为每个任务都是由 worker 对象来处理的,每个work都会创建一个goroutine来处理任务,ants中的worker结构如下

代码语言:javascript
复制
type goWorker struct {
    //work的所属者
    pool *Pool

    //任务通道,通过这个发送给goWorker
    task chan func()

    //将work放入到队列时更新
    recycleTime time.Time
}

从ants.Pool创建对象Pool的过程第四步可以看出,通过newWorkerArray创建workers,因为workerArray是个接口,有如下方法。

代码语言:javascript
复制
type workerArray interface {
    len() int
    isEmpty() bool
    insert(worker *goWorker) error
    detach() *goWorker
    retrieveExpiry(duration time.Duration) []*goWorker
    reset()
}

通过newWorkerArray,返回实现了workerArray接口的workerStack,这里newWorkerArray其实是用了个工厂方法来实现的,根据传入的类型,并不需要知道具体实现了接口的结构体,只要实现了workerArray接口就可以返回实现者的结构体,然后调用具体的实现

5.提交任务Submit

Submit(task func())接收一个func作为参数,将task通过通道task将类型为func的函数给到goWorker,然后调用retrieveWorker返回一个可用的worker给task

代码语言:javascript
复制
func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

    w = p.workers.detach()
    if w != nil { // first try to fetch the worker from the queue
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        // if the worker queue is empty and we don't run out of the pool capacity,
        // then just spawn a new worker goroutine.
        p.lock.Unlock()
        spawnWorker()
    } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        p.blockingNum++
        p.cond.Wait() // block and wait for an available worker
        p.blockingNum--
        var nw int
        if nw = p.Running(); nw == 0 { // awakened by the scavenger
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }

        p.lock.Unlock()
    }
    return
}

执行过程分析:

  1. spawnWorker是一个func,从p.workerCache这个sync.Pool获取一个goWorker对象(在New.Pool中有讲到),用sync.Locker上锁
  2. 调用p.workers.detach方法(前面提到p.workers实现了workerArray接口)
  3. 如果获取到了goWorker对象就直接返回
  4. 如果worker队列为空,并且Pool还有容量,那么调用spawnWorker,调用worker的run方法启动一个新的协程处理任务
  5. run方法的实现如下,从goWorker的channel中遍历待执行的func(),执行,并且在执行完后调用revertWorker放回workers
代码语言:javascript
复制
func (w *goWorker) run() {
    w.pool.incRunning()
    go func() {

        for f := range w.task {
            if f == nil {
                return
            }
            f()
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
        }
    }()
}

6.释放和重启Pool

释放和重启Pool分别调用了Release和Reboot,这两个函数都在ants.Pool这个文件中可以找到,具体实现这里做个简单说明

  1. Release调用p.workers.reset()结束loopQueue或wokerStack中的 goroutine。都是通过发送nil到goWorker的task通道中,然后重置各个字段的值
  2. Reboot调用purgePeriodically,检测到Pool关闭了就直接退出了

7.细节

task缓冲通道

下面这个是NewPool变量workerCachesyn类型sync.Pool创建goWorker对象的代码

代码语言:javascript
复制
p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }

workerChanCap作为容量,这个变量定义在ants.go文件中的定义如下:

代码语言:javascript
复制
// workerChanCap determines whether the channel of a worker should be a buffered channel
    // to get the best performance. Inspired by fasthttp at
    // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
    workerChanCap = func() int {
        // Use blocking channel if GOMAXPROCS=1.
        // This switches context from sender to receiver immediately,
        // which results in higher performance (under go1.5 at least).
        if runtime.GOMAXPROCS(0) == 1 {
            return 0
        }

        // Use non-blocking workerChan if GOMAXPROCS>1,
        // since otherwise the sender might be dragged down if the receiver is CPU-bound.
        return 1
    }()

ants参考了著名的 Web框架fasthttp的实现。当GOMAXPROCS为 1时(即操作系统线程数为1),向通道task发送会挂起发送 goroutine,将执行流程转向接收goroutine,这能提升接收处理性能。如果GOMAXPROCS大于1,ants使用带缓冲的通道,为了防止接收 goroutine 是 CPU密集的,导致发送 goroutine 被阻塞。

自旋锁 SpinLock

在NewPool中lock,其实给lock初始化了一个自旋锁,这里是利用atomic.CompareAndSwapUint32()这个原子操作实现的,在加锁失败后不会等待,而是继续尝试,提高了加锁减锁的性能

在开发中刚好遇到需要ants,这次也做个记录作为分享,其实慢慢的会发现三方库的xx_test用例是最好的学习例子,希望能和大家一起知其然知其所以然,加油!

参考文档

  1. https://blog.csdn.net/darjun/article/details/117719886
  2. https://segmentfault.com/a/1190000038319941

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ants简介
  • 功能
  • 1.ants库结构
  • 2.ants中Pool创建对象
  • 3.ants中的PoolWithFunc
  • 4.理解worker
  • 5.提交任务Submit
  • 6.释放和重启Pool
  • 7.细节
  • task缓冲通道
  • 自旋锁 SpinLock
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档