专栏首页吴亲强的深夜食堂go并发-工作池模式
原创

go并发-工作池模式

开篇

之前写过一篇文章,它有个响亮的名字: Handling 1 Million Requests per Minute with Go。 这是国外的一个作者写的,我做了一篇说明。起的也是这个标题, 阅读量是我最好的一篇,果然文章都是靠标题出彩的.....

今天偶然看到另一篇文章(原文在文末)。两篇文章原理相似:有一批工作任务(job),通过工作池(worker-pool)的方式,达到多 worker 并发处理 job 的效果。

他们还是有很多不同的点,实现上差别也是蛮大的。

首先上一篇文章我放了一张图片,大概就是上篇整体的工作流。

  • 每个 worker 处理完任务就好,不关心结果,不对结果做进一步处理。
  • 只要请求不停止,程序就不会停止,没有控制机制,除非宕机。

这篇文章不同点在于:

首先数据会从 generate (生产数据)->并发处理数据->处理结果聚合。 图大概是这样的,

然后它可以通过 context.context 达到控制工作池停止工作的效果。

最后通过代码,你会发现它不是传统意义上的 worker-pool,后面会说明。

下图能清晰表达整体流程了。

顺便说一句,这篇文章实现的代码比 Handling 1 Million Requests per Minute with Go 的代码简单多了。

首先看 job

package wpool

import (
	"context"
)

type JobID string
type jobType string
type jobMetadata map[string]interface{}

type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)

type JobDescriptor struct {
	ID       JobID 
	JType    jobType
	Metadata map[string]interface{}
}

type Result struct {
	Value      interface{}
	Err        error
	Descriptor JobDescriptor
}

type Job struct {
	Descriptor JobDescriptor
	ExecFn     ExecutionFn
	Args       interface{}
}

// 处理 job 逻辑,处理结果包装成 Result 结果
func (j Job) execute(ctx context.Context) Result {
	value, err := j.ExecFn(ctx, j.Args)
	if err != nil {
		return Result{
			Err:        err,
			Descriptor: j.Descriptor,
		}
	}

	return Result{
		Value:      value,
		Descriptor: j.Descriptor,
	}
}

这个可以简单过一下。最终每个 job 处理完都会包装成 Result 返回。

下面这段就是核心代码了。

package wpool

import (
	"context"
	"fmt"
	"sync"
)

// 运行中的每个worker
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return
			}
			results <- job.execute(ctx)
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			results <- Result{
				Err: ctx.Err(),
			}
			return
		}
	}
}

type WorkerPool struct {
	workersCount int //worker 数量
	jobs         chan Job // 存储 job 的 channel 
	results      chan Result // 处理完每个 job 对应的 结果集
	Done         chan struct{} //是否结束
}

func New(wcount int) WorkerPool {
	return WorkerPool{
		workersCount: wcount,
		jobs:         make(chan Job, wcount),
		results:      make(chan Result, wcount),
		Done:         make(chan struct{}),
	}
}

func (wp WorkerPool) Run(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < wp.workersCount; i++ {
		wg.Add(1)
		go worker(ctx, &wg, wp.jobs, wp.results)
	}

	wg.Wait()
	close(wp.Done)
	close(wp.results)
}

func (wp WorkerPool) Results() <-chan Result {
	return wp.results
}

func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
	for i, _ := range jobsBulk {
		wp.jobs <- jobsBulk[i]
	}
	close(wp.jobs)
}

整个 WorkerPool 结构很简单。 jobs 是一个缓冲 channel。每一个任务都会放入 jobs 中等待处理 woker 处理。

results 也是一个通道类型,它的作用是保存每个 job 处理后产生的结果 Result

首先通过 New 初始化一个 worker-pool 工作池,然后执行 Run 开始运行。

func New(wcount int) WorkerPool {
	return WorkerPool{
		workersCount: wcount,
		jobs:         make(chan Job, wcount),
		results:      make(chan Result, wcount),
		Done:         make(chan struct{}),
	}
}
func (wp WorkerPool) Run(ctx context.Context) {
	var wg sync.WaitGroup

	for i := 0; i < wp.workersCount; i++ {
		wg.Add(1)
		go worker(ctx, &wg, wp.jobs, wp.results)
	}

	wg.Wait()
	close(wp.Done)
	close(wp.results)
}

初始化的时候传入 worker 数,对应每个 g 运行 work(ctx,&wg,wp.jobs,wp.results),组成了 worker-pool。 同时通过 sync.WaitGroup,我们可以等待所有 worker 工作结束,也就意味着 work-pool 结束工作,当然可能是因为任务处理结束,也可能是被停止了。

每个 job 数据源是如何来的?

// job数据源,把每个 job 放入到 jobs channel 中
func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
	for i, _ := range jobsBulk {
		wp.jobs <- jobsBulk[i]
	}
	close(wp.jobs)
}

对应每个 worker 的工作,

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return
			}
			results <- job.execute(ctx)
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			results <- Result{
				Err: ctx.Err(),
			}
			return
		}
	}
}

每个 worker 都尝试从同一个 jobs 获取数据,这是一个典型的 fan-out 模式。 当对应的 g 获取到 job 进行处理后,会把处理结果发送到同一个 results channel 中,这又是一个 fan-in 模式。 当然我们通过 context.Context 可以对每个 worker 做停止运行控制。

最后是处理结果集合,

// 处理结果集
func (wp WorkerPool) Results() <-chan Result {
	return wp.results
}

那么整体的测试代码就是:

func TestWorkerPool(t *testing.T) {
	wp := New(workerCount)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	go wp.GenerateFrom(testJobs())

	go wp.Run(ctx)

	for {
		select {
		case r, ok := <-wp.Results():
			if !ok {
				continue
			}

			i, err := strconv.ParseInt(string(r.Descriptor.ID), 10, 64)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			val := r.Value.(int)
			if val != int(i)*2 {
				t.Fatalf("wrong value %v; expected %v", val, int(i)*2)
			}
		case <-wp.Done:
			return
		default:
		}
	}
}

看了代码之后,我们知道,这并不是一个传统意义的 worker-pool。它并不像 Handling 1 Million Requests per Minute with Go 这篇文章一样, 初始化一个真正的 worker-pool,一旦接收到 job,就尝试从池中获取一个 worker, 把对应的 job 交给这个 work 进行处理,等 work 处理完毕,重新进行到工作池中,等待下一次被利用。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【go语言】Goroutines 并发模式(一)

    摘要 这一篇主要是对GO语言中的并发编程模式做一个粗略的归纳总结,文中示例参考自golang conference中的一些演讲和博客,go涉及到的Go语言的语法...

    李海彬
  • 【go语言】Goroutines 并发模式(二)

    摘要 接上一篇博客,主要是对GO语言中的并发编程模式做一个粗略的归纳总结,文中示例参考自golang conference中的一些演讲和博客,go涉及到的Go语...

    李海彬
  • Go by Example 中文:工作池

    在这个例子中,我们将看到如何使用 Go 协程和通道实现一个工作池 。 对应的源代码如下:

    ccf19881030
  • Go并发模式:管道与取消

    关键字:Go语言,管道,取消机制,并发,sync.WaitGroup,包引用,通道,defer,select GO并发模式:管道与取消 简介 Go的并发能...

    文彬
  • 并发组件 | Go设计模式实战

    之前文章《代码组件 | Go设计模式实战》已经介绍了「组合模式」的概念,以及在业务中的使用。今天我们结合Go语言天生的并发特性,升级「组合模式」为「并发组合模式...

    用户1093396
  • Go并发之CSP并发模型、协程并发

    CSP 即通信顺序进程、交谈循序程序,又被译为交换消息的循序程序(communicating sequential processes),它是一种用来描述并发性...

    Regan Yue
  • Go 常见并发模式实现(三):通过无缓冲通道创建协程池

    上篇教程学院君给大家演示了如何通过缓冲通道实现共享资源池,今天,我们来看另一个并发模式的 Go 语言实现 —— 通过无缓冲通道实现协程(goroutine)池。

    学院君
  • Go 常见并发模式实现(二):通过缓冲通道实现共享资源池

    今天这篇教程我们继续演示常见并发模式的 Go 语言实现 —— 通过缓冲通道(channel)实现共享资源池。

    学院君
  • 用好工作池 WaitGroup

    WaitGroup 用于实现工作池,因此要理解工作池,我们首先需要学习 WaitGroup。

    酷走天涯
  • [Go] golang无缓冲通道实现工作池控制并发

    展示如何使用无缓冲的通道创建一个goroutine池,控制并发频率 1.无缓冲通道保证了两个goroutine之间的数据交换 2.当所有的goroutine都忙...

    陶士涵
  • 并发模式

    并发模式并不是一种函数的运用、亦或者实际存在的东西。他是前人对于并发场景的运用总结与经验。他与23中设计模式一样。好啦,话不多说。开干

    PayneWu
  • golang 多协程的同步方法总结

    之前用 go 写一个小工具的时候, 用到了多个协程之间的通信, 当时随手查了查, 结果查出来一大坨, 简单记录一下. golang中多个协程之间是如何进行通信及...

    烟草的香味
  • 干货 | Go开发中,如何有效控制Goroutine的并发数量

    洋洋,携程高级安全研发工程师,擅长Python、Golang开发,负责安全工具研发。

    携程技术
  • golang插件化方案

    业务线的活动,每一次新活动都做独立项目开发,有大量重复代码,并且浪费数据服务的连接资源;排序服务也许要经常添加业务代码,目前是停服务发布……这些场景为了开发维护...

    李海彬
  • golang插件化方案

    业务线的活动,每一次新活动都做独立项目开发,有大量重复代码,并且浪费数据服务的连接资源;排序服务也许要经常添加业务代码,目前是停服务发布……这些场景为了开发维护...

    李海彬
  • 盘点Golang并发那些事儿之一

    Golang、Golang、Golang 真的够浪,今天我们一起盘点一下Golang并发那些事儿,准确来说是goroutine,关于多线程并发,咱们暂时先放一放...

    PayneWu
  • Go语言 | 从并发模式看channel使用技巧

    最近重看MinIO的源代码,发现纠删码模式下读取数据盘的时候,使用了更简单的并发读取方式,以前看的时候没发现,查了下Git历史记录,发现是19年新改的,新的使用...

    飞雪无情
  • 性能测试工具的并发模式

    大家所熟悉的性能测试工具有Loadrunner、JMeter,以及其他小众一些的工具,如Locust、Ngrinder、Gatling等等,那么你们知道这些工具...

    smooth00
  • 由Go语言并发模型想到游戏服务器并发

    这段时间看了一些Go语言相关的东西,发现Go语言的最大特性并发模型类似于C++里面的线程池,正好我们项目服务器也是用的线程池,记录下。   Go语言的并发单位是...

    李海彬

扫码关注云+社区

领取腾讯云代金券