原文作者:大道至简
Channel是Go中的一种类型,和goroutine一起为Go提供了并发技术, 它在开发中得到了广泛的应用。Go鼓励人们通过Channel在goroutine之间传递数据的引用(就像把数据的owner从一个goroutine传递给另外一个goroutine), Effective Go总结了这么一句话:
Do not communicate by sharing memory; instead, share memory by communicating.
在 Go内存模型指出了channel作为并发控制的一个特性:
A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)
除了正常的在goroutine之间安全地传递共享数据, Channel还可以玩出很多的花样(模式), 本文列举了一些channel的应用模式。
促成本文诞生的因素主要包括:
下面就让我们以实例的方式看看这么模式吧。
我们知道, Go的标准库sync
有Mutex
,可以用来作为锁,但是Mutex
却没有实现TryLock
方法。
我们对于TryLock
的定义是当前goroutine尝试获得锁, 如果成功,则获得了锁,返回true, 否则返回false。我们可以使用这个方法避免在获取锁的时候当前goroutine被阻塞住。
本来,这是一个常用的功能,在一些其它编程语言中都有实现,为什么Go中没有实现的?issue#6123有详细的讨论,在我看来,Go核心组成员本身对这个特性没有积极性,并且认为通过channel可以实现相同的方式。
其实,对于标准库的sync.Mutex
要增加这个功能很简单,下面的方式就是通过hack
的方式为Mutex
实现了TryLock
的功能。
1const mutexLocked = 1 << iota
2type Mutex struct {
3 mu sync.Mutex
4}
5func (m *Mutex) Lock() {
6 m.mu.Lock()
7}
8func (m *Mutex) Unlock() {
9 m.mu.Unlock()
10}
11func (m *Mutex) TryLock() bool {
12 return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked)
13}
14func (m *Mutex) IsLocked() bool {
15 return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked
16}
如果你看一下Mutex
实现的源代码,就很容易理解上面的这段代码了,因为mutex
实现锁主要利用CAS
对它的一个int32字段做操作。
上面的代码还额外增加了一个IsLocked
方法,不过这个方法一般不常用,因为查询和加锁这两个方法执行的时候不是一个原子的操作,素以这个方法一般在调试和打日志的时候可能有用。
既然标准库中不准备在Mutex
上增加这个方法,而是推荐使用channel来实现,那么就让我们看看如何使用 channel来实现。
1type Mutex struct {
2 ch chan struct{}
3}
4func NewMutex() *Mutex {
5 mu := &Mutex{make(chan struct{}, 1)}
6 mu.ch <- struct{}{}
7 return mu
8}
9func (m *Mutex) Lock() {
10 <-m.ch
11}
12func (m *Mutex) Unlock() {
13 select {
14 case m.ch <- struct{}{}:
15 default:
16 panic("unlock of unlocked mutex")
17 }
18}
19func (m *Mutex) TryLock() bool {
20 select {
21 case <-m.ch:
22 return true
23 default:
24 }
25 return false
26}
27func (m *Mutex) IsLocked() bool {
28 return len(m.ch) > 0
29}
主要是利用channel边界情况下的阻塞特性实现的。
你还可以将缓存的大小从1改为n,用来处理n个锁(资源)。
有时候,我们在获取一把锁的时候,由于有竞争的关系,在锁被别的goroutine拥有的时候,当前goroutine没有办法立即获得锁,只能阻塞等待。标准库并没有提供等待超时的功能,我们尝试实现它。
1type Mutex struct {
2 ch chan struct{}
3}
4func NewMutex() *Mutex {
5 mu := &Mutex{make(chan struct{}, 1)}
6 mu.ch <- struct{}{}
7 return mu
8}
9func (m *Mutex) Lock() {
10 <-m.ch
11}
12func (m *Mutex) Unlock() {
13 select {
14 case m.ch <- struct{}{}:
15 default:
16 panic("unlock of unlocked mutex")
17 }
18}
19func (m *Mutex) TryLock(timeout time.Duration) bool {
20 timer := time.NewTimer(timeout)
21 select {
22 case <-m.ch:
23 timer.Stop()
24 return true
25 case <-time.After(timeout):
26 }
27 return false
28}
29func (m *Mutex) IsLocked() bool {
30 return len(m.ch) > 0
31}
你也可以把它用Context
来改造,不是利用超时,而是利用Context
来取消/超时获得锁的操作,这个作业留给读者来实现。
当你等待多个信号的时候,如果收到任意一个信号, 就执行业务逻辑,忽略其它的还未收到的信号。
举个例子, 我们往提供相同服务的n个节点发送请求,只要任意一个服务节点返回结果,我们就可以执行下面的业务逻辑,其它n-1的节点的请求可以被取消或者忽略。当n=2的时候,这就是back request
模式。 这样可以用资源来换取latency的提升。
需要注意的是,当收到任意一个信号的时候,其它信号都被忽略。如果用channel来实现,只要从任意一个channel中接收到一个数据,那么所有的channel都可以被关闭了(依照你的实现,但是输出的channel肯定会被关闭)。
有三种实现的方式: goroutine、reflect和递归。
1func or(chans ...<-chan interface{}) <-chan interface{} {
2 out := make(chan interface{})
3 go func() {
4 var once sync.Once
5 for _, c := range chans {
6 go func(c <-chan interface{}) {
7 select {
8 case <-c:
9 once.Do(func() { close(out) })
10 case <-out:
11 }
12 }(c)
13 }
14 }()
15 return out
16}
or
函数可以处理n个channel,它为每个channel启动一个goroutine,只要任意一个goroutine从channel读取到数据,输出的channel就被关闭掉了。
为了避免并发关闭输出channel的问题,关闭操作只执行一次。
Go的反射库针对select语句有专门的数据(reflect.SelectCase
)和函数(reflect.Select
)处理。
所以我们可以利用反射“随机”地从一组可选的channel中接收数据,并关闭输出channel。
这种方式看起来更简洁。
1func or(channels ...<-chan interface{}) <-chan interface{} {
2 switch len(channels) {
3 case 0:
4 return nil
5 case 1:
6 return channels[0]
7 }
8 orDone := make(chan interface{})
9 go func() {
10 defer close(orDone)
11 var cases []reflect.SelectCase
12 for _, c := range channels {
13 cases = append(cases, reflect.SelectCase{
14 Dir: reflect.SelectRecv,
15 Chan: reflect.ValueOf(c),
16 })
17 }
18 reflect.Select(cases)
19 }()
20 return orDone
21}
递归方式一向是比较开脑洞的实现,下面的方式就是分而治之的方式,逐步合并channel,最终返回一个channel。
1func or(channels ...<-chan interface{}) <-chan interface{} {
2 switch len(channels) {
3 case 0:
4 return nil
5 case 1:
6 return channels[0]
7 }
8 orDone := make(chan interface{})
9 go func() {
10 defer close(orDone)
11 switch len(channels) {
12 case 2:
13 select {
14 case <-channels[0]:
15 case <-channels[1]:
16 }
17 default:
18 m := len(channels) / 2
19 select {
20 case <-or(channels[:m]...):
21 case <-or(channels[m:]...):
22 }
23 }
24 }()
25 return orDone
26}
在后面的扇入(合并)模式中,我们还是会使用相同样的递归模式来合并多个输入channel,根据 justforfun 的测试结果,这种递归的方式要比goroutine、Reflect更有效。
这种模式是我们经常使用的一种模式,通过一个信号channel(done)来控制(取消)输入channel的处理。
一旦从done channel中读取到一个信号,或者done channel被关闭, 输入channel的处理则被取消。
这个模式提供一个简便的方法,把done channel 和 输入 channel 融合成一个输出channel。
1func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
2 valStream := make(chan interface{})
3 go func() {
4 defer close(valStream)
5 for {
6 select {
7 case <-done:
8 return
9 case v, ok := <-c:
10 if ok == false {
11 return
12 }
13 select {
14 case valStream <- v:
15 case <-done:
16 }
17 }
18 }
19 }()
20 return valStream
21}
扇入模式(FanIn)是将多个同样类型的输入channel合并成一个同样类型的输出channel,也就是channel的合并。
每个channel起一个goroutine。
1func fanIn(chans ...<-chan interface{}) <-chan interface{} {
2 out := make(chan interface{})
3 go func() {
4 var wg sync.WaitGroup
5 wg.Add(len(chans))
6 for _, c := range chans {
7 go func(c <-chan interface{}) {
8 for v := range c {
9 out <- v
10 }
11 wg.Done()
12 }(c)
13 }
14 wg.Wait()
15 close(out)
16 }()
17 return out
18}
利用反射库针对select语句的处理合并输入channel。
下面这种实现方式其实还是有些问题的, 在输入channel读取比较均匀的时候比较有效,否则性能比较低下。
1func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
2 out := make(chan interface{})
3 go func() {
4 defer close(out)
5 var cases []reflect.SelectCase
6 for _, c := range chans {
7 cases = append(cases, reflect.SelectCase{
8 Dir: reflect.SelectRecv,
9 Chan: reflect.ValueOf(c),
10 })
11 }
12 for len(cases) > 0 {
13 i, v, ok := reflect.Select(cases)
14 if !ok { //remove this case
15 cases = append(cases[:i], cases[i+1:]...)
16 continue
17 }
18 out <- v.Interface()
19 }
20 }()
21 return out
22}
这种方式虽然理解起来不直观,但是性能还是不错的(输入channel不是很多的情况下递归层级不会很高,不会成为瓶颈)
1func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
2 switch len(chans) {
3 case 0:
4 c := make(chan interface{})
5 close(c)
6 return c
7 case 1:
8 return chans[0]
9 case 2:
10 return mergeTwo(chans[0], chans[1])
11 default:
12 m := len(chans) / 2
13 return mergeTwo(
14 fanInRec(chans[:m]...),
15 fanInRec(chans[m:]...))
16 }
17}
18func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
19 c := make(chan interface{})
20 go func() {
21 defer close(c)
22 for a != nil || b != nil {
23 select {
24 case v, ok := <-a:
25 if !ok {
26 a = nil
27 continue
28 }
29 c <- v
30 case v, ok := <-b:
31 if !ok {
32 b = nil
33 continue
34 }
35 c <- v
36 }
37 }
38 }()
39 return c
40}
扇出模式(FanOut)是将一个输入channel扇出为多个channel。
扇出行为至少可以分为两种:
本节只介绍第一种情况,下一节介绍第二种情况
将读取的值发送给每个输出channel, 异步模式可能会产生很多的goroutine。
1func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
2 go func() {
3 defer func() {
4 for i := 0; i < len(out); i++ {
5 close(out[i])
6 }
7 }()
8 for v := range ch {
9 v := v
10 for i := 0; i < len(out); i++ {
11 i := i
12 if async {
13 go func() {
14 out[i] <- v
15 }()
16 } else {
17 out[i] <- v
18 }
19 }
20 }
21 }()
22}
这种模式一旦一个输出channel被阻塞,可能会导致后续的处理延迟。
1func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
2 go func() {
3 defer func() {
4 for i := 0; i < len(out); i++ {
5 close(out[i])
6 }
7 }()
8 cases := make([]reflect.SelectCase, len(out))
9 for i := range cases {
10 cases[i].Dir = reflect.SelectSend
11 }
12 for v := range ch {
13 v := v
14 for i := range cases {
15 cases[i].Chan = reflect.ValueOf(out[i])
16 cases[i].Send = reflect.ValueOf(v)
17 }
18 for _ = range cases { // for each channel
19 chosen, _, _ := reflect.Select(cases)
20 cases[chosen].Chan = reflect.ValueOf(nil)
21 }
22 }
23 }()
24}
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。