如何优雅地关闭Go channel

作者:天唯 链接:https://www.jianshu.com/p/d24dfbb33781 來源:简书

本文译自:How To Close Channels in Golang Elegantly。 几天前,我写了一篇文章来说明golang中channel的使用规范。在reddit和HN,那篇文章收到了很多赞同,但是我也收到了下面几个关于Go channel设计和规范的批评:

  1. 在不能更改channel状态的情况下,没有简单普遍的方式来检查channel是否已经关闭了
  2. 关闭已经关闭的channel会导致panic,所以在closer(关闭者)不知道channel是否已经关闭的情况下去关闭channel是很危险的
  3. 发送值到已经关闭的channel会导致panic,所以如果sender(发送者)在不知道channel是否已经关闭的情况下去向channel发送值是很危险的

那些批评看起来都很有道理(实际上并没有)。是的,没有一个内置函数可以检查一个channel是否已经关闭。如果你能确定不会向channel发送任何值,那么也确实需要一个简单的方法来检查channel是否已经关闭:

 1package main
 2
 3import "fmt"
 4
 5type T int
 6
 7func IsClosed(ch <-chan T) bool {
 8    select {
 9    case <-ch:
10        return true
11    default:
12    }
13
14    return false
15}
16
17func main() {
18    c := make(chan T)
19    fmt.Println(IsClosed(c)) // false
20    close(c)
21    fmt.Println(IsClosed(c)) // true
22}

上面已经提到了,没有一种适用的方式来检查channel是否已经关闭了。但是,就算有一个简单的 closed(chan T) bool函数来检查channel是否已经关闭,它的用处还是很有限的,就像内置的len函数用来检查缓冲channel中元素数量一样。原因就在于,已经检查过的channel的状态有可能在调用了类似的方法返回之后就修改了,因此返回来的值已经不能够反映刚才检查的channel的当前状态了。 尽管在调用closed(ch)返回true的情况下停止向channel发送值是可以的,但是如果调用closed(ch)返回false,那么关闭channel或者继续向channel发送值就不安全了(会panic)。

The Channel Closing Principle

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。 (下面,我们将会称上面的原则为channel closing principle

打破channel closing principle的解决方案

如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases的函数来安全地发送值到channel中(假设channel的元素类型是T)

 1func SafeSend(ch chan T, value T) (closed bool) {
 2    defer func() {
 3        if recover() != nil {
 4            // the return result can be altered 
 5            // in a defer function call
 6            closed = true
 7        }
 8    }()
 9
10    ch <- value // panic if ch is closed
11    return false // <=> closed = false; return
12}

如果channel ch没有被关闭的话,那么这个函数的性能将和ch <- value接近。对于channel关闭的时候,SafeSend函数只会在每个sender goroutine中调用一次,因此程序不会有太大的性能损失。 同样的想法也可以用在从多个goroutine关闭channel中:

 1func SafeClose(ch chan T) (justClosed bool) {
 2    defer func() {
 3        if recover() != nil {
 4            justClosed = false
 5        }
 6    }()
 7
 8    // assume ch != nil here.
 9    close(ch) // panic if ch is closed
10    return true
11}

很多人喜欢用sync.Once来关闭channel:

 1type MyChannel struct {
 2    C    chan T
 3    once sync.Once
 4}
 5
 6func NewMyChannel() *MyChannel {
 7    return &MyChannel{C: make(chan T)}
 8}
 9
10func (mc *MyChannel) SafeClose() {
11    mc.once.Do(func(){
12        close(mc.C)
13    })
14}

当然了,我们也可以用sync.Mutex来避免多次关闭channel:

 1type MyChannel struct {
 2    C      chan T
 3    closed bool
 4    mutex  sync.Mutex
 5}
 6
 7func NewMyChannel() *MyChannel {
 8    return &MyChannel{C: make(chan T)}
 9}
10
11func (mc *MyChannel) SafeClose() {
12    mc.mutex.Lock()
13    if !mc.closed {
14        close(mc.C)
15        mc.closed = true
16    }
17    mc.mutex.Unlock()
18}
19
20func (mc *MyChannel) IsClosed() bool {
21    mc.mutex.Lock()
22    defer mc.mutex.Unlock()
23    return mc.closed
24}

我们应该要理解为什么Go不支持内置SafeSendSafeClose函数,原因就在于并不推荐从接收端或者多个并发发送端关闭channel。Golang甚至禁止关闭只接收(receive-only)的channel。

保持channel closing principle的优雅方案

上面的SaveSend函数有一个缺点是,在select语句的case关键字后不能作为发送操作被调用(译者注:类似于 case SafeSend(ch, t):)。另外一个缺点是,很多人,包括我自己都觉得上面通过使用panic/recoversync包的方案不够优雅。针对各种场景,下面介绍不用使用panic/recoversync包,纯粹是利用channel的解决方案。 (在下面的例子总,sync.WaitGroup只是用来让例子完整的。它的使用在实践中不一定一直都有用)

  • M个receivers,一个sender,sender通过关闭data channel说“不再发送” 这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:
 1package main
 2
 3import (
 4    "time"
 5    "math/rand"
 6    "sync"
 7    "log"
 8)
 9
10func main() {
11    rand.Seed(time.Now().UnixNano())
12    log.SetFlags(0)
13
14    // ...
15    const MaxRandomNumber = 100000
16    const NumReceivers = 100
17
18    wgReceivers := sync.WaitGroup{}
19    wgReceivers.Add(NumReceivers)
20
21    // ...
22    dataCh := make(chan int, 100)
23
24    // the sender
25    go func() {
26        for {
27            if value := rand.Intn(MaxRandomNumber); value == 0 {
28                // the only sender can close the channel safely.
29                close(dataCh)
30                return
31            } else {            
32                dataCh <- value
33            }
34        }
35    }()
36
37    // receivers
38    for i := 0; i < NumReceivers; i++ {
39        go func() {
40            defer wgReceivers.Done()
41
42            // receive values until dataCh is closed and
43            // the value buffer queue of dataCh is empty.
44            for value := range dataCh {
45                log.Println(value)
46            }
47        }()
48    }
49
50    wgReceivers.Wait()
51}
  • 一个receiver,N个sender,receiver通过关闭一个额外的signal channel说“请停止发送” 这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:
 1package main
 2
 3import (
 4    "time"
 5    "math/rand"
 6    "sync"
 7    "log"
 8)
 9
10func main() {
11    rand.Seed(time.Now().UnixNano())
12    log.SetFlags(0)
13
14    // ...
15    const MaxRandomNumber = 100000
16    const NumSenders = 1000
17
18    wgReceivers := sync.WaitGroup{}
19    wgReceivers.Add(1)
20
21    // ...
22    dataCh := make(chan int, 100)
23    stopCh := make(chan struct{})
24        // stopCh is an additional signal channel.
25        // Its sender is the receiver of channel dataCh.
26        // Its reveivers are the senders of channel dataCh.
27
28    // senders
29    for i := 0; i < NumSenders; i++ {
30        go func() {
31            for {
32                value := rand.Intn(MaxRandomNumber)
33
34                select {
35                case <- stopCh:
36                    return
37                case dataCh <- value:
38                }
39            }
40        }()
41    }
42
43    // the receiver
44    go func() {
45        defer wgReceivers.Done()
46
47        for value := range dataCh {
48            if value == MaxRandomNumber-1 {
49                // the receiver of the dataCh channel is
50                // also the sender of the stopCh cahnnel.
51                // It is safe to close the stop channel here.
52                close(stopCh)
53                return
54            }
55
56            log.Println(value)
57        }
58    }()
59
60    // ...
61    wgReceivers.Wait()
62}

正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle

  • M个receiver,N个sender,它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧” 这是最复杂的场景了。我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:
  1package main
  2
  3import (
  4    "time"
  5    "math/rand"
  6    "sync"
  7    "log"
  8    "strconv"
  9)
 10
 11func main() {
 12    rand.Seed(time.Now().UnixNano())
 13    log.SetFlags(0)
 14
 15    // ...
 16    const MaxRandomNumber = 100000
 17    const NumReceivers = 10
 18    const NumSenders = 1000
 19
 20    wgReceivers := sync.WaitGroup{}
 21    wgReceivers.Add(NumReceivers)
 22
 23    // ...
 24    dataCh := make(chan int, 100)
 25    stopCh := make(chan struct{})
 26        // stopCh is an additional signal channel.
 27        // Its sender is the moderator goroutine shown below.
 28        // Its reveivers are all senders and receivers of dataCh.
 29    toStop := make(chan string, 1)
 30        // the channel toStop is used to notify the moderator
 31        // to close the additional signal channel (stopCh).
 32        // Its senders are any senders and receivers of dataCh.
 33        // Its reveiver is the moderator goroutine shown below.
 34
 35    var stoppedBy string
 36
 37    // moderator
 38    go func() {
 39        stoppedBy = <- toStop // part of the trick used to notify the moderator
 40                              // to close the additional signal channel.
 41        close(stopCh)
 42    }()
 43
 44    // senders
 45    for i := 0; i < NumSenders; i++ {
 46        go func(id string) {
 47            for {
 48                value := rand.Intn(MaxRandomNumber)
 49                if value == 0 {
 50                    // here, a trick is used to notify the moderator
 51                    // to close the additional signal channel.
 52                    select {
 53                    case toStop <- "sender#" + id:
 54                    default:
 55                    }
 56                    return
 57                }
 58
 59                // the first select here is to try to exit the
 60                // goroutine as early as possible.
 61                select {
 62                case <- stopCh:
 63                    return
 64                default:
 65                }
 66
 67                select {
 68                case <- stopCh:
 69                    return
 70                case dataCh <- value:
 71                }
 72            }
 73        }(strconv.Itoa(i))
 74    }
 75
 76    // receivers
 77    for i := 0; i < NumReceivers; i++ {
 78        go func(id string) {
 79            defer wgReceivers.Done()
 80
 81            for {
 82                // same as senders, the first select here is to 
 83                // try to exit the goroutine as early as possible.
 84                select {
 85                case <- stopCh:
 86                    return
 87                default:
 88                }
 89
 90                select {
 91                case <- stopCh:
 92                    return
 93                case value := <-dataCh:
 94                    if value == MaxRandomNumber-1 {
 95                        // the same trick is used to notify the moderator 
 96                        // to close the additional signal channel.
 97                        select {
 98                        case toStop <- "receiver#" + id:
 99                        default:
100                        }
101                        return
102                    }
103
104                    log.Println(value)
105                }
106            }
107        }(strconv.Itoa(i))
108    }
109
110    // ...
111    wgReceivers.Wait()
112    log.Println("stopped by", stoppedBy)
113}

在这个例子中,仍然遵守着channel closing principle。 请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。

  • 更多的场景? 很多的场景变体是基于上面三种的。举个例子,一个基于最复杂情况的变体可能要求receivers读取buffer channel中剩下所有的值。这应该很容易处理,所有这篇文章也就不提了。 尽管上面三种场景不能覆盖所有Go channel的使用场景,但它们是最基础的,实践中的大多数场景都可以分类到那三种中。

结论

这里没有一种场景要求你去打破channel closing principle。如果你遇到了这种场景,请思考一下你的设计并重写你的代码。 用Go编程就像在创作艺术。


版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

Golang语言社区

ID:Golangweb

www.bytedancing.com

游戏服务器架构丨分布式技术丨大数据丨游戏算法学习

原文发布于微信公众号 - Golang语言社区(Golangweb)

原文发表时间:2018-09-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏较真的前端

NodeJS作者总结自己在node设计中的失误

42160
来自专栏Python数据科学

要成为一个专业的爬虫大佬,你还需要了解这些

本文内容参考Github:https://github.com/lorien/awesome-web-scraping/blob/master/python.m...

40810
来自专栏Python中文社区

Python爬虫一步步抓取房产信息

專 欄 ❈ Garfield_Liang,Python中文社区专栏作者。 简书地址:http://www.jianshu.com/u/cac1d39abfa9...

39360
来自专栏.net

干货,比较全面的c#.net公共帮助类(Common.Utility)

       网上有各式各样的帮助类,公共类,但是比较零碎,经常有人再群里或者各种社交账号上问我有没有这个helper, 那个helper,于是萌生了收集全部h...

55480
来自专栏五毛程序员

五毛的cocos2d-x学习笔记01-创建项目

18840
来自专栏Jerry的SAP技术分享

小技巧:不用任何媒体处理软件进行视频压缩

如果需要压缩一个很大的视频,网上有很多介绍,但是都需要安装各种视频处理软件,比如格式工厂等等。本文介绍一个非常方便的小技巧,无需任何视频处理软件,只需要微软的P...

18720
来自专栏Fundebug

抛弃console.log(),拥抱浏览器Debugger

为了保证可读性,本文采用意译而非直译。另外,本文版权归原作者所有,翻译仅用于学习。

15330
来自专栏醉梦轩

解决INSTALL_FAILED_INSUFFICIENT_STORAGE错误

17330
来自专栏北京马哥教育

快收藏!史上最全156个Python网络爬虫资源

awesome系列真是碉堡了~今天把Python的爬虫工具搬过来~ ——————译文分割线—————— 本列表包含Python网页抓取和数据处理相关的库。 网络...

40440
来自专栏Coco的专栏

【前端性能】浅谈域名发散与域名收敛

30320

扫码关注云+社区

领取腾讯云代金券