不管是哪种编程语言,重新造轮子都是不好的做法。像启动多个goroutine并行处理任务并将它们的错误聚合这样的代码片段非常常见,对于这种情况,Go系统库中提供了解决该问题的方法。本文让我们深入研究它,并理解为什么它应该成为Go开发人员手中一把有力的工具。
golang.org/x是Go语言标准库的一个扩展库, 它里面有一个模块叫sync,该模块中有一个处理方便我们处理error的包叫errgroup, 详情见(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220601150217-0de741cfad7f/errgroup)。
假设我们要处理的函数(Handler)流程如下图,它从客户端(clinet)接收要处理的数据,并调用外部的服务完成业务逻辑处理。然而,由于某些限制,我们不能将接收的数据一次性全部传给外部服务完成处理,而是每次只能处理一部分数据,所以需要进行多次外部服务调用,此外,这些多次外部服务调用可以并行处理。
如果在调用外部服务的过程中出现错误,希望将这个错误返回。如果有多个服务调用出现错误,只返回其中一个即可。下面是使用标准库提供的并发原语实现的一个程序框架,代码如下:
func handler(ctx context.Context, circles []Circle) ([]Result, error) {
results := make([]Result, len(circles))
wg := sync.WaitGroup{}
wg.Add(len(results))
for i, circle := range circles {
i := i
circle := circle
go func() {
defer wg.Done()
result, err := foo(ctx, circle)
if err != nil {
// ?
}
results[i] = result
}()
}
wg.Wait()
// ...
}
上述程序使用sync.WaitGroup来等待所有的goroutine完成处理任务,并将处理得到的结果存储到切片results中,这样就把所有处理结果聚合了起来,这是一种处理方法。另一种处理方法是每个处理任务的goroutine将处理结果发送到一个通道上,该通道的接收方goroutine对结果进行聚合处理. 如果对消息的顺序有要求,处理的难点是如何重新排序传入的消息。因此,我们决定采用最简单的方法通过共享切片来实现。
「NOTE:虽然有多个goroutine操作同一个切片,如果它们写入的是切片的不同位置,这种实现没有数据竞争问题。」
但是上述程序框架还有一个关键点没有解决,如果调用foo返回错误了该怎么处理?主要有以下处理方法:
无论采用上面的哪种方法,都会使得程序逻辑变得更复杂。出于这个原因,Go扩展库中提供了errgroup用于解决这种问题。
errgroup包对外只提供了两个方法:Go和Wait。Go方法会在一个新的goroutine执行传入的处理任务。Wait会阻塞等待,直到所有的goroutine都执行完成,Wait有一个返回值类型为error.如果所有的子goroutine在处理任务时都没有产生错误,Wait返回的错误为空,如果在处理任务的时候有产生错误,则Wait返回值存储的是第一个错误值。
下面使用errgroup完善上面程序,首先通过执行下面的命令导入errgroup包。
go get golang.org/x/sync/errgroup
实现代码如下,首先通过函数传入的context创建一个 *errgroup.Group对象g。在每次循环中,调用g的Go方法执行传入的处理任务。g.Go接收一个func() error签名的函数。这里采用了闭包调用的方式使用外部的变量circle和i. 父goroutine中调用g.Wait阻塞等待所有的子goroutine执行完成。
func handler(ctx context.Context, circles []Circle) ([]Result, error) {
results := make([]Result, len(circles))
g, ctx := errgroup.WithContext(ctx)
for i, circle := range circles {
i := i
circle := circle
g.Go(func() error {
result, err := foo(ctx, circle)
if err != nil {
return err
}
results[i] = result
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
对比上面两种实现,可以看到采用errgroup实现方案比第一种实现更简单(注意,第一种实现方法还只是实现了部分功能,还没处理将错误值返回的逻辑)。并且我们也不需要考虑并发数据竞争冲突问题,因为errgroup.Group内部帮我们解决了。此外,使用errgroup还有一个优点是可以共享上下文。假如我们进行三个并发调用:
在我们的例子中,如果有错误产生,返回一个错误即可,不需要返回所有的错误。所以,这里进行的第二和第三个调用是没有意义的,因为第一个调用在1毫秒时已经产生了错误。使用errgroup.WithContext创建Group对象会在并行调用时共享传入的上下文。由于第一个调用在1毫秒内返回错误,它将取消上下文,从而取消其他goroutine,因此,我们不用等待后面其他goroutine在5秒后返回的错误,这也是使用errgroup的另一个优势。
「NOTE: g.Go调用的函数内部处理流程必须是上下文可以感知的,否则,取消上下文不会有效果。」
总结:当我们需要启动多个goroutine并发处理任务,同时需要记录任务处理过程是否存在错误,以及在任务处理时如果感知其他goroutine中存在错误,想取消终止处理时,可以考虑采用errgroup解决我们的问题,因为,正如前面的分析,errgroup提供了一组goroutine同步功能,也提供了错误处理和共享上下文的功能,能够减小我们处理问题的复杂度。