作者:Ovenvan 来源:简书
作者近期在写一个项目时遇到了这样的需求:调用一个库API函数,函数内部又会拉起若干个后台goroutine。这时后台goroutine如果遇到错误想要及时通知库的使用者将不会是一件容易的事情,因为这是一个异步通知error的方法。作者最终的解决思路是:使用者另启一个goroutine监听Err channel,库后台goroutine出现的错误将直接发送至Err channel中。 作者以自己项目简单举例:
1func DaemonListen(err <-chan Errsocket){
2 for {
3 v, ok := <-err
4 if ok {
5 fmt.Println(v) /*处理错误*/
6 } else {
7 fmt.Println("Listen closed.")/*后台安全退出*/
8 return
9 }
10 }
11}
12
13func main(){
14 x := NewServer(/*......*/)
15 //后台会启动多个goroutine协同工作,该方法立即返回
16 x.Listen()
17 //启动守护goroutine监听error channel
18 go DaemonListen(x.ErrDone())
19}
需要注意的是:1.channel类型不一定只是error。如果你需要更多的信息,完全可以是一个包含error的struct;2.告诉守护goroutine可以安全退出的方法是关闭该channel,此时需保证该channel不会再被使用。若此时有goroutine试图向该channel发送error则会引发panic。
即确保关闭该channel之前其余所有goroutine都已经安全退出,不会再使用该channel。我们最先容易想到Go中的context标准库解决该问题。该标准库的作用也是维护层层调用的goroutine,并当parentCtx执行关闭操作时,能够顺利通知到所有childrenCtx,让所有childrenCtx安全退出。但遗憾的是,context只负责通知关闭,却不负责goroutine的退出顺序。即依然存在当channel被关闭时仍有子goroutine向channel发送数据的情况,我们仍需手动维护。另外,维护一个goroutine有时可能并不符合业务逻辑,例如:
业务需求.jpg
当使用者调用exposedAPI关闭所有goroutine时,该API需要保存着所有运行着的goroutine信息。而事实上,goroutine并不需要向该API注册自己的信息。另外,当某goroutine异常宕机时,维护信息表也是一件较为复杂的事情。
作者不清楚是否有业界前辈早已使用了类似或更成熟的技术,在这里作者只是提供自己处理该需求的一种方法。errDiversion(以下简称为eD)即另启一个守护goroutine,负责将error信息导流给上游channel或简单丢弃。
errDiversion.jpg
只需简单指定upstreamChannel和errChannel即可开启一个eD。eD的工作逻辑如下:
若errChan(以下简称为eC)已被关闭,则自己安全退出; 若upstreamChan(以下简称为uC)已关闭,则将DATA直接丢弃; 若upstreamChan处于开启状态,将DATA发送至uC。
判断uC的状态可以尝试关闭通道并捕获panic;或使用flag变量记录uC开闭状态即可(注意维护数据一致性)。
新建eD的过程应该在父goroutine完成的,并只需要传递给子goroutine一个用于传递err的channel(eC)即可。对子goroutine屏蔽细节。 再次使用作者项目作简单演示:
1func errDiversion(eD *eDer) func(eC chan Errsocket) {
2 //when upstream channel(uC) is closed(detected by closed flag),
3 //following data will be received but discarded
4 //when eC channel has closed, this goroutine will exit
5 return func(eC chan Errsocket) {
6 for {
7 err, ok := <-eC
8 if !ok { return }
9 if !eD.closed {
10 eD.eU <- err
11 }
12 eD.mu.Unlock()
13 }
14 }
15}
16
17func (s *server) Listen() { //Notifies the consumer when an error occurs ASYNCHRONOUSLY
18 /*......*/
19 //父goroutine创建eD,简单为子goroutine传递eC即可。
20 s.eDerfunc = errDiversion(&s.eDer) //closure
21 eC := make(chan error)
22 go s.eDerfunc(eC)
23 go handleListen(s, eC)
24 return
25}
26
27func handleListen(s *server, eC chan error) {
28 /*......*/
29 if err != nil {
30 eC <- err //s.errDoneTry即父goroutine传递的eC
31 }
32 /*......*/
33}
对于一个eD而言,他的引用环境(uC,closed flag,mutex)是确定的。使用者只需传递eC即可使之正常工作。
最后简单提及维护数据一致性的问题。我们需要维护的有
1. flag与channel close的关系; 2. 确保eD能够及时执行(在uC关闭之前)【换言之,当eC存有error时,先等待eD处理error再关闭uC】。
在这里我们可以使用普通锁来实现:
1type somestruct struct {
2 //under protected data
3 errDone chan Errsocket
4 errDoneTry chan Errsocket
5 closed bool
6
7 mu sync.Mutex
8
9 /*...other data...*/
10}
11
12func xxx(){
13 /*...*/
14 //close upstream channel
15 s.mu.Lock()
16 s.closed = true
17 close(s.errDone)
18 s.mu.Unlock()
19}
20
21func subgoroutine(){
22 /*...*/
23 if err != nil{
24 s.mu.Lock() //Notice, lock before sending data to channel
25 eC <- err
26 }
27}
28
errDiversion的代码也需要作部分调整:
1func errDiversion(eD *eDer) func(eC chan Errsocket) {
2 return func(eC chan Errsocket) {
3 for {
4 err, ok := <-eC
5 if !ok {
6 return
7 }
8 if !eD.closed {
9 eD.eU <- err
10 }
11 eD.mu.Unlock()
12 }
13 }
14}
这样我们就完成了并发流程控制以及数据的一致性。注意不要在eD中上锁,因为读取eC是一个阻塞过程,会引发死锁。正确的做法是向eC传递error之前上锁。
即某上游eD(下简称为A)的eC是某下游eD(下简称为B)的uC。他们是共享同一个channel而非传递的关系。当B发送error至uC(a.eC)时,需要获得上游的锁并加锁。 为要实现该功能,将errDiversion代码改为
1//......
2if !closed{
3 if eD.pmu != nil {
4 eD.pmu.Lock() //send to upstream channel
5 }
6 eD.eU <- err
7}
8//......
即可。
即多个eD的共享同一个uC。同一个uC意味着同一把锁(Mutex),同一个Flag标记uC状态。因此父goroutine应先将mutex和flag设置好,再将参数通过指针的方式传递给子goroutine。
这套机制为处理goroutine异步通知error提供了一种有效解决方案,库的使用者仅需启用一个goroutine监听errchannel即可。不足之处在于:
普遍情况下,开启一个子goroutine就需要另启一个eD作错误导流,从性能而言并不是特别优秀; 另外,他违反了通道关闭原则(一般原则下不允许接收方关闭通道和不能关闭一个有多个并发发送者的通道。 换而言之, 你只能在发送方的 goroutine 中关闭只有该发送方的通道)。关于如何优雅地关闭channel可参考这篇文章。
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。