前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go Channel 应用模式(二)

Go Channel 应用模式(二)

作者头像
李海彬
发布2018-08-16 15:27:52
6410
发布2018-08-16 15:27:52
举报
文章被收录于专栏:Golang语言社区Golang语言社区

原文作者:大道至简

分布模式

分布模式将从输入channel中读取的值往输出channel中的其中一个发送。

Goroutine方式

roundrobin的方式选择输出channel。

代码语言:javascript
复制
 1 func fanOut(ch <-chan interface{}, out []chan interface{}) {
 2    go func() {
 3        defer func() {
 4            for i := 0; i < len(out); i++ {
 5                close(out[i])
 6            }
 7        }()
 8        // roundrobin
 9        var i = 0
10        var n = len(out)
11        for v := range ch {
12            v := v
13            out[i] <- v
14            i = (i + 1) % n
15        }
16    }()
17 }

Reflect方式

利用发射随机的选择。

代码语言:javascript
复制
 1 func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
 2    go func() {
 3        defer func() {
 4            for i := 0; i < len(out); i++ {
 5                close(out[i])
 6            }
 7        }()
 8        cases := make([]reflect.SelectCase, len(out))
 9        for i := range cases {
10            cases[i].Dir = reflect.SelectSend
11            cases[i].Chan = reflect.ValueOf(out[i])
12        }
13        for v := range ch {
14            v := v
15            for i := range cases {
16                cases[i].Send = reflect.ValueOf(v)
17            }
18            _, _, _ = reflect.Select(cases)
19        }
20    }()
21 }

eapache

eapache/channels提供了一些channel应用模式的方法,比如上面的扇入扇出模式等。

因为go本身的channel无法再进行扩展, eapache/channels库定义了自己的channel接口,并提供了与channel方便的转换。

eapache/channels 提供了四个方法:

  • Distribute: 从输入channel读取值,发送到其中一个输出channel中。当输入channel关闭后,输出channel都被关闭
  • Tee: 从输入channel读取值,发送到所有的输出channel中。当输入channel关闭后,输出channel都被关闭
  • Multiplex: 合并输入channel为一个输出channel, 当所有的输入都关闭后,输出才关闭
  • Pipe: 将两个channel串起来

同时对上面的四个函数还提供了WeakXXX的函数,输入关闭后不会关闭输出。

下面看看对应的函数的例子。

Distribute

代码语言:javascript
复制
 1 func testDist() {
 2    fmt.Println("dist:")
 3    a := channels.NewNativeChannel(channels.None)
 4    outputs := []channels.Channel{
 5        channels.NewNativeChannel(channels.None),
 6        channels.NewNativeChannel(channels.None),
 7        channels.NewNativeChannel(channels.None),
 8        channels.NewNativeChannel(channels.None),
 9    }
10    channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
11    //channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
12    go func() {
13        for i := 0; i < 5; i++ {
14            a.In() <- i
15        }
16        a.Close()
17    }()
18    for i := 0; i < 6; i++ {
19        var v interface{}
20        var j int
21        select {
22        case v = <-outputs[0].Out():
23            j = 0
24        case v = <-outputs[1].Out():
25            j = 1
26        case v = <-outputs[2].Out():
27            j = 2
28        case v = <-outputs[3].Out():
29            j = 3
30        }
31        fmt.Printf("channel#%d: %d\n", j, v)
32    }
33 }

Tee

代码语言:javascript
复制
 1 func testTee() {
 2    fmt.Println("tee:")
 3    a := channels.NewNativeChannel(channels.None)
 4    outputs := []channels.Channel{
 5        channels.NewNativeChannel(channels.None),
 6        channels.NewNativeChannel(channels.None),
 7        channels.NewNativeChannel(channels.None),
 8        channels.NewNativeChannel(channels.None),
 9    }
10    channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
11    //channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])
12    go func() {
13        for i := 0; i < 5; i++ {
14            a.In() <- i
15        }
16        a.Close()
17    }()
18    for i := 0; i < 20; i++ {
19        var v interface{}
20        var j int
21        select {
22        case v = <-outputs[0].Out():
23            j = 0
24        case v = <-outputs[1].Out():
25            j = 1
26        case v = <-outputs[2].Out():
27            j = 2
28        case v = <-outputs[3].Out():
29            j = 3
30        }
31        fmt.Printf("channel#%d: %d\n", j, v)
32    }
33 }

Multiplex

代码语言:javascript
复制
 1 func testMulti() {
 2    fmt.Println("multi:")
 3    a := channels.NewNativeChannel(channels.None)
 4    inputs := []channels.Channel{
 5        channels.NewNativeChannel(channels.None),
 6        channels.NewNativeChannel(channels.None),
 7        channels.NewNativeChannel(channels.None),
 8        channels.NewNativeChannel(channels.None),
 9    }
10    channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
11    //channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
12    go func() {
13        for i := 0; i < 5; i++ {
14            for j := range inputs {
15                inputs[j].In() <- i
16            }
17        }
18        for i := range inputs {
19            inputs[i].Close()
20        }
21    }()
22    for v := range a.Out() {
23        fmt.Printf("%d ", v)
24    }
25 }

Pipe

代码语言:javascript
复制
 1 func testPipe() {
 2    fmt.Println("pipe:")
 3    a := channels.NewNativeChannel(channels.None)
 4    b := channels.NewNativeChannel(channels.None)
 5    channels.Pipe(a, b)
 6    // channels.WeakPipe(a, b)
 7    go func() {
 8        for i := 0; i < 5; i++ {
 9            a.In() <- i
10        }
11        a.Close()
12    }()
13    for v := range b.Out() {
14        fmt.Printf("%d ", v)
15    }
16 }

集合操作

从channel的行为来看,它看起来很像一个数据流,所以我们可以实现一些类似Scala 集合的操作。

Scala的集合类提供了丰富的操作(方法), 当然其它的一些编程语言或者框架也提供了类似的方法, 比如Apache Spark、Java Stream、ReactiveX等。

下面列出了一些方法的实现,我相信经过一些人的挖掘,相关的方法可以变成一个很好的类库,但是目前我们先看一些例子。

skip

skip函数是从一个channel中跳过开一些数据,然后才开始读取。

skipN

skipN跳过开始的N个数据。

代码语言:javascript
复制
 1 func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for i := 0; i < num; i++ {
 6            select {
 7            case <-done:
 8                return
 9            case takeStream <- <-valueStream:
10            }
11        }
12    }()
13    return takeStream
14 }
skipFn

skipFn 提供Fn函数为true的数据,比如跳过偶数。

代码语言:javascript
复制
 1 func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v := <-valueStream:
10                if !fn(v) {
11                    takeStream <- v
12                }
13            }
14        }
15    }()
16    return takeStream
17 }
skipWhile

跳过开头函数fn为true的数据。

代码语言:javascript
复制
 1 func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        take := false
 6        for {
 7            select {
 8            case <-done:
 9                return
10            case v := <-valueStream:
11                if !take {
12                    take = !fn(v)
13                    if !take {
14                        continue
15                    }
16                }
17                takeStream <- v
18            }
19        }
20    }()
21    return takeStream
22 }

take

skip的反向操作,读取一部分数据。

takeN

takeN 读取开头N个数据。

代码语言:javascript
复制
 1 func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for i := 0; i < num; i++ {
 6            select {
 7            case <-done:
 8                return
 9            case takeStream <- <-valueStream:
10            }
11        }
12    }()
13    return takeStream
14 }
takeFn

takeFn 只筛选满足fn的数据。

代码语言:javascript
复制
 1 func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v := <-valueStream:
10                if fn(v) {
11                    takeStream <- v
12                }
13            }
14        }
15    }()
16    return takeStream
17 }
takeWhile

takeWhile只挑选开头满足fn的数据。

代码语言:javascript
复制
 1 func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v := <-valueStream:
10                if !fn(v) {
11                    return
12                }
13                takeStream <- v
14            }
15        }
16    }()
17    return takeStream
18 }

flat

平展(flat)操作是一个有趣的操作。

如果输入是一个channel,channel中的数据还是相同类型的channel, 那么flat将返回一个输出channel,输出channel中的数据是输入的各个channel中的数据。

它与扇入不同,扇入的输入channel在调用的时候就是固定的,并且以数组的方式提供,而flat的输入是一个channel,可以运行时随时的加入channel。

代码语言:javascript
复制
 1 func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
 2    valStream := make(chan interface{})
 3    go func() {
 4        defer close(valStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v, ok := <-c:
10                if ok == false {
11                    return
12                }
13                select {
14                case valStream <- v:
15                case <-done:
16                }
17            }
18        }
19    }()
20    return valStream
21 }
22 func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
23    valStream := make(chan interface{})
24    go func() {
25        defer close(valStream)
26        for {
27            var stream <-chan interface{}
28            select {
29            case maybeStream, ok := <-chanStream:
30                if ok == false {
31                    return
32                }
33                stream = maybeStream
34            case <-done:
35                return
36            }
37            for val := range orDone(done, stream) {
38                select {
39                case valStream <- val:
40                case <-done:
41                }
42            }
43        }
44    }()
45    return valStream
46 }

map

map和reduce是一组常用的操作。

map将一个channel映射成另外一个channel, channel的类型可以不同。

代码语言:javascript
复制
 1 func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
 2    out := make(chan interface{})
 3    if in == nil {
 4        close(out)
 5        return out
 6    }
 7    go func() {
 8        defer close(out)
 9        for v := range in {
10            out <- fn(v)
11        }
12    }()
13    return out
14 }

因为map是go的关键字,所以我们不能命名函数类型为map,这里用mapChan代替。

比如你可以处理一个公司员工工资的channel, 输出一个扣税之后的员工工资的channel。

reduce

代码语言:javascript
复制
 1 func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
 2    if in == nil {
 3        return nil
 4    }
 5    out := <-in
 6    for v := range in {
 7        out = fn(out, v)
 8    }
 9    return out
10 }
11 你可以用`reduce`实现`sum`、`max`、`min`等聚合操作。

总结

本文列出了channel的一些深入应用的模式,相信通过阅读本文,你可以更加深入的了解Go的channel类型,并在开发中灵活的应用channel。也欢迎你在评论中提出更多的 channel的应用模式。

所有的代码可以在github上找到: smallnest/channels。

参考资料

  1. https://github.com/kat-co/concurrency-in-go-src
  2. https://github.com/campoy/justforfunc/tree/master/27-merging-chans
  3. https://github.com/eapache/channels
  4. https://github.com/LK4D4/trylock
  5. https://stackoverflow.com/questions/36391421/explain-dont-communicate-by-sharing-memory-share-memory-by-communicating
  6. https://github.com/lrita/gosync
  7. https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html

版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布模式
    • Goroutine方式
      • Reflect方式
      • eapache
        • Distribute
          • Tee
            • Multiplex
              • Pipe
              • 集合操作
                • skip
                  • skipN
                  • skipFn
                  • skipWhile
                • take
                  • takeN
                  • takeFn
                  • takeWhile
                • flat
                  • map
                    • reduce
                    • 总结
                    • 参考资料
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档