如何优雅的关闭Go Channel【译】

Channel关闭原则

不要在消费端关闭channel,不要在有多个并行的生产者时对channel执行关闭操作。

也就是说应该只在[唯一的或者最后唯一剩下]的生产者协程中关闭channel,来通知消费者已经没有值可以继续读了。只要坚持这个原则,就可以确保向一个已经关闭的channel发送数据的情况不可能发生。

暴力关闭channel的正确方法

如果想要在消费端关闭channel,或者在多个生产者端关闭channel,可以使用recover机制来上个保险,避免程序因为panic而崩溃。

func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			justClosed = false
		}
	}()
	
	// assume ch != nil here.
	close(ch) // panic if ch is closed
	return true // <=> justClosed = true; return
}

使用这种方法明显违背了上面的channel关闭原则,然后性能还可以,毕竟在每个协程只会调用一次SafeClose,性能损失很小。

同样也可以在生产消息的时候使用recover方法。

func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			// The return result can be altered 
			// in a defer function call.
			closed = true
		}
	}()
	
	ch <- value // panic if ch is closed
	return false // <=> closed = false; return
}

礼貌的关闭channel方法

还有不少人经常使用用sync.Once来关闭channel,这样可以确保只会关闭一次

type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}

同样我们也可以使用sync.Mutex达到同样的目的。

type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
	mc.mutex.Unlock()
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

要知道golang的设计者不提供SafeClose或者SafeSend方法是有原因的,他们本来就不推荐在消费端或者在并发的多个生产端关闭channel,比如关闭只读channel在语法上就彻底被禁止使用了。

优雅的关闭channel的方法

上文的SafeSend方法一个很大的劣势在于它不能用在select块的case语句中。而另一个很重要的劣势在于像我这样对代码有洁癖的人来说,使用panic/recover和sync/mutex来搞定不是那么的优雅。下面我们引入在不同的场景下可以使用的纯粹的优雅的解决方法。

多个消费者,单个生产者。这种情况最简单,直接让生产者关闭channel好了。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 100
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)
	
	// ...
	dataCh := make(chan int, 100)
	
	// the sender
	go func() {
		for {
			if value := rand.Intn(MaxRandomNumber); value == 0 {
				// The only sender can close the channel safely.
				close(dataCh)
				return
			} else {			
				dataCh <- value
			}
		}
	}()
	
	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()
			
			// Receive values until dataCh is closed and
			// the value buffer queue of dataCh is empty.
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}
	
	wgReceivers.Wait()
}

多个生产者,单个消费者。这种情况要比上面的复杂一点。我们不能在消费端关闭channel,因为这违背了channel关闭原则。但是我们可以让消费端关闭一个附加的信号来通知发送端停止生产数据。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the receiver of channel dataCh.
	// Its reveivers are the senders of channel dataCh.
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The first select here is to try to exit the goroutine
				// as early as possible. In fact, it is not essential
				// for this example, so it can be omitted.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops if the send to dataCh is also unblocked.
				// But this is acceptable, so the first select
				// can be omitted.
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(MaxRandomNumber):
				}
			}
		}()
	}
	
	// the receiver
	go func() {
		defer wgReceivers.Done()
		
		for value := range dataCh {
			if value == MaxRandomNumber-1 {
				// The receiver of the dataCh channel is
				// also the sender of the stopCh cahnnel.
				// It is safe to close the stop channel here.
				close(stopCh)
				return
			}
			
			log.Println(value)
		}
	}()
	
	// ...
	wgReceivers.Wait()
}

就上面这个例子,生产者同时也是退出信号channel的接受者,退出信号channel仍然是由它的生产端关闭的,所以这仍然没有违背channel关闭原则。值得注意的是,这个例子中生产端和接受端都没有关闭消息数据的channel,channel在没有任何goroutine引用的时候会自行关闭,而不需要显示进行关闭。

多个生产者,多个消费者

这是最复杂的一种情况,我们既不能让接受端也不能让发送端关闭channel。我们甚至都不能让接受者关闭一个退出信号来通知生产者停止生产。因为我们不能违反channel关闭原则。但是我们可以引入一个额外的协调者来关闭附加的退出信号channel。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 10
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh is an additional signal channel.
		// Its sender is the moderator goroutine shown below.
		// Its reveivers are all senders and receivers of dataCh.
	toStop := make(chan string, 1)
		// The channel toStop is used to notify the moderator
		// to close the additional signal channel (stopCh).
		// Its senders are any senders and receivers of dataCh.
		// Its reveiver is the moderator goroutine shown below.
	
	var stoppedBy string
	
	// moderator
	go func() {
		stoppedBy = <- toStop
		close(stopCh)
	}()
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(MaxRandomNumber)
				if value == 0 {
					// Here, a trick is used to notify the moderator
					// to close the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}
				
				// The first select here is to try to exit the goroutine
				// as early as possible. This select blocks with one
				// receive operation case and one default branches will
				// be optimized as a try-receive operation by the
				// official Go compiler.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the send to
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}
	
	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()
			
			for {
				// Same as the sender goroutine, the first select here
				// is to try to exit the goroutine as early as possible.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the receive from
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == MaxRandomNumber-1 {
						// The same trick is used to notify
						// the moderator to close the
						// additional signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}
					
					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}
	
	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

以上三种场景不能涵盖全部,但是它们是最常见最通用的三种场景,基本上所有的场景都可以划分为以上三类。

结论

没有任何场景值得你去打破channel关闭原则,如果你遇到这样的一种特殊场景,还是建议你好好思考一下自己设计,是不是该重构一下了。

原文发布于微信公众号 - 码洞(codehole)

原文发表时间:2018-01-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

老生常谈:利用Membership实现SSO(单点登录)

虽然有一些现成的第三方解决方案比如:OpenID,Passport,SpaceCard等都还不错,但是要么就是收费的(passport),要么就是有点用不习惯(...

2005
来自专栏散尽浮华

Saltstack自动化操作记录(1)-环境部署

早期运维工作中用过稍微复杂的Puppet,下面介绍下更为简单实用的Saltstack自动化运维的使用。 Saltstack知多少 Saltstack是一种全新的...

25310
来自专栏黑泽君的专栏

day51_BOS项目_03

将上面的js文件引入所需要的jsp页面中,本例以index.jsp为例 /bos19/WebContent/WEB-INF/pages/common/inde...

741
来自专栏琯琯博客

awesome-php

收集整理一些常用的PHP类库, 资源以及技巧. 以便在工作中迅速的查找所需… 这个列表中的内容有来自 awesome-php 的翻译, 有来自开发者周刊以及个人...

9029
来自专栏Thinks

你的第一个渐进式网站应用(5)

渐进式网站应用会很快启动并马上可用。在目前的状态中(step-04),我们的天气app启动很快,但是不可用。因为还木有数据。 我们要使用一个AJAX请求去获取数...

793
来自专栏NetCore

Do You Kown Asp.Net Core -- Asp.Net Core 2.0 未来web开发新趋势 Razor Page

Razor Page介绍 前言     上周期待已久的Asp.Net Core 2.0提前发布了,一下子Net圈热闹了起来,2.0带来了很多新的特性和新的功能,...

3296
来自专栏Danny的专栏

学习中遇到的小技巧 一(暂停更新)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

992
来自专栏有趣的Python

程序员装机必备爆款软件推荐与配置(windows版)

做机也要做一只全能的机哦 值此新年来临之即,面对两百多个G的c盘。忍痛割爱将电脑系统重装,版本为(win10:1079)之后的所有电脑环境更新,专业软件安装均会...

4253
来自专栏拂晓风起

cocos2d-js 调试办法 断点调试 Android真机调试

1692
来自专栏一个爱瞎折腾的程序猿

.NET Core+Selenium+Github+Travis CI => SiteHistory

)。 想知道YouTube今天的首页长啥样么?点此查看 想知道YouTube2017年8月31日的首页长啥样么?改天再点开 想为你的网站增加访客么?不要问...

1071

扫码关注云+社区