前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go语言 | 从并发模式看channel使用技巧

Go语言 | 从并发模式看channel使用技巧

作者头像
飞雪无情
发布2020-08-05 14:59:33
8340
发布2020-08-05 14:59:33
举报
文章被收录于专栏:飞雪无情的博客

最近重看MinIO的源代码,发现纠删码模式下读取数据盘的时候,使用了更简单的并发读取方式,以前看的时候没发现,查了下Git历史记录,发现是19年新改的,新的使用channel做标记的方式的确非常巧妙,简化了代码逻辑,值得我们学习。所以今天就开篇文章,介绍下channel在并发下的两个使用技巧。

赢者为王模式

这种并发模式并不稀奇,相信很多朋友都用到过。它的核心思想就是同时开几个协程做同样的事情,谁先搞定,我们就用谁的结果。在Go语言的channel支持下,我们很容易实现这种并发方式。

假设我们把同一份资源,存储在网络上的5个服务器上(镜像、备份等),然后我们现在需要获取这个资源,我们就可以同时开5个协程,访问这5个服务器上的资源,谁先获取到,我们就用谁的,这样就可以最快速度获取,排除掉网络慢的服务器。

代码语言:javascript
复制
func main() {
	txtResult := make(chan string, 5)
	go func() {txtResult <- getTxt("res1.flysnow.org")}()
	go func() {txtResult <- getTxt("res2.flysnow.org")}()
	go func() {txtResult <- getTxt("res3.flysnow.org")}()
	go func() {txtResult <- getTxt("res4.flysnow.org")}()
	go func() {txtResult <- getTxt("res5.flysnow.org")}()
	println(<-txtResult)
}

func getTxt(host string) string{
	//省略网络访问逻辑,直接返回模拟结果
	//http.Get(host+"/1.txt")
	return host+":模拟结果"
}

其中getTxt没有真实实现,只是一个模拟,但是通过以上示例已经可以说明赢者为王这种并发模式的使用。这种并发模式适合多个协程对同一种资源的读取,更概括的讲就是做同一件事情,只要有一个协程干成了就OK了。这种模式的优点主要有两个:1.可以最大程度减少耗时;提高成功率。

最终成功模式

这种并发模式我们自己可能遇到过,但是可能不是叫这个名字,这个名字是我自己起的,我觉得比较贴切。比如同时并发的从10个文件中成功读取任意5个文件,你可以开启5个协程,也可以开启3个,都随意,但是必须是成功读取了5个才算成功,否则就是失败。

这种模式MinIO也遇到了,它的解决方式就是我在开篇提到的非常好的技巧,现在我们就来介绍这种技巧。在介绍这种技巧前,我们先列举下其他的思路。

第一种思路: 先并发获取,存放起来,然后再一个个判断是否获取成功,如果有的没有成功再重新获取,而且获取的文件不能重复。这种方式是取到结果后进行判断是否成功,然后根据情况再决定是否重新获取,要去重,要判断,业务逻辑比较复杂。

第二种思路: 并发的时候就保证成功,里面可能是个for循环,直到成功为止,然后再返回结果。这种思路缺陷也很明显,如果这个文件损坏,那么就会一直死循环下去,要避免死循环,就要加上重试次数。

而MinIO的实现方式比较巧妙,它也是多协程,但是发现如果有文件读取不成功,他会通过channel的方式标记,换一个文件读取。因为一共10个文件呢,这个不行,换一个,不能在一个文件上等死,只要成功读取5个就可以了。

现在我们看下MinIO的这段代码,代码比较长,我尽可能删除一些无用的,但是为了保证可读性,还是会长一些,大家耐心看完,就学到了。

代码语言:javascript
复制
// Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
	newBuf := dst
	//省略不太相关代码
	var newBufLK sync.RWMutex

	//省略无关
	//channel开始创建,要发挥作用了。这里记住几个数字:
	//readTriggerCh大小是10,p.dataBlocks大小是5
	readTriggerCh := make(chan bool, len(p.readers))
	for i := 0; i < p.dataBlocks; i++ {
		// Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
		readTriggerCh <- true
	}

	healRequired := int32(0) // Atomic bool flag.
	readerIndex := 0
	var wg sync.WaitGroup
	// readTrigger 为 true, 意味着需要用disk.ReadAt() 读取下一个数据
	// readTrigger 为 false, 意味着读取成功了,不再需要读取
	for readTrigger := range readTriggerCh {
		newBufLK.RLock()
		canDecode := p.canDecode(newBuf)
		newBufLK.RUnlock()
		//判断是否有5个成功的,如果有,退出for循环
		if canDecode {
			break
		}
		//读取次数上限,不能大于10
		if readerIndex == len(p.readers) {
			break
		}
		//成功了,退出本次读取
		if !readTrigger {
			continue
		}
		wg.Add(1)
		//并发读取数据
		go func(i int) {
			defer wg.Done()
			//省略不太相关代码
			_, err := rr.ReadAt(p.buf[bufIdx], p.offset)
			if err != nil {
				//省略不太相关代码
				// 失败了,标记为true,触发下一个读取.
				readTriggerCh <- true
				return
			}
			newBufLK.Lock()
			newBuf[bufIdx] = p.buf[bufIdx]
			newBufLK.Unlock()
			// 成功了,标记为false,不再读取
			readTriggerCh <- false
		}(readerIndex)
		//控制次数,同时用来作为索引获取和存储数据
		readerIndex++
	}
	wg.Wait()

    //最终结果判断,如果OK了就正确返回,如果有失败的,返回error信息。
	if p.canDecode(newBuf) {
		p.offset += p.shardSize
		if healRequired != 0 {
			return newBuf, errHealRequired
		}
		return newBuf, nil
	}

	return nil, errErasureReadQuorum
}

以上代码虽然长,但是我做了注释,也比较容易理解了。现在再对这段逻辑进行解释下:

  1. 前提是从10个数据里读取任意5个
  2. 初始化的chan大小是10,但是通过for循环只存放了5个true
  3. 然后对chan循环读取数据,如果是true就开启go协程获取数据,如果是false就终止这次循环
  4. 当前在这之前还会判断下是否已经成功获取了5个,如果是的话,直接跳出整个for循环
  5. 通过readerIndex每次尝试获取一个数据,如果成功赛一个false到chan中,如果失败则塞个true
  6. 这样不成功的readerIndex不再尝试读取,失败了就通过true标记尝试读取下一个readerIndex
  7. 通过chan这种巧妙的方式不断循环,直到成功读取5个,或者把10个数据都读一遍为止
  8. 最终再基于是否成功读取到5个数据,做最终的判断,是返回成功数据,还是错误

利用channel来做标记和循环取数据,是一种非常好的方式,简化了代码逻辑,整体看起来非常清晰了,有兴趣的朋友可以看下MinIO原来的代码,感受会更强烈。

小结

以上主要是两种使用channel的技巧,这些技巧一些会靠自己熟能生巧,一些需要看别人的源代码学习,而阅读开源代码是一个非常不错的途径。

本文为原创文章,转载注明出处,欢迎扫码关注公众号flysnow_org或者网站 https://www.flysnow.org/ ,第一时间看后续精彩文章。觉得好的话,请顺手点个赞吧。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020年8月4日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 赢者为王模式
  • 最终成功模式
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档