原文作者:大道至简
分布模式将从输入channel中读取的值往输出channel中的其中一个发送。
roundrobin的方式选择输出channel。
1 func fanOut(ch <-chan interface{}, out []chan interface{}) {
2 go func() {
3 defer func() {
4 for i := 0; i < len(out); i++ {
5 close(out[i])
6 }
7 }()
8 // roundrobin
9 var i = 0
10 var n = len(out)
11 for v := range ch {
12 v := v
13 out[i] <- v
14 i = (i + 1) % n
15 }
16 }()
17 }
利用发射随机的选择。
1 func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
2 go func() {
3 defer func() {
4 for i := 0; i < len(out); i++ {
5 close(out[i])
6 }
7 }()
8 cases := make([]reflect.SelectCase, len(out))
9 for i := range cases {
10 cases[i].Dir = reflect.SelectSend
11 cases[i].Chan = reflect.ValueOf(out[i])
12 }
13 for v := range ch {
14 v := v
15 for i := range cases {
16 cases[i].Send = reflect.ValueOf(v)
17 }
18 _, _, _ = reflect.Select(cases)
19 }
20 }()
21 }
eapache/channels提供了一些channel应用模式的方法,比如上面的扇入扇出模式等。
因为go本身的channel无法再进行扩展, eapache/channels
库定义了自己的channel接口,并提供了与channel方便的转换。
eapache/channels
提供了四个方法:
同时对上面的四个函数还提供了WeakXXX
的函数,输入关闭后不会关闭输出。
下面看看对应的函数的例子。
1 func testDist() {
2 fmt.Println("dist:")
3 a := channels.NewNativeChannel(channels.None)
4 outputs := []channels.Channel{
5 channels.NewNativeChannel(channels.None),
6 channels.NewNativeChannel(channels.None),
7 channels.NewNativeChannel(channels.None),
8 channels.NewNativeChannel(channels.None),
9 }
10 channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
11 //channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
12 go func() {
13 for i := 0; i < 5; i++ {
14 a.In() <- i
15 }
16 a.Close()
17 }()
18 for i := 0; i < 6; i++ {
19 var v interface{}
20 var j int
21 select {
22 case v = <-outputs[0].Out():
23 j = 0
24 case v = <-outputs[1].Out():
25 j = 1
26 case v = <-outputs[2].Out():
27 j = 2
28 case v = <-outputs[3].Out():
29 j = 3
30 }
31 fmt.Printf("channel#%d: %d\n", j, v)
32 }
33 }
1 func testTee() {
2 fmt.Println("tee:")
3 a := channels.NewNativeChannel(channels.None)
4 outputs := []channels.Channel{
5 channels.NewNativeChannel(channels.None),
6 channels.NewNativeChannel(channels.None),
7 channels.NewNativeChannel(channels.None),
8 channels.NewNativeChannel(channels.None),
9 }
10 channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
11 //channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])
12 go func() {
13 for i := 0; i < 5; i++ {
14 a.In() <- i
15 }
16 a.Close()
17 }()
18 for i := 0; i < 20; i++ {
19 var v interface{}
20 var j int
21 select {
22 case v = <-outputs[0].Out():
23 j = 0
24 case v = <-outputs[1].Out():
25 j = 1
26 case v = <-outputs[2].Out():
27 j = 2
28 case v = <-outputs[3].Out():
29 j = 3
30 }
31 fmt.Printf("channel#%d: %d\n", j, v)
32 }
33 }
1 func testMulti() {
2 fmt.Println("multi:")
3 a := channels.NewNativeChannel(channels.None)
4 inputs := []channels.Channel{
5 channels.NewNativeChannel(channels.None),
6 channels.NewNativeChannel(channels.None),
7 channels.NewNativeChannel(channels.None),
8 channels.NewNativeChannel(channels.None),
9 }
10 channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
11 //channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
12 go func() {
13 for i := 0; i < 5; i++ {
14 for j := range inputs {
15 inputs[j].In() <- i
16 }
17 }
18 for i := range inputs {
19 inputs[i].Close()
20 }
21 }()
22 for v := range a.Out() {
23 fmt.Printf("%d ", v)
24 }
25 }
1 func testPipe() {
2 fmt.Println("pipe:")
3 a := channels.NewNativeChannel(channels.None)
4 b := channels.NewNativeChannel(channels.None)
5 channels.Pipe(a, b)
6 // channels.WeakPipe(a, b)
7 go func() {
8 for i := 0; i < 5; i++ {
9 a.In() <- i
10 }
11 a.Close()
12 }()
13 for v := range b.Out() {
14 fmt.Printf("%d ", v)
15 }
16 }
从channel的行为来看,它看起来很像一个数据流,所以我们可以实现一些类似Scala 集合的操作。
Scala的集合类提供了丰富的操作(方法), 当然其它的一些编程语言或者框架也提供了类似的方法, 比如Apache Spark、Java Stream、ReactiveX等。
下面列出了一些方法的实现,我相信经过一些人的挖掘,相关的方法可以变成一个很好的类库,但是目前我们先看一些例子。
skip函数是从一个channel中跳过开一些数据,然后才开始读取。
skipN跳过开始的N个数据。
1 func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
2 takeStream := make(chan interface{})
3 go func() {
4 defer close(takeStream)
5 for i := 0; i < num; i++ {
6 select {
7 case <-done:
8 return
9 case takeStream <- <-valueStream:
10 }
11 }
12 }()
13 return takeStream
14 }
skipFn 提供Fn函数为true的数据,比如跳过偶数。
1 func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
2 takeStream := make(chan interface{})
3 go func() {
4 defer close(takeStream)
5 for {
6 select {
7 case <-done:
8 return
9 case v := <-valueStream:
10 if !fn(v) {
11 takeStream <- v
12 }
13 }
14 }
15 }()
16 return takeStream
17 }
跳过开头函数fn为true的数据。
1 func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
2 takeStream := make(chan interface{})
3 go func() {
4 defer close(takeStream)
5 take := false
6 for {
7 select {
8 case <-done:
9 return
10 case v := <-valueStream:
11 if !take {
12 take = !fn(v)
13 if !take {
14 continue
15 }
16 }
17 takeStream <- v
18 }
19 }
20 }()
21 return takeStream
22 }
skip的反向操作,读取一部分数据。
takeN 读取开头N个数据。
1 func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
2 takeStream := make(chan interface{})
3 go func() {
4 defer close(takeStream)
5 for i := 0; i < num; i++ {
6 select {
7 case <-done:
8 return
9 case takeStream <- <-valueStream:
10 }
11 }
12 }()
13 return takeStream
14 }
takeFn 只筛选满足fn的数据。
1 func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
2 takeStream := make(chan interface{})
3 go func() {
4 defer close(takeStream)
5 for {
6 select {
7 case <-done:
8 return
9 case v := <-valueStream:
10 if fn(v) {
11 takeStream <- v
12 }
13 }
14 }
15 }()
16 return takeStream
17 }
takeWhile只挑选开头满足fn的数据。
1 func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
2 takeStream := make(chan interface{})
3 go func() {
4 defer close(takeStream)
5 for {
6 select {
7 case <-done:
8 return
9 case v := <-valueStream:
10 if !fn(v) {
11 return
12 }
13 takeStream <- v
14 }
15 }
16 }()
17 return takeStream
18 }
平展(flat)操作是一个有趣的操作。
如果输入是一个channel,channel中的数据还是相同类型的channel, 那么flat将返回一个输出channel,输出channel中的数据是输入的各个channel中的数据。
它与扇入不同,扇入的输入channel在调用的时候就是固定的,并且以数组的方式提供,而flat的输入是一个channel,可以运行时随时的加入channel。
1 func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
2 valStream := make(chan interface{})
3 go func() {
4 defer close(valStream)
5 for {
6 select {
7 case <-done:
8 return
9 case v, ok := <-c:
10 if ok == false {
11 return
12 }
13 select {
14 case valStream <- v:
15 case <-done:
16 }
17 }
18 }
19 }()
20 return valStream
21 }
22 func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
23 valStream := make(chan interface{})
24 go func() {
25 defer close(valStream)
26 for {
27 var stream <-chan interface{}
28 select {
29 case maybeStream, ok := <-chanStream:
30 if ok == false {
31 return
32 }
33 stream = maybeStream
34 case <-done:
35 return
36 }
37 for val := range orDone(done, stream) {
38 select {
39 case valStream <- val:
40 case <-done:
41 }
42 }
43 }
44 }()
45 return valStream
46 }
map和reduce是一组常用的操作。
map将一个channel映射成另外一个channel, channel的类型可以不同。
1 func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
2 out := make(chan interface{})
3 if in == nil {
4 close(out)
5 return out
6 }
7 go func() {
8 defer close(out)
9 for v := range in {
10 out <- fn(v)
11 }
12 }()
13 return out
14 }
因为map
是go的关键字,所以我们不能命名函数类型为map
,这里用mapChan
代替。
比如你可以处理一个公司员工工资的channel, 输出一个扣税之后的员工工资的channel。
1 func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
2 if in == nil {
3 return nil
4 }
5 out := <-in
6 for v := range in {
7 out = fn(out, v)
8 }
9 return out
10 }
11 你可以用`reduce`实现`sum`、`max`、`min`等聚合操作。
本文列出了channel的一些深入应用的模式,相信通过阅读本文,你可以更加深入的了解Go的channel类型,并在开发中灵活的应用channel。也欢迎你在评论中提出更多的 channel的应用模式。
所有的代码可以在github上找到: smallnest/channels。
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。