Go Channel 应用模式(二)

原文作者:大道至简

分布模式

分布模式将从输入channel中读取的值往输出channel中的其中一个发送。

Goroutine方式

roundrobin的方式选择输出channel。

 1 func fanOut(ch <-chan interface{}, out []chan interface{}) {
 2    go func() {
 3        defer func() {
 4            for i := 0; i < len(out); i++ {
 5                close(out[i])
 6            }
 7        }()
 8        // roundrobin
 9        var i = 0
10        var n = len(out)
11        for v := range ch {
12            v := v
13            out[i] <- v
14            i = (i + 1) % n
15        }
16    }()
17 }

Reflect方式

利用发射随机的选择。

 1 func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
 2    go func() {
 3        defer func() {
 4            for i := 0; i < len(out); i++ {
 5                close(out[i])
 6            }
 7        }()
 8        cases := make([]reflect.SelectCase, len(out))
 9        for i := range cases {
10            cases[i].Dir = reflect.SelectSend
11            cases[i].Chan = reflect.ValueOf(out[i])
12        }
13        for v := range ch {
14            v := v
15            for i := range cases {
16                cases[i].Send = reflect.ValueOf(v)
17            }
18            _, _, _ = reflect.Select(cases)
19        }
20    }()
21 }

eapache

eapache/channels提供了一些channel应用模式的方法,比如上面的扇入扇出模式等。

因为go本身的channel无法再进行扩展, eapache/channels库定义了自己的channel接口,并提供了与channel方便的转换。

eapache/channels 提供了四个方法:

  • Distribute: 从输入channel读取值,发送到其中一个输出channel中。当输入channel关闭后,输出channel都被关闭
  • Tee: 从输入channel读取值,发送到所有的输出channel中。当输入channel关闭后,输出channel都被关闭
  • Multiplex: 合并输入channel为一个输出channel, 当所有的输入都关闭后,输出才关闭
  • Pipe: 将两个channel串起来

同时对上面的四个函数还提供了WeakXXX的函数,输入关闭后不会关闭输出。

下面看看对应的函数的例子。

Distribute

 1 func testDist() {
 2    fmt.Println("dist:")
 3    a := channels.NewNativeChannel(channels.None)
 4    outputs := []channels.Channel{
 5        channels.NewNativeChannel(channels.None),
 6        channels.NewNativeChannel(channels.None),
 7        channels.NewNativeChannel(channels.None),
 8        channels.NewNativeChannel(channels.None),
 9    }
10    channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
11    //channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
12    go func() {
13        for i := 0; i < 5; i++ {
14            a.In() <- i
15        }
16        a.Close()
17    }()
18    for i := 0; i < 6; i++ {
19        var v interface{}
20        var j int
21        select {
22        case v = <-outputs[0].Out():
23            j = 0
24        case v = <-outputs[1].Out():
25            j = 1
26        case v = <-outputs[2].Out():
27            j = 2
28        case v = <-outputs[3].Out():
29            j = 3
30        }
31        fmt.Printf("channel#%d: %d\n", j, v)
32    }
33 }

Tee

 1 func testTee() {
 2    fmt.Println("tee:")
 3    a := channels.NewNativeChannel(channels.None)
 4    outputs := []channels.Channel{
 5        channels.NewNativeChannel(channels.None),
 6        channels.NewNativeChannel(channels.None),
 7        channels.NewNativeChannel(channels.None),
 8        channels.NewNativeChannel(channels.None),
 9    }
10    channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
11    //channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])
12    go func() {
13        for i := 0; i < 5; i++ {
14            a.In() <- i
15        }
16        a.Close()
17    }()
18    for i := 0; i < 20; i++ {
19        var v interface{}
20        var j int
21        select {
22        case v = <-outputs[0].Out():
23            j = 0
24        case v = <-outputs[1].Out():
25            j = 1
26        case v = <-outputs[2].Out():
27            j = 2
28        case v = <-outputs[3].Out():
29            j = 3
30        }
31        fmt.Printf("channel#%d: %d\n", j, v)
32    }
33 }

Multiplex

 1 func testMulti() {
 2    fmt.Println("multi:")
 3    a := channels.NewNativeChannel(channels.None)
 4    inputs := []channels.Channel{
 5        channels.NewNativeChannel(channels.None),
 6        channels.NewNativeChannel(channels.None),
 7        channels.NewNativeChannel(channels.None),
 8        channels.NewNativeChannel(channels.None),
 9    }
10    channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
11    //channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
12    go func() {
13        for i := 0; i < 5; i++ {
14            for j := range inputs {
15                inputs[j].In() <- i
16            }
17        }
18        for i := range inputs {
19            inputs[i].Close()
20        }
21    }()
22    for v := range a.Out() {
23        fmt.Printf("%d ", v)
24    }
25 }

Pipe

 1 func testPipe() {
 2    fmt.Println("pipe:")
 3    a := channels.NewNativeChannel(channels.None)
 4    b := channels.NewNativeChannel(channels.None)
 5    channels.Pipe(a, b)
 6    // channels.WeakPipe(a, b)
 7    go func() {
 8        for i := 0; i < 5; i++ {
 9            a.In() <- i
10        }
11        a.Close()
12    }()
13    for v := range b.Out() {
14        fmt.Printf("%d ", v)
15    }
16 }

集合操作

从channel的行为来看,它看起来很像一个数据流,所以我们可以实现一些类似Scala 集合的操作。

Scala的集合类提供了丰富的操作(方法), 当然其它的一些编程语言或者框架也提供了类似的方法, 比如Apache Spark、Java Stream、ReactiveX等。

下面列出了一些方法的实现,我相信经过一些人的挖掘,相关的方法可以变成一个很好的类库,但是目前我们先看一些例子。

skip

skip函数是从一个channel中跳过开一些数据,然后才开始读取。

skipN

skipN跳过开始的N个数据。

 1 func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for i := 0; i < num; i++ {
 6            select {
 7            case <-done:
 8                return
 9            case takeStream <- <-valueStream:
10            }
11        }
12    }()
13    return takeStream
14 }

skipFn

skipFn 提供Fn函数为true的数据,比如跳过偶数。

 1 func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v := <-valueStream:
10                if !fn(v) {
11                    takeStream <- v
12                }
13            }
14        }
15    }()
16    return takeStream
17 }

skipWhile

跳过开头函数fn为true的数据。

 1 func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        take := false
 6        for {
 7            select {
 8            case <-done:
 9                return
10            case v := <-valueStream:
11                if !take {
12                    take = !fn(v)
13                    if !take {
14                        continue
15                    }
16                }
17                takeStream <- v
18            }
19        }
20    }()
21    return takeStream
22 }

take

skip的反向操作,读取一部分数据。

takeN

takeN 读取开头N个数据。

 1 func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for i := 0; i < num; i++ {
 6            select {
 7            case <-done:
 8                return
 9            case takeStream <- <-valueStream:
10            }
11        }
12    }()
13    return takeStream
14 }

takeFn

takeFn 只筛选满足fn的数据。

 1 func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v := <-valueStream:
10                if fn(v) {
11                    takeStream <- v
12                }
13            }
14        }
15    }()
16    return takeStream
17 }

takeWhile

takeWhile只挑选开头满足fn的数据。

 1 func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
 2    takeStream := make(chan interface{})
 3    go func() {
 4        defer close(takeStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v := <-valueStream:
10                if !fn(v) {
11                    return
12                }
13                takeStream <- v
14            }
15        }
16    }()
17    return takeStream
18 }

flat

平展(flat)操作是一个有趣的操作。

如果输入是一个channel,channel中的数据还是相同类型的channel, 那么flat将返回一个输出channel,输出channel中的数据是输入的各个channel中的数据。

它与扇入不同,扇入的输入channel在调用的时候就是固定的,并且以数组的方式提供,而flat的输入是一个channel,可以运行时随时的加入channel。

 1 func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
 2    valStream := make(chan interface{})
 3    go func() {
 4        defer close(valStream)
 5        for {
 6            select {
 7            case <-done:
 8                return
 9            case v, ok := <-c:
10                if ok == false {
11                    return
12                }
13                select {
14                case valStream <- v:
15                case <-done:
16                }
17            }
18        }
19    }()
20    return valStream
21 }
22 func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
23    valStream := make(chan interface{})
24    go func() {
25        defer close(valStream)
26        for {
27            var stream <-chan interface{}
28            select {
29            case maybeStream, ok := <-chanStream:
30                if ok == false {
31                    return
32                }
33                stream = maybeStream
34            case <-done:
35                return
36            }
37            for val := range orDone(done, stream) {
38                select {
39                case valStream <- val:
40                case <-done:
41                }
42            }
43        }
44    }()
45    return valStream
46 }

map

map和reduce是一组常用的操作。

map将一个channel映射成另外一个channel, channel的类型可以不同。

 1 func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
 2    out := make(chan interface{})
 3    if in == nil {
 4        close(out)
 5        return out
 6    }
 7    go func() {
 8        defer close(out)
 9        for v := range in {
10            out <- fn(v)
11        }
12    }()
13    return out
14 }

因为map是go的关键字,所以我们不能命名函数类型为map,这里用mapChan代替。

比如你可以处理一个公司员工工资的channel, 输出一个扣税之后的员工工资的channel。

reduce

 1 func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
 2    if in == nil {
 3        return nil
 4    }
 5    out := <-in
 6    for v := range in {
 7        out = fn(out, v)
 8    }
 9    return out
10 }
11 你可以用`reduce`实现`sum`、`max`、`min`等聚合操作。

总结

本文列出了channel的一些深入应用的模式,相信通过阅读本文,你可以更加深入的了解Go的channel类型,并在开发中灵活的应用channel。也欢迎你在评论中提出更多的 channel的应用模式。

所有的代码可以在github上找到: smallnest/channels。

参考资料

  1. https://github.com/kat-co/concurrency-in-go-src
  2. https://github.com/campoy/justforfunc/tree/master/27-merging-chans
  3. https://github.com/eapache/channels
  4. https://github.com/LK4D4/trylock
  5. https://stackoverflow.com/questions/36391421/explain-dont-communicate-by-sharing-memory-share-memory-by-communicating
  6. https://github.com/lrita/gosync
  7. https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html

版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

原文发布于微信公众号 - Golang语言社区(Golangweb)

原文发表时间:2018-08-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大内老A

ASP.NET MVC涉及到的5个同步与异步,你是否傻傻分不清楚?[下篇]

关于ASP.NET MVC对请求的处理方式(同步或者异步)涉及到的五个组件,在《上篇》中我们谈了三个(MvcHandler、Controller和ActionI...

21260
来自专栏编程心路

想学习php的,不如来这里看看

win+R打开命令行,cmd进DOS窗口 DOS命令开启关闭Apache和Mysql Apache启动关闭命令

13130
来自专栏程序员互动联盟

【编程基础】C++ Primer快速入门三:两种控制语句

语句总是顺序执行的:第一条语句执行完了接着是第二条,第三条等等。这是最简单的情况,为了更好的控制语句的运行,程序设计语言提供了多种控制结构支持更为复杂的语句执行...

34590
来自专栏性能与架构

Javascript ES6版本的4个基础用法

JS 的 ES6版本已经被各大浏览器广泛支持,很多前端框架也已经使用 ES6,并且还有 Babel 可以做兼容处理,所以ES6已经进入了应用阶段 如果您对 ES...

34670
来自专栏漫漫前端路

巧用 TypeScript (一)

TypeScript 提供函数重载的功能,用来处理因函数参数不同而返回类型不同的使用场景,使用时,只需为同一个函数定义多个类型即可,简单使用如下所示:

30420
来自专栏程序员的SOD蜜

C#中?与??的区别

起初我也不知道C#中有??操作符,今天张鹏在查看我的MVC示例程序的时候问了这个问题,检查代码后发现,下面的代码是VS2010在生成MVC应用程序自己添加的: ...

24270
来自专栏我是攻城师

Java里面volatile关键字修饰引用变量的陷阱

如果我现在问你volatile的关键字的作用,你可能会回答对于一个线程修改的变量对其他的线程立即可见。这种说法没多大问题,但是不够严谨。

7920
来自专栏狮乐园

高级 Angular 组件模式 (5)

在之前的例子中,已经出现多次使用template reference variable(模板引用变量)的场景,现在让我们来深入研究如何通过使用模板引用变量来关联...

9420
来自专栏算法修养

pta 习题集 5-17九宫格输入法

假设有九宫格输入法键盘布局如下: [ 1,.?! ] [ 2ABC ] [ 3DEF ] [ 4GHI ] [ 5JKL ] [ 6MNO ] [...

38070
来自专栏Golang语言社区

Golang中container/list包中的坑

但是list包中大部分对于e *Element进行操作的元素都可能会导致程序崩溃,其根本原因是e是一个Element类型的指针,当然其也可能为nil,但是gol...

391140

扫码关注云+社区

领取腾讯云代金券