前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go 协程池

Go 协程池

作者头像
王小明_HIT
发布2023-03-01 15:58:16
6840
发布2023-03-01 15:58:16
举报
文章被收录于专栏:程序员奇点程序员奇点

协程

Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。

协程池

高并发场景下,会启动大量协程进行业务处理,此时如果使用协程池可以复用对象,减少协程池内存分配的效率与创建协程池点创建开销,提高协程的执行效率。字节官方开源了gopkg库提供的 gopool 协程池实现。

协程池实现原理

线程池设计

代码语言:javascript
复制
type pool struct {
   // pool 的名字,打 metrics 和打 log 时用到
   name string
   // pool 的容量,也就是最大的真正在工作的 goroutine 的数量
   // 为了性能考虑,可能会有小误差
   cap int32
   // 配置信息
   config *Config
   // 任务链表
   taskHead  *task
   taskTail  *task
   taskLock  sync.Mutex
   taskCount int32
 
   // 记录正在运行的 worker 数量
   workerCount int32
 
   // 用来标记是否关闭
   closed int32
 
   // worker panic 的时候会调用这个方法
   panicHandler func(context.Context, interface{})
 }

gopool最核心的方法

代码语言:javascript
复制
func (p *pool) CtxGo(ctx context.Context, f func()) {
   // 首先是一个对象池,避免task对象反复分配
   t := taskPool.Get().(*task)
   // 拿到的对象做一些变量的设置
   t.ctx = ctx
   t.f = f
   
   // 这里的加上task锁,把生成的task绑定到task链表上
   p.taskLock.Lock()
   if p.taskHead == nil {
     p.taskHead = t
     p.taskTail = t
   } else {
     p.taskTail.next = t
     p.taskTail = t
   }
   p.taskLock.Unlock()
   // 解锁之后做一些统计信息变更
   atomic.AddInt32(&p.taskCount, 1)
   
   // 这个地方的逻辑是不是放到一开头判断一下也可以,
   // 或者上下都判断一下;
   // 这里写在下面可能考虑到上面的lock和taskpool的get可能会有一点时间的等待,
   // 所以如果我来写可能方法的入口和这里都会判断一下吧。不过这里也不是什么重点问题。

   // 如果 pool 已经被关闭了,就 panic
   if atomic.LoadInt32(&p.closed) == 1 {
     panic("use closed pool")
   }
   
   // 下面的注释作者的意图都已经说明了,
   // 解决的问题就是协程池goroutine需不需要加的问题;
   // 以及协程池是不是啥都没有的问题
 
   // 满足以下两个条件:
   // 1. task 数量大于阈值
   // 2. 目前的 worker 数量小于上限 p.cap
   // 或者目前没有 worker
   if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
     p.incWorkerCount() // 加一个工作协程
     w := workerPool.Get().(*worker) // 对象池的优化
     w.pool = p
     w.run() // 让worker跑起来。
   }
 }

worker跑起来是如何执行的

代码语言:javascript
复制
 func (w *worker) run() {
   // 通过goroutine异步执行
   go func() {
     for {
       //select {
       //case <-w.stopChan:
       // w.close()
       // return
       //default:
       
       // 这里是从task池中获取一个task
       var t *task
       w.pool.taskLock.Lock()
       if w.pool.taskHead != nil {
         t = w.pool.taskHead
         w.pool.taskHead = w.pool.taskHead.next
         atomic.AddInt32(&w.pool.taskCount, -1)
       }
       if t == nil {
         // 如果没有任务要做了,就释放资源,退出
         w.close()
         w.pool.taskLock.Unlock()
         w.Recycle()
         return
       }
       w.pool.taskLock.Unlock()
       // 这里就是成功的获取一个任务,然后准备开始执行这个任务
       func() {
         // 防止任务panic做的一些打点逻辑
         defer func() {
           if r := recover(); r != nil {
             logs.CtxFatal(t.ctx, "GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
             if w.pool.config.EnablePanicMetrics {
               panicMetricsClient.EmitCounter(panicKey, 1, metrics.T{Name: "pool", Value: w.pool.name})
             }
             // 这里如果没有设置panicHandler可能会有空指针
             w.pool.panicHandler(t.ctx, r)
           }
         }()
         // 执行函数
         t.f()
       }()
       // 这个task已经做完了,回收对象
       t.Recycle()
       //}
     }
   }()
 }
  1. 来一个任务,放到任务链表中,使用锁控制保证并发安全
  2. 如果 task 数量大于阈值且当前的 worker 数量小于上限 p.cap 或者目前没有 worker,那么新建一个工作协程来执行任务。
  3. 任务执行过程中,使用 for 循环不断遍历 task 链表,如果链表不为空,则从链表中拿任务执行。链表为空,协程关闭,工作协程数减一。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 协程
  • 协程池
  • 协程池实现原理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档