🐾 大家好,我是猫头虎博主!今天要和大家探讨Go的并发模式,尤其是管道和取消技术。在这篇博客中,我们将深入挖掘Go的并发原语如何简化数据流管道的构建,并有效利用I/O与多核CPU。我们还将探索在操作失败时应对的细节,并引入干净处理失败的技术。让我们一起深入Go的世界,探索其并发之美!🚀
Go的并发原语让构建数据流管道变得简单,能有效地利用I/O和多CPU。本文通过管道示例,强调操作失败时出现的微妙问题,并介绍如何干净地处理这些失败。
管道是连接通过通道(channel)的多个阶段(stages)的系列,每个阶段是一组运行相同函数的goroutines。它们通过inbound通道接收上游数据,处理这些数据,然后通过outbound通道发送到下游。
Go中管道的一个简单例子是数字平方。我们先定义gen
函数,它将整数列表转换为发出列表中整数的通道。然后是sq
函数,它接收整数并返回其平方的通道。
func gen(nums ...int) <-chan int { /* ... */ }
func sq(in <-chan int) <-chan int { /* ... */ }
扇出(fan-out)指多个函数可以从同一通道读取直到该通道关闭。扇入(fan-in)是通过将多个输入通道复用到一个单一通道上,然后在所有输入关闭时关闭该通道。
func merge(cs ...<-chan int) <-chan int { /* ... */ }
在现实中,管道的阶段可能不会接收所有入站值。我们需要某种方式来让早期阶段停止产生后续阶段不需要的值。
在Go中,当主函数(main
)决定在未接收所有值的情况下退出时,它必须通过一个名为done
的通道告诉上游阶段的goroutines放弃他们正在尝试发送的值。
func main() { /* ... */ }
我们考虑一个更现实的管道,用于计算给定目录下所有文件的MD5校验和。我们通过分离MD5All
为两个阶段的管道来实现。
我们通过为读取文件创建固定数量的goroutines来限制内存分配,从而实现有界的并行。
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { /* ... */ }
我们介绍了在Go中构建数据流管道的技术。处理此类管道中的失败是棘手的,因为每个阶段可能会在尝试向下游发送值时阻塞,而下游阶段可能不再关心传入的数据。我们展示了如何通过关闭通道来广播给所有由管道启动的goroutines一个“完成”信号,并定义了正确构建管道的指南。
currency-patterns)