2020-3-1
今天研究了一下channel的源码,对channel的安全退出有了一些小见解。在此结合实际应用,对select 于channel结合对情况下,安全退出channel做一下记录。
场景1:直接退出(会丢失数据)
因为退出时,直接程序就中断了,channel里存对数据直接丢失。
package main
import (
"fmt"
"sync"
"time"
)
var (
wg sync.WaitGroup
channel = make(chan int, 10)
)
func main() {
//先写满一个channel
for i := 0; i < 10; i++ {
channel <- i
}
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case num := <-channel:
fmt.Println("======", num)
//每次从channel取值后sleep 1秒,方便我们分析
time.Sleep(time.Duration(num) * time.Second)
}
}
}()
wg.Wait()
}
场景2:捕捉程序退出信号,然后关闭channel (不丢失数据)
package main
import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
var (
wg sync.WaitGroup
channel = make(chan int, 10000)
)
func main() {
//先写满一个channel
for i := 0; i < 10; i++ {
channel <- i
}
wg.Add(1)
go HandleSignals()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case num, ok := <-channel:
if !ok {
return
}
fmt.Println("======", num)
//每次从channel取值后sleep 1秒,方便我们分析
time.Sleep(time.Duration(num) * time.Second)
}
}
}()
wg.Wait()
}
func HandleSignals() {
defer wg.Done()
ch := make(chan os.Signal, 10)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2)
for {
sig := <-ch
switch sig {
case syscall.SIGINT, syscall.SIGTERM:
close(channel)
log.Println("Exiting, please wait...")
return
}
}
}
以上实现是在捕捉到系统退出信号时 执行了 close(channel) 。 从而实现,完全退出前,仍将缓存在channel中到数据,读出并执行。
那是怎么实现的呢? 通过阅读源码 go/src/runtime/chan.go: closechan
看到以下实现,可以看到,在close channel时,仍会将channel中的数据读出来。 因此,我们要使用此特性时,就需要根据系统退出信号,关闭channel。然后判断channel是否关闭,若关闭,再退出for循环。 否则,直接退出的程序,就会直接将channel中的数据抛弃。
func closechan(c *hchan) {
...
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
...
}