time.After(time.Duration)的功能是当持续的时间结束后,会将当前的时间发送到返回的通道中。在某个时间到后执行某个动作可以用time.After来实现,它使用起来非常方便,在并发程序中用的比较多。如果我们只是想让程序睡眠一段时间,可以使用time.Sleep(time.Duration). time.After主要用在“如果在5秒内没有从通道收到消息,那么将做..."这样的场景中。然而经常会看到在循环中调用time.After的代码,非常糟糕,这可能会导致内存泄露。
下面来看一个具体的例子,此函数完成的功能是不断地从通道中读取数据并进行处理,如果长达1个小时都没有从通道中接收到任何消息,希望记录一条警告日志。实现代码如下:
func consumer(ch <-chan Event) {
for {
select {
case event := <-ch:
handle(event)
case <-time.After(time.Hour):
log.Println("warning: no messages received")
}
}
}
上面循环体中select语句被执行有两种情况:1. 从通道ch获取到了消息 2. 已经有1个小时没有从ch获取到消息。因为每次循环执行select时都会对time.After进行求值计算,也就是每次都会重置超时。这段代码有什么问题吗?咋一看,没有发现问题,实际上这段代码可能存在内存泄露。
time.After会返回一个通道,函数签名如下,可以看到返回的是一个Time类型的通道。我们期望的效果是这个通道在每次循环后都被关闭,然而实际情况可能并不是这样。例如这种情况,在通道ch每次都有消息的时候,在1个小时内会一直走case event := <-ch
分支,但是每次运行select时也会对time.After(time.Hour)执行求值,每次申请的通道资源在超时(持续时间结束后)才会关闭释放,占用的内存会在一小时内一直累积。在Go1.15中,每次调用time.After大约需要200字节的内存,假如每小时收到500万条消息,那会消耗200Byte*5000000=1G的内存空间。
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
如何修复这个问题呢? 这还不简单么,我们在每次循环结束将通道关闭不就可以了吗?这是不可能能的,因为返回的是一个只能接收值的通道。函数签名如上,这里返回的是<-chan Time
不是chan Time
,只接收通道不能执行close(ch)
操作, 编译是通不过的,会报下面的错误。
invalid operation: close(time.After(time.Second)) (cannot close receive-only channel)
有多种方法修复上面代码存在的问题。第一种不使用time.After,采用上下文Context包中的ctx.Done(),代码如下. 这种方法的缺点是必须在每次循环迭代期间不断重新创建上下文,Context.WithTimeout放在for内。创建上下文在Go语言中不是一个轻量级操作。有其他更好的解决方法吗?
func consumer(ch <-chan Event) {
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
select {
case event := <-ch:
cancel()
handle(event)
case <-ctx.Done():
log.Println("warning: no messages received")
}
}
}
第二种方法是使用time包中的time.NewTimer
,该函数会返回一个time.Timer
结构对象,该结构有下面的可导出方法和字段:
「NOTE:虽然time.After实现也是依赖于time.Timer,但是time.After可导出的只有字段C,所以不能调用Reset方法。」
package time
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
下面是使用time.NewTimer
实现版本,代码如下:
func consumer(ch <-chan Event) {
timerDuration := 1 * time.Hour
timer := time.NewTimer(timerDuration)
for {
timer.Reset(timerDuration)
select {
case event := <-ch:
handle(event)
case <-timer.C:
log.Println("warning: no messages received")
}
}
}
在上面的程序中,每次循环刚开始时调用timer.Reset
进行重置操作。调用Reset操作比每次都创建一个新的上下文更简单,更快并且对GC产生的压力更小,因为它不需要任何新的堆分配。相比第一种方法,此方法更好。因此,使用time.Timer是解决本文开始提到问题的最佳解决方案。
「NOTE: 为了简单,前面的示例代码中的goroutine没有进行停止处理,在Go语言中常见100问题-#62 Starting a goroutine without knowing when to ..有提到,在不知道什么时候停止的情况下启动goroutine不是最佳实践。在生产级别的代码中,应该有退出条件,例如在上下文取消的时候。在goroutine退出的时候,记得通过使用defer timer.Stop()停止创建的time.Timer.」
在循环中使用time.After并不是唯一可能导致内存泄露的原因,本质原因与重复调用的代码有关。循环只是其中一种情况,在HTTP处理函数中使用time.After也会导致相同的问题,因为该处理函数将被多次调用。
总结,在使用time.After时应该谨慎小心,记住创建的资源只有在定时器到期时才会被释放。当time.After被重复调用时,例如在循环中(本文中的例子)、Kafka消费处理函数和HTTP处理程序中等,可能会导致内存在一段时间持续上涨,甚至会出现OOM,这种情况下,我们应该使用time.NewTimer取代time.After.