前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用设计模式包装协程池

用设计模式包装协程池

作者头像
运维开发王义杰
发布2023-08-10 18:47:46
1340
发布2023-08-10 18:47:46
举报

协程池是一种常见的并发编程模式,它可以在多个协程之间共享一组固定数量的协程,以避免创建过多的协程导致系统资源耗尽。在 Go 语言中,协程池通常使用 sync.WaitGroup 和 chan 类型来实现。

在本文中,我们将介绍一种用户设计模式,即封装协程池。该模式可以将协程池的实现细节隐藏在一个简单的接口后面,使用户可以轻松地使用协程池而不必了解其内部实现。

实现协程池

首先,我们定义一个 workerFunc 类型,它表示一个可以在协程池中运行的函数。然后,我们定义一个 Pool 类型,它包含一个 workers 通道和一个 limit 通道。workers 通道用于存储要运行的函数,limit 通道用于限制协程池中的协程数量。

代码语言:javascript
复制
type workerFunc func() error
type Pool struct {
    workers chan workerFunc
    wg      sync.WaitGroup
    limit   chan struct{}
}

接下来,我们定义一个 NewPool 函数,它接受一个整数参数 size,表示协程池中的协程数量。NewPool 函数返回一个指向 Pool 类型的指针。

代码语言:javascript
复制
func NewPool(size int) *Pool {
    return &Pool{
        workers: make(chan workerFunc),
        limit:   make(chan struct{}, size),
    }
}

然后,我们定义一个 Add 方法,它接受一个 context.Context 类型的参数 ctx 和一个 workerFunc 类型的参数 fn。Add 方法将 fn 函数添加到 workers 通道中,如果 ctx 被取消,则返回 ctx.Err()。

代码语言:javascript
复制
func (p *Pool) Add(ctx context.Context, fn workerFunc) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case p.workers <- fn:
        return nil
    }
}

接下来,我们定义一个 Run 方法,它接受一个 context.Context 类型的参数 ctx。Run 方法从 workers 通道中读取函数并运行它们。在运行函数之前,Run 方法会从 limit 通道中获取一个空结构体,以限制协程池中的协程数量。在函数运行完成后,Run 方法会将空结构体放回 limit 通道中,并使用 sync.WaitGroup 等待所有函数完成。

代码语言:javascript
复制
func (p *Pool) Run(ctx context.Context) {
    for fn := range p.workers {
        p.limit <- struct{}{}
        p.wg.Add(1)
        go func(fn workerFunc) {
            defer func() {
                <-p.limit
                p.wg.Done()
            }()
            if err := fn(); err != nil {
                // handle error
                fmt.Printf("err: %v\n", err)
            }
        }(fn)
    }
    p.wg.Wait()
}

最后,我们定义一个 Stop 方法,它关闭 workers 通道并等待所有函数完成。

代码语言:javascript
复制
func (p *Pool) Stop() {
    close(p.workers)
    p.wg.Wait()
}

完整协程池代码:

代码语言:javascript
复制
package gopool

import (
  "context"
  "fmt"
  "sync"
)

type workerFunc func() error

type Pool struct {
  workers chan workerFunc
  wg      sync.WaitGroup
  limit   chan struct{}
}

func NewPool(size int) *Pool {
  return &Pool{
    workers: make(chan workerFunc),
    limit:   make(chan struct{}, size),
  }
}

func (p *Pool) Add(ctx context.Context, fn workerFunc) error {
  select {
  case <-ctx.Done():
    return ctx.Err()
  case p.workers <- fn:
    return nil
  }
}

func (p *Pool) Run(ctx context.Context) {
  for fn := range p.workers {
    p.limit <- struct{}{}
    p.wg.Add(1)
    go func(fn workerFunc) {
      defer func() {
        <-p.limit
        p.wg.Done()
      }()
      if err := fn(); err != nil {
        // handle error
        fmt.Printf("err: %v\n", err)
      }
    }(fn)
  }
  p.wg.Wait()
}

func (p *Pool) Stop() {
  close(p.workers)
  p.wg.Wait()
}

现在,我们已经完成了协程池的封装。用户可以使用以下代码来创建一个协程池并运行函数:

代码语言:javascript
复制
pool := NewPool(10)
defer pool.Stop()

for i := 0; i < 100; i++ {
    err := pool.Add(context.Background(), func() error {
        // do some work
        return nil
    })
    if err != nil {
        // handle error
    }
}

pool.Run(context.Background())

在上面的代码中,我们创建了一个大小为 10 的协程池,并向其中添加了 100 个函数。然后,我们调用 Run 方法来运行这些函数。在函数运行完成后,我们调用 Stop 方法来关闭协程池。

通过封装协程池,我们可以将协程池的实现细节隐藏在一个简单的接口后面,使用户可以轻松地使用协程池而不必了解其内部实现。这种用户设计模式可以提高代码的可读性和可维护性,并使代码更易于重用。

使用普通工厂模式封装

代码语言:javascript
复制
package gopool

import "context"

type PoolFactory struct {
  size int
  pool *Pool // 存储 *Pool
}

func NewPoolFactory(size int) *PoolFactory {
  return &PoolFactory{
    size: size,
    pool: &Pool{ // 初始化 *Pool
      workers: make(chan workerFunc),
      limit:   make(chan struct{}, size),
    },
  }
}

func (f *PoolFactory) getPool() *Pool {
  return f.pool // 直接返回存储的 *Pool
}

func (f *PoolFactory) Add(ctx context.Context, fn workerFunc) error {
  pool := f.getPool()
  select {
  case <-ctx.Done():
    return ctx.Err()
  case pool.workers <- fn:
    return nil
  }
}

func (f *PoolFactory) Stop(ctx context.Context) error {
  pool := f.getPool()
  close(pool.workers)
  for i := 0; i < f.size; i++ {
    select {
    case <-ctx.Done():
      return ctx.Err()
    case pool.limit <- struct{}{}:
    }
  }
  return nil
}

func (f *PoolFactory) Run(ctx context.Context) {
  pool := f.getPool()
  pool.Run(ctx)
}

使用工厂方法模式封装

代码语言:javascript
复制
package gopool

import "context"

type PoolFactory interface {
  Add(ctx context.Context, fn workerFunc) error
  Stop(ctx context.Context) error
  Run(ctx context.Context)
}

type poolFactory struct {
  size int
  pool *Pool // 存储 *Pool
}

func NewPoolFactory(size int) PoolFactory {
  return &poolFactory{
    size: size,
    pool: &Pool{ // 初始化 *Pool
      workers: make(chan workerFunc),
      limit:   make(chan struct{}, size),
    },
  }
}

func (f *poolFactory) Add(ctx context.Context, fn workerFunc) error {
  pool := f.pool
  select {
  case <-ctx.Done():
    return ctx.Err()
  case pool.workers <- fn:
    return nil
  }
}

func (f *poolFactory) Stop(ctx context.Context) error {
  pool := f.pool
  close(pool.workers)
  for i := 0; i < f.size; i++ {
    select {
    case <-ctx.Done():
      return ctx.Err()
    case pool.limit <- struct{}{}:
    }
  }
  return nil
}

func (f *poolFactory) Run(ctx context.Context) {
  pool := f.pool
  pool.Run(ctx)
}

抽象工厂模式封装

代码语言:javascript
复制
package gopool

import (
  "fmt"
  "sync"
)
type PoolFactory interface {
  CreatePool(size int) Pool
}

type poolFactory struct{}

func NewPoolFactory() PoolFactory {
  return &poolFactory{}
}

func (pf *poolFactory) CreatePool(size int) Pool {
  return NewPool(size)
}
type workerFunc func() error

type Pool interface {
  Add(fn workerFunc)
  Run()
  Stop()
}

type pool struct {
  workers chan workerFunc
  wg      sync.WaitGroup
  limit   chan struct{}
}

func NewPool(size int) Pool {
  return &pool{
    workers: make(chan workerFunc, size),
    limit:   make(chan struct{}, size),
  }
}

func (p *pool) Add(fn workerFunc) {
  p.workers <- fn
}

func (p *pool) Run() {
  for fn := range p.workers {
    p.limit <- struct{}{}
    p.wg.Add(1)
    go func(fn workerFunc) {
      defer func() {
        <-p.limit
        p.wg.Done()
      }()
      if err := fn(); err != nil {
        // handle error
        fmt.Printf("err: %v\n", err)
      }
    }(fn)
  }
  p.wg.Wait()
}

func (p *pool) Stop() {
  close(p.workers)
  p.wg.Wait()
}

更多请看仓库:https://github.com/xilu0/gopool.git

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 运维开发王义杰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档