首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Go 系列教程——23.缓冲信道和工作池

首发于:https://studygolang.com/articles/12512

欢迎来到 Golang 系列教程的第 23 篇。

什么是缓冲信道?

在上一教程里,我们讨论的主要是无缓冲信道。我们在信道的教程里详细讨论了,无缓冲信道的发送和接收过程是阻塞的。

我们还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

通过向 函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。

要让一个信道有缓冲,上面语法中的 应该大于 0。无缓冲信道的容量默认为 0,因此我们在上一教程创建信道时,省略了容量参数。

我们开始编写代码,创建一个缓冲信道。

示例一

在线运行程序

在上面程序里的第 9 行,我们创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。在第 10 行和第 11 行,我们向信道写入两个字符串,该信道并没有发生阻塞。我们又在第 12 行和第 13 行分别读取了这两个字符串。该程序输出:

示例二

我们再看一个缓冲信道的示例,其中有一个并发的 Go 协程来向信道写入数据,而 Go 主协程负责读取数据。该示例帮助我们进一步理解,在向缓冲信道写入数据时,什么时候会发生阻塞。

在线运行程序

在上面的程序中,第 16 行在 Go 主协程中创建了容量为 2 的缓冲信道 ,而第 17 行把 传递给了 协程。接下来 Go 主协程休眠了两秒。在这期间, 协程在并发地运行。 协程有一个 for 循环,依次向信道 写入 0~4。而缓冲信道的容量为 2,因此 协程里立即会向 写入 0 和 1,接下来发生阻塞,直到 内的值被读取。因此,该程序立即打印出下面两行:

打印上面两行之后, 协程中向 的写入发生了阻塞,直到 有值被读取到。而 Go 主协程休眠了两秒后,才开始读取该信道,因此在休眠期间程序不会打印任何结果。主协程结束休眠后,在第 19 行使用 for range 循环,开始读取信道 ,打印出了读取到的值后又休眠两秒,这个循环一直到 关闭才结束。所以该程序在两秒后会打印下面两行:

该过程会一直进行,直到信道读取完所有的值,并在 协程中关闭信道。最终输出如下:

死锁

在线运行程序

在上面程序里,我们向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时(第 11 行),由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic,信息如下:

长度 vs 容量

缓冲信道的容量是指信道可以存储的值的数量。我们在使用 函数创建缓冲信道的时候会指定容量大小。

缓冲信道的长度是指信道中当前排队的元素个数。

代码可以把一切解释得很清楚。:)

在线运行程序

在上面的程序里,我们创建了一个容量为 3 的信道,于是它可以保存 3 个字符串。接下来,我们分别在第 9 行和第 10 行向信道写入了两个字符串。于是信道有两个字符串排队,因此其长度为 2。在第 13 行,我们又从信道读取了一个字符串。现在该信道内只有一个字符串,因此其长度变为 1。该程序会输出:

WaitGroup

在本教程的下一节里,我们会讲到工作池(Worker Pools)。而 用于实现工作池,因此要理解工作池,我们首先需要学习 。

用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 来实现。

理论说完了,我们编写点儿代码吧。:)

在线运行程序

WaitGroup 是一个结构体类型,我们在第 18 行创建了 类型的变量,其初始值为零值。 使用计数器来工作。当我们调用 的 并传递一个 时, 的计数器会加上 的传参。要减少计数器,可以调用 的 方法。 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。

上述程序里,for 循环迭代了 3 次,我们在循环内调用了 (第 20 行)。因此计数器变为 3。for 循环同样创建了 3 个 协程,然后在第 23 行调用了 ,确保 Go 主协程等待计数器变为 0。在第 13 行, 协程内调用了 ,可以让计数器递减。一旦 3 个子协程都执行完毕(即 调用了 3 次),那么计数器就变为 0,于是主协程会解除阻塞。

在第 21 行里,传递 的地址是很重要的。如果没有传递 的地址,那么每个 Go 协程将会得到一个 值的拷贝,因而当它们执行结束时, 函数并不会知道

该程序输出:

由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。:)

工作池的实现

缓冲信道的重要应用之一就是实现工作池。

一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。

我们工作池的核心功能如下:

创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。

将作业添加到该输入型缓冲信道中。

作业完成后,再将结果写入一个输出型缓冲信道。

从输出型缓冲信道读取并打印结果。

我们会逐步编写这个程序,让代码易于理解。

第一步就是创建一个结构体,表示作业和结果。

所有 结构体变量都会有 和 两个字段, 用于计算其每位数之和。

而 结构体有一个 字段,表示所对应的作业,还有一个 字段,表示计算的结果(每位数字之和)。

第二步是分别创建用于接收作业和写入结果的缓冲信道。

工作协程(Worker Goroutine)会监听缓冲信道 里更新的作业。一旦工作协程完成了作业,其结果会写入缓冲信道 。

如下所示, 函数的任务实际上就是计算整数的每一位之和,最后返回该结果。为了模拟出 在计算过程中花费了一段时间,我们在函数内添加了两秒的休眠时间。

然后,我们写一个创建工作协程的函数。

上面的函数创建了一个工作者(Worker),读取 信道的数据,根据当前的 和 函数的返回值,创建了一个 结构体变量,然后将结果写入 缓冲信道。 函数接收了一个 类型的 作为参数,当所有的 完成的时候,调用了 方法。

函数创建了一个 Go 协程的工作池。

上面函数的参数是需要创建的工作协程的数量。在创建 Go 协程之前,它调用了 方法,于是 计数器递增。接下来,我们创建工作协程,并向 函数传递 的地址。创建了需要的工作协程后,函数调用 ,等待所有的 Go 协程执行完毕。所有协程完成执行之后,函数会关闭 信道。因为所有协程都已经执行完毕,于是不再需要向 信道写入数据了。

现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者。

上面的 函数接收所需创建的作业数量作为输入参数,生成了最大值为 998 的伪随机数,并使用该随机数创建了 结构体变量。这个函数把 for 循环的计数器 作为 id,最后把创建的结构体变量写入 信道。当写入所有的 时,它关闭了 信道。

下一步是创建一个读取 信道和打印输出的函数。

函数读取 信道,并打印出 的 、输入的随机数、该随机数的每位数之和。 函数也接受 信道作为参数,当打印所有结果时, 会被写入 true。

现在一切准备充分了。我们继续完成最后一步,在 函数中调用上面所有的函数。

我们首先在 函数的第 2 行,保存了程序的起始时间,并在最后一行(第 12 行)计算了 和 的差值,显示出程序运行的总时间。由于我们想要通过改变协程数量,来做一点基准指标(Benchmark),所以需要这么做。

我们把 设置为 100,接下来调用了 ,向 信道添加作业。

我们创建了 信道,并将其传递给 协程。于是该协程会开始打印结果,并在完成打印时发出通知。

通过调用 函数,我们最终创建了一个有 10 个协程的工作池。 函数会监听 信道的通知,等待所有结果打印结束。

为了便于参考,下面是整个程序。我还引用了必要的包。

在线运行程序

为了更精确地计算总时间,请在你的本地机器上运行该程序。

该程序输出:

程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。你的输出会和我的不同,因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。在我的例子中,运行程序大约花费了 20 秒。

现在我们可以理解了,随着工作协程数量增加,完成作业的总时间会减少。你们可以练习一下:在 函数里修改 和 的值,并试着去分析一下结果。

本教程到此结束。祝你愉快。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180611G0HCV800?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券