要开始执行两个goroutine的无限循环,我可以使用以下代码:
收到消息后,它将启动一个新的goroutine并永远运行。
c1 := make(chan string)
c2 := make(chan string)
go DoStuff(c1, 5)
go DoStuff(c2, 2)
for ; true; {
select {
case msg1 := <-c1:
fmt.Println("received ", msg1)
go DoStuff(c1, 1)
case msg2 := <-c2:
fmt.Println("received ", msg2)
go DoStuff(c2, 9)
}
}
我现在希望对N个goroutines具有相同的行为,但是在这种情况下select语句会是什么样子呢?
这是我开始编写的代码,但是我对如何编写select语句感到困惑
numChans := 2
//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}
for i:=0;i<numChans;i++{
tmp := make(chan string);
chans = append(chans, tmp);
go DoStuff(tmp, i + 1)
//How shall the select statment be coded for this case?
for ; true; {
select {
case msg1 := <-c1:
fmt.Println("received ", msg1)
go DoStuff(c1, 1)
case msg2 := <-c2:
fmt.Println("received ", msg2)
go DoStuff(c2, 9)
}
}
发布于 2015-09-02 08:08:01
您可以通过将每个通道包装在一个goroutine中来实现这一点,goroutine将消息“转发”到一个共享的“聚合”通道。例如:
agg := make(chan string)
for _, ch := range chans {
go func(c chan string) {
for msg := range c {
agg <- msg
}
}(ch)
}
select {
case msg <- agg:
fmt.Println("received ", msg)
}
如果您需要知道消息来自哪个通道,则可以在将其转发到聚合通道之前将其包装在带有任何额外信息的结构中。
在我的(有限的)测试中,这种方法的性能大大优于使用反射包:
$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect 1 5265109013 ns/op
BenchmarkGoSelect 20 81911344 ns/op
ok command-line-arguments 9.463s
基准测试代码here
发布于 2020-07-31 15:03:50
可能更简单的选项:
与其使用通道数组,为什么不只将一个通道作为参数传递给在单独goroutine上运行的函数,然后在消费者goroutine中侦听该通道?
这允许您在侦听器中只选择一个通道,从而实现简单的select,并避免创建新的goroutines来聚合来自多个通道的消息?
发布于 2021-02-10 02:52:27
我们实际上对这个问题做了一些研究,并找到了最佳解决方案。我们使用了reflect.Select
一段时间,它是解决这个问题的一个很好的解决方案。它比每个通道的goroutine轻得多,并且操作简单。但不幸的是,它并不真正支持大量的通道,这是我们的情况,所以我们找到了一些有趣的东西,并为此写了一篇博客:https://cyolo.io/blog/how-we-enabled-dynamic-channel-selection-at-scale-in-go/
我将总结一下这里所写的内容:我们为2的32次方的幂的每个结果静态地创建了一批select..case语句,以及一个路由到不同情况并通过聚合通道聚合结果的函数。
这类批处理的一个示例:
func select4(ctx context.Context, chanz []chan interface{}, res chan *r, r *r, i int) {
select {
case r.v, r.ok = <-chanz[0]:
r.i = i + 0
res <- r
case r.v, r.ok = <-chanz[1]:
r.i = i + 1
res <- r
case r.v, r.ok = <-chanz[2]:
r.i = i + 2
res <- r
case r.v, r.ok = <-chanz[3]:
r.i = i + 3
res <- r
case <-ctx.Done():
break
}
}
以及使用这些类型的select..case
批处理从任意数量的通道聚合第一个结果的逻辑:
for i < len(channels) {
l = len(channels) - i
switch {
case l > 31 && maxBachSize >= 32:
go select32(ctx, channels[i:i+32], agg, rPool.Get().(*r), i)
i += 32
case l > 15 && maxBachSize >= 16:
go select16(ctx, channels[i:i+16], agg, rPool.Get().(*r), i)
i += 16
case l > 7 && maxBachSize >= 8:
go select8(ctx, channels[i:i+8], agg, rPool.Get().(*r), i)
i += 8
case l > 3 && maxBachSize >= 4:
go select4(ctx, channels[i:i+4], agg, rPool.Get().(*r), i)
i += 4
case l > 1 && maxBachSize >= 2:
go select2(ctx, channels[i:i+2], agg, rPool.Get().(*r), i)
i += 2
case l > 0:
go select1(ctx, channels[i], agg, rPool.Get().(*r), i)
i += 1
}
}
https://stackoverflow.com/questions/19992334
复制相似问题