前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go 语言调度(三): 并发

Go 语言调度(三): 并发

作者头像
李海彬
发布2019-05-08 11:14:38
5940
发布2019-05-08 11:14:38
举报
文章被收录于专栏:Golang语言社区Golang语言社区

作者:达菲格 来源:简书

介绍

当我解决问题时,尤其是新问题,我不会一上来就想着能不能使用并发来处理。我会首先想出一个顺序执行的解决方案。然后在可读性和技术评审后,我会开始考虑并发处理能不能让问题得到更快的解决。有时很显然并发是很合适的,但有时候并不那么明显。

第一篇文章,我讲解了系统调度器的机制和原理,我相信这对写一个多线程代码是重要的。第二篇文章我讲解了 Go 语言调度器的机制,对如何 Go 语言写出并发代码是重要的。这篇文章,我会开始把操作系统和 Go 调度器统一起来,提供深入理解下什么并发,什么不是。 这篇文章有两个目的:

1、提供几个在考虑你的服务是否适合用并发来解决时,需要思考的关键点。 2、像你展示不同类型的工作,需要的工程决策也是不同的。

什么是并发

并发意味着不按顺序执行。给定一组指令,可以按顺序执行,也可以找一种方式不按顺序执行,但仍能得到同样的结果。对于你眼前的问题,显然乱序执行能够增加价值。我所说的价值,意思是以复杂性为成本,而得到足够的性能增益。关键还是要看你的问题,它可能无法或甚至无法进行无序执行。

理解并发并不等于并行也是重要的。并行意味着 2 个或者 2 个以上的指令可以在同一时间一起执行。这和并发的概念不同。只有你的机器有至少 2 个 hardware threads 才能使你的程序达到并行效果。

并发和并行

上图中,你看到有两个逻辑处理器(P),每个都分配了一个独立的系统线程(M),线程被关联到了独立的 Core 上。你可以看到 2 个 Goroutine 正在并行执行,他们同时在不同的 Core 上执行各自的指令。对于每一个逻辑处理器,3 个 Groutine 正在轮流共享他们的系统线程。所有的 Goroutine 正在并发执行,没有按照特定的顺序执行他们的指令。

这里有个难点,就是有时候在没有并行能力机器上使用并发,实际上会降低你的性能。更有趣的是,有时候利用并行来达并发效果,并不会如你期望的那样得到更好的性能。

工作负荷

首先,你最好是先搞懂你的问题属于哪种负荷类型。是 CPU 密集型的还是 IO 密集型的。之前的文章有描述,这里不再重复。

CPU 密集型的工作你需要通过并行来达到并发。单个 hardware thread 处理多个 Goroutine 并不高效,因为这些 Goroutine 不会进入阻塞状态。使用比机器 hardware thread 数量更多的 Goroutine 会减慢工作,因为把 Goroutine 不断从系统线程上调来调去也是有成本。上下文切换会触发 Stop The World(简称STW)事件,因为在切换过程中你的工作不会被执行。

对于 IO 密集型的工作,你不需要并行来保证并发。单个 hardware thread 可以处理高效的处理多个 Goroutine,因为这些 Goroutine 会自动进出于阻塞态。拥有比 hardware thread 更多的 Goroutine 可以加速工作的完成,因为让 Goroutine 从系统线程上调来调去不会触发 STW 事件。你的工作会自动暂停,这允许其他不同的 Goroutine 使用同一个 hardware thread 来更高效的完成工作,而不是让它处于空闲状态。

你如何能知道,一个 hardware thread 上跑多少个 Goroutine 才能得到最大程度的吞吐量呢?太少的 Goroutine 导致你有太多的空闲资源,太多的 Goroutine 导致你有太多的延迟时间。这是您需要考虑的,但是超出了这篇文章的范围。

是时候该看一些代码了,可以巩固一下你判断工作适不适合并发,是否需要并行的能力。

加法

我们不需要复杂的代码来搞懂这个机制。看下面的 add 函数,功能就是对一组数字求和。

代码语言:javascript
复制
1func add(numbers []int) int {
2    var v int
3    for _, n := range numbers {
4        v += n
5    }
6    return v
7}

问题:这个 add 函数可不可以乱序执行?答案可以的。一个数字集合可以被拆成更小的一些集合,这些小集合是可以并发处理的。一旦小集合求和完了,所有的结果再相加求和,能得到同样的答案。

但是,还有另外一个问题。应该把集合拆成多小,才能让速度最快呢? 为了回答这个问题你就必须知道 add 属于哪种工作。 add 方法属于 CPU 密集型,因为算法就是不断执行数学运算,不会导致 Goroutine 进入阻塞态。这意味着每一个 hardware thread 跑一个 Goroutine 会让速度最快。

下面有一个并发版本的 add 。

代码语言:javascript
复制
 144 func addConcurrent(goroutines int, numbers []int) int {
 245     var v int64
 346     totalNumbers := len(numbers)
 447     lastGoroutine := goroutines - 1
 548     stride := totalNumbers / goroutines
 649
 750     var wg sync.WaitGroup
 851     wg.Add(goroutines)
 952
1053     for g := 0; g < goroutines; g++ {
1154         go func(g int) {
1255             start := g * stride
1356             end := start + stride
1457             if g == lastGoroutine {
1558                 end = totalNumbers
1659             }
1760
1861             var lv int
1962             for _, n := range numbers[start:end] {
2063                 lv += n
2164             }
2265
2366             atomic.AddInt64(&v, int64(lv))
2467             wg.Done()
2568         }(g)
2669     }
2770
2871     wg.Wait()
2972
3073     return int(v)
3174 }

上面代码中, addConcurrent 函数就是并发版本的 add 方法。解释下这里面比较重要的几行代码。

48行:每个 Goroutine 获得一个独立的但是更小的数字集合进行相加。每个集合的大小是总集合的大小除以 Goroutine 的数量。

53行: 一堆 Goroutine 开始执行加法运算。

57-59行:最后一个 Goroutine 要把剩下的所有数字相加,这有可能使得它的集合要比其他集合大。

66行: 将小集合的求和结果,加在一起得到最终求和结果。

并发版本的实现,要比顺序版本的更复杂,但这到底值不值呢?最好的办法就是做压测。压测时我使用一千万数字的集合,并关闭掉 GC。

代码语言:javascript
复制
 1func BenchmarkSequential(b *testing.B) {
 2    for i := 0; i < b.N; i++ {
 3        add(numbers)
 4    }
 5}
 6
 7func BenchmarkConcurrent(b *testing.B) {
 8    for i := 0; i < b.N; i++ {
 9        addConcurrent(runtime.NumCPU(), numbers)
10    }
11}

上面代码就是压测函数。这里我只分配一个 CPU Core 给程序。顺序版本的使用 1 个 Goroutine,并发版本的使用 runtie.NumCPU() 个(也就是 8 个)Goroutine。这种情况下并发版本因为无法使用并行来进行并发。

代码语言:javascript
复制
 110 Million Numbers using 8 goroutines with 1 core
 22.9 GHz Intel 4 Core i7
 3Concurrency WITHOUT Parallelism
 4-----------------------------------------------------------------------------
 5$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
 6goos: darwin
 7goarch: amd64
 8pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
 9BenchmarkSequential             1000       5720764 ns/op : ~10% Faster
10BenchmarkConcurrent             1000       6387344 ns/op
11BenchmarkSequentialAgain        1000       5614666 ns/op : ~13% Faster
12BenchmarkConcurrentAgain        1000       6482612 ns/op

上面的压测结果显示了,在只有一个 CPU Core 的情况下,顺序版本的要比并发版本的快 10%~13%。这与我估计的一样,因为并发版本的那几个 Goroutine 在同一个 CPU Core 上不断的做切换。

下面我们用多核进行在此测试。顺序版本还是只使用 1 个核,而并发版本使用 8 个核。这种情况下,并发版本可以同步并行来并发。

代码语言:javascript
复制
 110 Million Numbers using 8 goroutines with 8 cores
 22.9 GHz Intel 4 Core i7
 3Concurrency WITH Parallelism
 4-----------------------------------------------------------------------------
 5$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
 6goos: darwin
 7goarch: amd64
 8pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
 9BenchmarkSequential-8               1000       5910799 ns/op
10BenchmarkConcurrent-8               2000       3362643 ns/op : ~43% Faster
11BenchmarkSequentialAgain-8          1000       5933444 ns/op
12BenchmarkConcurrentAgain-8          2000       3477253 ns/op : ~41% Faster

上面的压测结果显示了,在多核的情况下并发版本要比顺序版本快 41%~43%。这也是符合预期的,因为所有的 Goroutine 都是并行执行的,8 个 Goroutine 是同时一起工作的。

排序

我们一定要知道,不是所有的 CPU密集型工作都可以使用并发完成的。尤其是在把工作拆分再把结果合并所带来的开销很大时。比如冒泡排序。看看下面冒泡排序的实现。

代码语言:javascript
复制
 101 package main
 202
 303 import "fmt"
 404
 505 func bubbleSort(numbers []int) {
 606     n := len(numbers)
 707     for i := 0; i < n; i++ {
 808         if !sweep(numbers, i) {
 909             return
1010         }
1111     }
1212 }
1313
1414 func sweep(numbers []int, currentPass int) bool {
1515     var idx int
1616     idxNext := idx + 1
1717     n := len(numbers)
1818     var swap bool
1919
2020     for idxNext < (n - currentPass) {
2121         a := numbers[idx]
2222         b := numbers[idxNext]
2323         if a > b {
2424             numbers[idx] = b
2525             numbers[idxNext] = a
2626             swap = true
2727         }
2828         idx++
2929         idxNext = idx + 1
3030     }
3131     return swap
3232 }
3333
3434 func main() {
3535     org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
3636     fmt.Println(org)
3737
3838     bubbleSort(org)
3939     fmt.Println(org)
4040 }

冒泡排序通过将数组中数字交换位置来达到排序的效果。在某些情况中,有些数字可能要交换好几次。

问题: bubboSort 函数是否可以乱序执行?答案是不可以。这组数字是可以拆成一堆小数组并并发的执行排序。但是,当所有并发工作完成后,没有一个高效的方法将这些小结果合并成最终的结果。我们来看一下并发版本的冒泡排序。

代码语言:javascript
复制
 101 func bubbleSortConcurrent(goroutines int, numbers []int) {
 202     totalNumbers := len(numbers)
 303     lastGoroutine := goroutines - 1
 404     stride := totalNumbers / goroutines
 505
 606     var wg sync.WaitGroup
 707     wg.Add(goroutines)
 808
 909     for g := 0; g < goroutines; g++ {
1010         go func(g int) {
1111             start := g * stride
1212             end := start + stride
1313             if g == lastGoroutine {
1414                 end = totalNumbers
1515             }
1616
1717             bubbleSort(numbers[start:end])
1818             wg.Done()
1919         }(g)
2020     }
2121
2222     wg.Wait()
2323
2424     // Ugh, we have to sort the entire list again.
2525     bubbleSort(numbers)
2626 }

bubbleSortConcurrent 函数是 bubbleSort 的并发实现。它使用多个 Goroutine 并发的对数组自己进行排序。但是,你最后在结果合并时,相当于把整个数组又重新排了一遍。

读者可以思考下归并排序适不适合并发,最好亲自动手测试一下

读文件

上面举了 2 个 CPU 密集型例子,但是 IO 密集型的呢?在 Goroutine 自然的不断频繁的进出阻塞态时,情况有什么不同?我们看一个 IO 密集的例子,就是读取一些文件然后执行查找操作。

第一个版本是顺序方式实现,函数名是 find

代码语言:javascript
复制
 142 func find(topic string, docs []string) int {
 243     var found int
 344     for _, doc := range docs {
 445         items, err := read(doc)
 546         if err != nil {
 647             continue
 748         }
 849         for _, item := range items {
 950             if strings.Contains(item.Description, topic) {
1051                 found++
1152             }
1253         }
1354     }
1455     return found
1556 }

上面代码就是 find 方法的实现。功能就是从一组字符串中找到

下面是 read 函数的实现

代码语言:javascript
复制
133 func read(doc string) ([]item, error) {
234     time.Sleep(time.Millisecond) // Simulate blocking disk read.
335     var d document
436     if err := xml.Unmarshal([]byte(file), &d); err != nil {
537         return nil, err
638     }
739     return d.Channel.Items, nil
840 }

read 函数在使用了 time.Sleep 把自己挂起 1 毫秒。这个主要是为了模拟 IO 操作延迟。因为你实际去读文件,Goroutine 也是一样会被挂起一段时间。这 1 毫秒的延迟对于后面的压测结果至关重要。

下面是并发版本的实现。

代码语言:javascript
复制
 158 func 
 2findConcurrent(goroutines int, topic string, docs []string) int {
 359     var found int64
 460
 561     ch := make(chan string, len(docs))
 662     for _, doc := range docs {
 763         ch <- doc
 864     }
 965     close(ch)
1066
1167     var wg sync.WaitGroup
1268     wg.Add(goroutines)
1369
1470     for g := 0; g < goroutines; g++ {
1571         go func() {
1672             var lFound int64
1773             for doc := range ch {
1874                 items, err := read(doc)
1975                 if err != nil {
2076                     continue
2177                 }
2278                 for _, item := range items {
2379                     if strings.Contains(item.Description, topic) {
2480                         lFound++
2581                     }
2682                 }
2783             }
2884             atomic.AddInt64(&found, lFound)
2985             wg.Done()
3086         }()
3187     }
3288
3389     wg.Wait()
3490
3591     return int(found)
3692 }

上面代码中 findConcurrent 函数就是 find 函数的并发实现版本。并发版本中使用适量的 Goroutine 完成不定数量的文档查询。代码太多,这里只解释几个重要的地方。

61-64行:创建了一个 Channel,用来发送文档。

65行: 关闭了 channel,这样当所有文档都处理完了后,所有的 Goroutine 就都自动退出了。

70行: Goroutine 被创建

73-83行:每个 Goroutine 从 Channel 中获取文档,把文档读到内存并查找 topic。当找到了,将本地变量 lFound 加一。

84行:每个 Goroutine 都将自己查找到的文档个数,加到全局变量 found 上。

下面我使用一千个文档进行压测,并关闭 GC。

代码语言:javascript
复制
 1func BenchmarkSequential(b *testing.B) {
 2    for i := 0; i < b.N; i++ {
 3        find("test", docs)
 4    }
 5}
 6
 7func BenchmarkConcurrent(b *testing.B) {
 8    for i := 0; i < b.N; i++ {
 9        findConcurrent(runtime.NumCPU(), "test", docs)
10    }
11}

上面是压测代码。下面是仅使用 1 个 CPU Core 的压测结果。此时并发版本也只能使用一个 CPU Core 来执行 8 个 Goroutine。

代码语言:javascript
复制
 110 Thousand Documents using 8 goroutines with 1 core
 22.9 GHz Intel 4 Core i7
 3Concurrency WITHOUT Parallelism
 4-----------------------------------------------------------------------------
 5$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
 6goos: darwin
 7goarch: amd64
 8pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
 9BenchmarkSequential                3    1483458120 ns/op
10BenchmarkConcurrent               20     188941855 ns/op : ~87% Faster
11BenchmarkSequentialAgain           2    1502682536 ns/op
12BenchmarkConcurrentAgain          20     184037843 ns/op : ~88% Faster

发现在单核的情况下。并发版本要比顺序版本快 87%~88%。这和我预期一致。因为所有的 Goroutine 共用一个 CPU Core,而 Goroutine 在调用 read 时候会自动进入阻塞态,这时会将 CPU Core 出让给其他 Goroutine 使用,这使得这个 CPU Core 更加繁忙。

下面看下使用多核进行压测。

代码语言:javascript
复制
 110 Thousand Documents using 8 goroutines with 1 core
 22.9 GHz Intel 4 Core i7
 3Concurrency WITH Parallelism
 4-----------------------------------------------------------------------------
 5$ GOGC=off go test -run none -bench . -benchtime 3s
 6goos: darwin
 7goarch: amd64
 8pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
 9BenchmarkSequential-8                  3    1490947198 ns/op
10BenchmarkConcurrent-8                 20     187382200 ns/op : ~88% Faster
11BenchmarkSequentialAgain-8             3    1416126029 ns/op
12BenchmarkConcurrentAgain-8            20     185965460 ns/op : ~87% Faster

这个压测结果说明,更多的 CPU Core 并不会对性能有多大的提升。

结论

这篇文章的目标就是用具体的例子像你说明,你一定要考虑你的场景到底适不适合并发。这样才能做出更好的工程决策。 你现在清晰的看到了,对于 IO 密集型的工作,并行对提升性能没有多大的帮助,这与 CPU 密集型的正好相反。但是并不是所有 CPU 密集型的工作都适合并发的,比如冒泡排序。搞明白你所应对的场景的特点非常重要。


版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 什么是并发
  • 工作负荷
  • 加法
  • 排序
  • 读文件
  • 结论
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档