专栏首页吴亲强的深夜食堂另一种思维实现一个 work-pool
原创

另一种思维实现一个 work-pool

开篇

之前写过一篇文章,它有个响亮的名字: Handling 1 Million Requests per Minute with Go。 这是国外的一个作者写的,我做了一篇说明。起的也是这个标题, 阅读量是我最好的一篇,果然文章都是靠标题出彩的.....

今天偶然看到另一篇文章(原文在文末)。两篇文章原理相似:有一批工作任务(job),通过工作池(worker-pool)的方式,达到多 worker 并发处理 job 的效果。

他们还是有很多不同的点,实现上差别也是蛮大的。

首先上一篇文章我放了一张图片,大概就是上篇整体的工作流。

  • 每个 worker 处理完任务就好,不关心结果,不对结果做进一步处理。
  • 只要请求不停止,程序就不会停止,没有控制机制,除非宕机。

这篇文章不同点在于:

首先数据会从 generate (生产数据)->并发处理数据->处理结果聚合。 图大概是这样的,

然后它可以通过 context.context 达到控制工作池停止工作的效果。

最后通过代码,你会发现它不是传统意义上的 worker-pool,后面会说明。

下图能清晰表达整体流程了。

顺便说一句,这篇文章实现的代码比 Handling 1 Million Requests per Minute with Go 的代码简单多了。

首先看 job

package wpool

import (
	"context"
)

type JobID string
type jobType string
type jobMetadata map[string]interface{}

type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)

type JobDescriptor struct {
	ID       JobID 
	JType    jobType
	Metadata map[string]interface{}
}

type Result struct {
	Value      interface{}
	Err        error
	Descriptor JobDescriptor
}

type Job struct {
	Descriptor JobDescriptor
	ExecFn     ExecutionFn
	Args       interface{}
}

// 处理 job 逻辑,处理结果包装成 Result 结果
func (j Job) execute(ctx context.Context) Result {
	value, err := j.ExecFn(ctx, j.Args)
	if err != nil {
		return Result{
			Err:        err,
			Descriptor: j.Descriptor,
		}
	}

	return Result{
		Value:      value,
		Descriptor: j.Descriptor,
	}
}
复制代码

这个可以简单过一下。最终每个 job 处理完都会包装成 Result 返回。

下面这段就是核心代码了。

package wpool

import (
	"context"
	"fmt"
	"sync"
)

// 运行中的每个worker
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return
			}
			results <- job.execute(ctx)
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			results <- Result{
				Err: ctx.Err(),
			}
			return
		}
	}
}

type WorkerPool struct {
	workersCount int //worker 数量
	jobs         chan Job // 存储 job 的 channel 
	results      chan Result // 处理完每个 job 对应的 结果集
	Done         chan struct{} //是否结束
}

func New(wcount int) WorkerPool {
	return WorkerPool{
		workersCount: wcount,
		jobs:         make(chan Job, wcount),
		results:      make(chan Result, wcount),
		Done:         make(chan struct{}),
	}
}

func (wp WorkerPool) Run(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < wp.workersCount; i++ {
		wg.Add(1)
		go worker(ctx, &wg, wp.jobs, wp.results)
	}

	wg.Wait()
	close(wp.Done)
	close(wp.results)
}

func (wp WorkerPool) Results() <-chan Result {
	return wp.results
}

func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
	for i, _ := range jobsBulk {
		wp.jobs <- jobsBulk[i]
	}
	close(wp.jobs)
}

复制代码

整个 WorkerPool 结构很简单。 jobs 是一个缓冲 channel。每一个任务都会放入 jobs 中等待处理 woker 处理。

results 也是一个通道类型,它的作用是保存每个 job 处理后产生的结果 Result

首先通过 New 初始化一个 worker-pool 工作池,然后执行 Run 开始运行。

func New(wcount int) WorkerPool {
	return WorkerPool{
		workersCount: wcount,
		jobs:         make(chan Job, wcount),
		results:      make(chan Result, wcount),
		Done:         make(chan struct{}),
	}
}
func (wp WorkerPool) Run(ctx context.Context) {
	var wg sync.WaitGroup

	for i := 0; i < wp.workersCount; i++ {
		wg.Add(1)
		go worker(ctx, &wg, wp.jobs, wp.results)
	}

	wg.Wait()
	close(wp.Done)
	close(wp.results)
}
复制代码

初始化的时候传入 worker 数,对应每个 g 运行 work(ctx,&wg,wp.jobs,wp.results),组成了 worker-pool。 同时通过 sync.WaitGroup,我们可以等待所有 worker 工作结束,也就意味着 work-pool 结束工作,当然可能是因为任务处理结束,也可能是被停止了。

每个 job 数据源是如何来的?

// job数据源,把每个 job 放入到 jobs channel 中
func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
	for i, _ := range jobsBulk {
		wp.jobs <- jobsBulk[i]
	}
	close(wp.jobs)
}
复制代码

对应每个 worker 的工作,

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return
			}
			results <- job.execute(ctx)
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			results <- Result{
				Err: ctx.Err(),
			}
			return
		}
	}
}

每个 worker 都尝试从同一个 jobs 获取数据,这是一个典型的 fan-out 模式。 当对应的 g 获取到 job 进行处理后,会把处理结果发送到同一个 results channel 中,这又是一个 fan-in 模式。 当然我们通过 context.Context 可以对每个 worker 做停止运行控制。

最后是处理结果集合,

// 处理结果集
func (wp WorkerPool) Results() <-chan Result {
	return wp.results
}
复制代码

那么整体的测试代码就是:

func TestWorkerPool(t *testing.T) {
	wp := New(workerCount)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	go wp.GenerateFrom(testJobs())

	go wp.Run(ctx)

	for {
		select {
		case r, ok := <-wp.Results():
			if !ok {
				continue
			}

			i, err := strconv.ParseInt(string(r.Descriptor.ID), 10, 64)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			val := r.Value.(int)
			if val != int(i)*2 {
				t.Fatalf("wrong value %v; expected %v", val, int(i)*2)
			}
		case <-wp.Done:
			return
		default:
		}
	}
}
复制代码

看了代码之后,我们知道,这并不是一个传统意义的 worker-pool。它并不像 Handling 1 Million Requests per Minute with Go 这篇文章一样, 初始化一个真正的 worker-pool,一旦接收到 job,就尝试从池中获取一个 worker, 把对应的 job 交给这个 work 进行处理,等 work 处理完毕,重新进行到工作池中,等待下一次被利用。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • if-else的另一种实现

    在日常开发中,经常会需要监听某种数据的新增、删除、修改。根据不同类型做处理,通常处理:

    OPice
  • FutureTask——另一种闭锁的实现

    关于多线程,我们接触对多的,最基础,入门的可能就是实现Runnable接口继承Thead类,因为Java单继承的原因,通常建议是实现Runnable接口。但这种...

    用户1148394
  • 郭健: currency Managed Workqueue(CMWQ)概述

    一种新的机制出现的原因往往是为了解决实际的问题,虽然linux kernel中已经提供了workqueue的机制,那么为何还要引入cmwq呢?也就是说:旧的wo...

    Linux阅码场
  • Go 常见并发模式实现(三):通过无缓冲通道创建协程池

    上篇教程学院君给大家演示了如何通过缓冲通道实现共享资源池,今天,我们来看另一个并发模式的 Go 语言实现 —— 通过无缓冲通道实现协程(goroutine)池。

    学院君
  • 用一个栈实现另一个栈的排序

    有一个待排序的栈,现在想将该栈从顶到底按照从大到小的顺序排序,只允许申请一个栈。除此之外,可以申请新的变量,但不能申请新的数据结构。

    HelloVass
  • 一起用golang之Go程序的套路

    系统性地介绍golang基础的资料实在太多了,这里不再一一赘述。本文的思路是从另一个角度来由浅入深地探究下Go程序的套路。毕竟纸上得来终觉浅,所以,能动手就不要...

    李海彬
  • 物理刚体挖洞!另一种实现!

    在 物理挖洞-优化篇 和 物理挖洞-实现篇 中介绍了一种用多边形链条组件(cc.PhysicsChainCollider)实现物理挖洞的方法。这次打算用多边形碰...

    白玉无冰
  • strlen的另一种实现,可以作为ShellCode

    在实际工作中会遇到很多strlen. 这里针对strlen函数做一下代码还原. 并且讲解其原理

    IBinary
  • 并发模式

    并发模式并不是一种函数的运用、亦或者实际存在的东西。他是前人对于并发场景的运用总结与经验。他与23中设计模式一样。好啦,话不多说。开干

    PayneWu
  • Android侧滑删除另一种实现,SwipeListView补充

    前不久在在做聊天删除功能的时候使用SwipeListView进行侧滑删除有一点小问题,因为SwipeListView嵌套在Fragment内的时候,会报一个转换...

    xiangzhihong
  • Android侧滑删除另一种实现,SwipeListView补充

    前不久在在做聊天删除功能的时候使用SwipeListView进行侧滑删除有一点小问题,因为SwipeListView嵌套在Fragment内的时候,会报一个转换...

    xiangzhihong
  • 用node.js实现ORM的一种思路

      ORM是O和R的映射。O代表面向对象,R代表关系型数据库。二者有相似之处同时也各有特色。就是因为这种即是又非的情况,才需要做映射的。   理想情况是,根据关...

    用户1174620
  • java中的fork join框架

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

    程序那些事
  • common-pool2 学习:thrift连接池的另一种实现

    对象池是一种很实用的技术,经典的例子就是数据库连接池。去年曾经从零开始写过一个thrift客户端连接池。如果不想重造轮子,可以直接在apache开源项目comm...

    菩提树下的杨过
  • ​高性能分布式锁的另一种实现:Redisson

    以往在项目中涉及到分布式锁时,都是结合redisTemplate采用类原生的方式编写,代码量不少,还容易出现锁死的情况,近来无意间在看到某篇文章中发现了redi...

    MavenTalker
  • Mysql的qps高DB随时可能挂掉时的处理方法

    使用Mysql中如果CPU在95%及以上,Qps突然增到2万以上,这时Mysql随时有死去风险。

    杨漆
  • 并行执行任务的ForkJoin框架简介

    从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。

    一个会写诗的程序员
  • 实现智能的一个思考。

    首先通过视觉对外部世界达到语义认识,然后在语义认识的基础之上进行思维,进行其他认识活动的构建。

    用户1908973
  • Python多进程编程

    阅读目录 1. Process 2. Lock 3. Semaphore 4. Event 5. Queue 6. Pipe 7. Pool 序. multi...

    小小科

扫码关注云+社区

领取腾讯云代金券