Go Channel 应用模式(一)

原文作者:大道至简

Channel是Go中的一种类型,和goroutine一起为Go提供了并发技术, 它在开发中得到了广泛的应用。Go鼓励人们通过Channel在goroutine之间传递数据的引用(就像把数据的owner从一个goroutine传递给另外一个goroutine), Effective Go总结了这么一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

在 Go内存模型指出了channel作为并发控制的一个特性:

A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)

除了正常的在goroutine之间安全地传递共享数据, Channel还可以玩出很多的花样(模式), 本文列举了一些channel的应用模式。

促成本文诞生的因素主要包括:

  1. eapache的channels库
  2. concurrency in go 这本书
  3. Francesc Campoy的 justforfun系列中关于merge channel的实现
  4. 我在出版Scala集合手册这本书中对Scala集合的启发

下面就让我们以实例的方式看看这么模式吧。

Lock/TryLock 模式

我们知道, Go的标准库syncMutex,可以用来作为锁,但是Mutex却没有实现TryLock方法。

我们对于TryLock的定义是当前goroutine尝试获得锁, 如果成功,则获得了锁,返回true, 否则返回false。我们可以使用这个方法避免在获取锁的时候当前goroutine被阻塞住。

本来,这是一个常用的功能,在一些其它编程语言中都有实现,为什么Go中没有实现的?issue#6123有详细的讨论,在我看来,Go核心组成员本身对这个特性没有积极性,并且认为通过channel可以实现相同的方式。

Hacked Lock/TryLock 模式

其实,对于标准库的sync.Mutex要增加这个功能很简单,下面的方式就是通过hack的方式为Mutex实现了TryLock的功能。

 1const mutexLocked = 1 << iota
 2type Mutex struct {
 3    mu sync.Mutex
 4}
 5func (m *Mutex) Lock() {
 6    m.mu.Lock()
 7}
 8func (m *Mutex) Unlock() {
 9    m.mu.Unlock()
10}
11func (m *Mutex) TryLock() bool {
12    return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked)
13}
14func (m *Mutex) IsLocked() bool {
15    return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked
16}

如果你看一下Mutex实现的源代码,就很容易理解上面的这段代码了,因为mutex实现锁主要利用CAS对它的一个int32字段做操作。

上面的代码还额外增加了一个IsLocked方法,不过这个方法一般不常用,因为查询和加锁这两个方法执行的时候不是一个原子的操作,素以这个方法一般在调试和打日志的时候可能有用。

TryLock By Channel

既然标准库中不准备在Mutex上增加这个方法,而是推荐使用channel来实现,那么就让我们看看如何使用 channel来实现。

 1type Mutex struct {
 2    ch chan struct{}
 3}
 4func NewMutex() *Mutex {
 5    mu := &Mutex{make(chan struct{}, 1)}
 6    mu.ch <- struct{}{}
 7    return mu
 8}
 9func (m *Mutex) Lock() {
10    <-m.ch
11}
12func (m *Mutex) Unlock() {
13    select {
14    case m.ch <- struct{}{}:
15    default:
16        panic("unlock of unlocked mutex")
17    }
18}
19func (m *Mutex) TryLock() bool {
20    select {
21    case <-m.ch:
22        return true
23    default:
24    }
25    return false
26}
27func (m *Mutex) IsLocked() bool {
28    return len(m.ch) > 0
29}

主要是利用channel边界情况下的阻塞特性实现的。

你还可以将缓存的大小从1改为n,用来处理n个锁(资源)。

TryLock with Timeout

有时候,我们在获取一把锁的时候,由于有竞争的关系,在锁被别的goroutine拥有的时候,当前goroutine没有办法立即获得锁,只能阻塞等待。标准库并没有提供等待超时的功能,我们尝试实现它。

 1type Mutex struct {
 2    ch chan struct{}
 3}
 4func NewMutex() *Mutex {
 5    mu := &Mutex{make(chan struct{}, 1)}
 6    mu.ch <- struct{}{}
 7    return mu
 8}
 9func (m *Mutex) Lock() {
10    <-m.ch
11}
12func (m *Mutex) Unlock() {
13    select {
14    case m.ch <- struct{}{}:
15    default:
16        panic("unlock of unlocked mutex")
17    }
18}
19func (m *Mutex) TryLock(timeout time.Duration) bool {
20    timer := time.NewTimer(timeout)
21    select {
22    case <-m.ch:
23        timer.Stop()
24        return true
25    case <-time.After(timeout):
26    }
27    return false
28}
29func (m *Mutex) IsLocked() bool {
30    return len(m.ch) > 0
31}

你也可以把它用Context来改造,不是利用超时,而是利用Context来取消/超时获得锁的操作,这个作业留给读者来实现。

Or Channel 模式

当你等待多个信号的时候,如果收到任意一个信号, 就执行业务逻辑,忽略其它的还未收到的信号。

举个例子, 我们往提供相同服务的n个节点发送请求,只要任意一个服务节点返回结果,我们就可以执行下面的业务逻辑,其它n-1的节点的请求可以被取消或者忽略。当n=2的时候,这就是back request模式。 这样可以用资源来换取latency的提升。

需要注意的是,当收到任意一个信号的时候,其它信号都被忽略。如果用channel来实现,只要从任意一个channel中接收到一个数据,那么所有的channel都可以被关闭了(依照你的实现,但是输出的channel肯定会被关闭)。

有三种实现的方式: goroutine、reflect和递归。

Goroutine方式

 1func or(chans ...<-chan interface{}) <-chan interface{} {
 2    out := make(chan interface{})
 3    go func() {
 4        var once sync.Once
 5        for _, c := range chans {
 6            go func(c <-chan interface{}) {
 7                select {
 8                case <-c:
 9                    once.Do(func() { close(out) })
10                case <-out:
11                }
12            }(c)
13        }
14    }()
15    return out
16}

or函数可以处理n个channel,它为每个channel启动一个goroutine,只要任意一个goroutine从channel读取到数据,输出的channel就被关闭掉了。

为了避免并发关闭输出channel的问题,关闭操作只执行一次。

Reflect方式

Go的反射库针对select语句有专门的数据(reflect.SelectCase)和函数(reflect.Select)处理。 所以我们可以利用反射“随机”地从一组可选的channel中接收数据,并关闭输出channel。

这种方式看起来更简洁。

 1func or(channels ...<-chan interface{}) <-chan interface{} {
 2    switch len(channels) {
 3    case 0:
 4        return nil
 5    case 1:
 6        return channels[0]
 7    }
 8    orDone := make(chan interface{})
 9    go func() {
10        defer close(orDone)
11        var cases []reflect.SelectCase
12        for _, c := range channels {
13            cases = append(cases, reflect.SelectCase{
14                Dir:  reflect.SelectRecv,
15                Chan: reflect.ValueOf(c),
16            })
17        }
18        reflect.Select(cases)
19    }()
20    return orDone
21}

递归方式

递归方式一向是比较开脑洞的实现,下面的方式就是分而治之的方式,逐步合并channel,最终返回一个channel。

 1func or(channels ...<-chan interface{}) <-chan interface{} {
 2    switch len(channels) {
 3    case 0:
 4        return nil
 5    case 1:
 6        return channels[0]
 7    }
 8    orDone := make(chan interface{})
 9    go func() {
10        defer close(orDone)
11        switch len(channels) {
12        case 2:
13            select {
14            case <-channels[0]:
15            case <-channels[1]:
16            }
17        default:
18            m := len(channels) / 2
19            select {
20            case <-or(channels[:m]...):
21            case <-or(channels[m:]...):
22            }
23        }
24    }()
25    return orDone
26}

在后面的扇入(合并)模式中,我们还是会使用相同样的递归模式来合并多个输入channel,根据 justforfun 的测试结果,这种递归的方式要比goroutine、Reflect更有效。

Or-Done-Channel模式

这种模式是我们经常使用的一种模式,通过一个信号channel(done)来控制(取消)输入channel的处理。

一旦从done channel中读取到一个信号,或者done channel被关闭, 输入channel的处理则被取消。

这个模式提供一个简便的方法,把done channel 和 输入 channel 融合成一个输出channel。

 1func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
 2    valStream := make(chan interface{})
 3    go func() {
 4        defer close(valStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v, ok := <-c:
10                if ok == false {
11                    return
12                }
13                select {
14                case valStream <- v:
15                case <-done:
16                }
17            }
18        }
19    }()
20    return valStream
21}

扇入模式

扇入模式(FanIn)是将多个同样类型的输入channel合并成一个同样类型的输出channel,也就是channel的合并。

Goroutine方式

每个channel起一个goroutine。

 1func fanIn(chans ...<-chan interface{}) <-chan interface{} {
 2    out := make(chan interface{})
 3    go func() {
 4        var wg sync.WaitGroup
 5        wg.Add(len(chans))
 6        for _, c := range chans {
 7            go func(c <-chan interface{}) {
 8                for v := range c {
 9                    out <- v
10                }
11                wg.Done()
12            }(c)
13        }
14        wg.Wait()
15        close(out)
16    }()
17    return out
18}

Reflect

利用反射库针对select语句的处理合并输入channel。

下面这种实现方式其实还是有些问题的, 在输入channel读取比较均匀的时候比较有效,否则性能比较低下。

 1func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
 2    out := make(chan interface{})
 3    go func() {
 4        defer close(out)
 5        var cases []reflect.SelectCase
 6        for _, c := range chans {
 7            cases = append(cases, reflect.SelectCase{
 8                Dir:  reflect.SelectRecv,
 9                Chan: reflect.ValueOf(c),
10            })
11        }
12        for len(cases) > 0 {
13            i, v, ok := reflect.Select(cases)
14            if !ok { //remove this case
15                cases = append(cases[:i], cases[i+1:]...)
16                continue
17            }
18            out <- v.Interface()
19        }
20    }()
21    return out
22}

递归方式

这种方式虽然理解起来不直观,但是性能还是不错的(输入channel不是很多的情况下递归层级不会很高,不会成为瓶颈)

 1func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
 2    switch len(chans) {
 3    case 0:
 4        c := make(chan interface{})
 5        close(c)
 6        return c
 7    case 1:
 8        return chans[0]
 9    case 2:
10        return mergeTwo(chans[0], chans[1])
11    default:
12        m := len(chans) / 2
13        return mergeTwo(
14            fanInRec(chans[:m]...),
15            fanInRec(chans[m:]...))
16    }
17}
18func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
19    c := make(chan interface{})
20    go func() {
21        defer close(c)
22        for a != nil || b != nil {
23            select {
24            case v, ok := <-a:
25                if !ok {
26                    a = nil
27                    continue
28                }
29                c <- v
30            case v, ok := <-b:
31                if !ok {
32                    b = nil
33                    continue
34                }
35                c <- v
36            }
37        }
38    }()
39    return c
40}

Tee模式

扇出模式(FanOut)是将一个输入channel扇出为多个channel。

扇出行为至少可以分为两种:

  1. 从输入channel中读取一个数据,发送给每个输入channel,这种模式称之为Tee模式
  2. 从输入channel中读取一个数据,在输出channel中选择一个channel发送

本节只介绍第一种情况,下一节介绍第二种情况

Goroutine方式

将读取的值发送给每个输出channel, 异步模式可能会产生很多的goroutine。

 1func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
 2    go func() {
 3        defer func() {
 4            for i := 0; i < len(out); i++ {
 5                close(out[i])
 6            }
 7        }()
 8        for v := range ch {
 9            v := v
10            for i := 0; i < len(out); i++ {
11                i := i
12                if async {
13                    go func() {
14                        out[i] <- v
15                    }()
16                } else {
17                    out[i] <- v
18                }
19            }
20        }
21    }()
22}

Reflect方式

这种模式一旦一个输出channel被阻塞,可能会导致后续的处理延迟。

 1func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
 2    go func() {
 3        defer func() {
 4            for i := 0; i < len(out); i++ {
 5                close(out[i])
 6            }
 7        }()
 8        cases := make([]reflect.SelectCase, len(out))
 9        for i := range cases {
10            cases[i].Dir = reflect.SelectSend
11        }
12        for v := range ch {
13            v := v
14            for i := range cases {
15                cases[i].Chan = reflect.ValueOf(out[i])
16                cases[i].Send = reflect.ValueOf(v)
17            }
18            for _ = range cases { // for each channel
19                chosen, _, _ := reflect.Select(cases)
20                cases[chosen].Chan = reflect.ValueOf(nil)
21            }
22        }
23    }()
24}

版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

原文发布于微信公众号 - Golang语言社区(Golangweb)

原文发表时间:2018-07-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏阿杜的世界

Java Web技术经验总结(十三)

1132
来自专栏Kevin-ZhangCG

[ Java面试题 ]持久层篇

2976
来自专栏安恒网络空间安全讲武堂

堆利用之double-free

3294
来自专栏QQ音乐技术团队的专栏

打通Android Gradle编译过程的任督二脉

本文主要是基于自己在工作当中的一些Android Gradle实践经验,对gradle相关知识做的一个简单总结和分享,希望对大家有帮助。 首先会讲Gradle大...

1.7K9
来自专栏公有云大数据平台弹性 MapReduce

yarn UI中appliaction展示个数分析

客户在使用我们的EMR产品时一天大概提交2000个appliaction,但是yarn的UI界面仅仅展示出了100多个历史application信息,影响了客户...

5025
来自专栏JAVA高级架构开发

成为Java顶尖程序员,先过了下面问题!

ArrayList和LinkedList内部的实现大致是怎样的?他们之间的区别和优缺点?

2770
来自专栏Java开发者杂谈

Spring @Transactional踩坑记

@Transactional踩坑记 总述 ​ Spring在1.2引入@Transactional注解, 该注解的引入使得我们可以简单地通过在方法或者类上添加@...

8557
来自专栏王磊的博客

Java核心(三)并发中的线程同步与锁

乐观锁、悲观锁、公平锁、自旋锁、偏向锁、轻量级锁、重量级锁、锁膨胀...难理解?不存的!来,话不多说,带你飙车。

1332
来自专栏Java学习123

Python3.4+Django1.7+SQLite3实现增删改查

3785
来自专栏铭毅天下

干货 | 吃透Elasticsearch 堆内存

1、什么是堆内存? Java 中的堆是 JVM 所管理的最大的一块内存空间,主要用于存放各种类的实例对象。 在 Java 中,堆被划分成两个不同的区域: 新生代...

5024

扫码关注云+社区

领取腾讯云代金券