专栏首页吴亲强的深夜食堂如何优雅地实现并发编排任务
原创

如何优雅地实现并发编排任务

公众号 【吴亲强的深夜食堂】

业务场景

在做任务开发的时候,你们一定会碰到以下场景:

场景1:调用第三方接口的时候, 一个需求你需要调用不同的接口,做数据组装。 场景2:一个应用首页可能依托于很多服务。那就涉及到在加载页面时需要同时请求多个服务的接口。这一步往往是由后端统一调用组装数据再返回给前端,也就是所谓的 BFF(Backend For Frontend) 层。

针对以上两种场景,假设在没有强依赖关系下,选择串行调用,那么总耗时即:

time=s1+s2+....sn

按照当代秒入百万的有为青年,这么长时间早就把你祖宗十八代问候了一遍。

为了伟大的KPI,我们往往会选择并发地调用这些依赖接口。那么总耗时就是:

time=max(s1,s2,s3.....,sn)

当然开始堆业务的时候可以先串行化,等到上面的人着急的时候,亮出绝招。

这样,年底 PPT 就可以加上浓重的一笔流水账:为业务某个接口提高百分之XXX性能,间接产生XXX价值。

当然这一切的前提是,做老板不懂技术,做技术”懂”你。

言归正传,如果修改成并发调用,你可能会这么写,

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	var userInfo *User
	var productList []Product
	
	go func() {
		defer wg.Done()
		userInfo, _ = getUser()
	}()

	go func() {
		defer wg.Done()
		productList, _ = getProductList()
	}()
	wg.Wait()
	fmt.Printf("用户信息:%+v\n", userInfo)
	fmt.Printf("商品信息:%+v\n", productList)
}


/********用户服务**********/

type User struct {
	Name string
	Age  uint8
}

func getUser() (*User, error) {
	time.Sleep(500 * time.Millisecond)
	var u User
	u.Name = "wuqinqiang"
	u.Age = 18
	return &u, nil
}

/********商品服务**********/

type Product struct {
	Title string
	Price uint32
}

func getProductList() ([]Product, error) {
	time.Sleep(400 * time.Millisecond)
	var list []Product
	list = append(list, Product{
		Title: "SHib",
		Price: 10,
	})
	return list, nil
}

先不管其他问题。从实现上来说,需要多少服务,你会开多少个 G,利用 sync.WaitGroup 的特性, 实现并发编排任务的效果。

好像,问题不大。

但是随着代号 996 业务场景的增加,你会发现,好多模块都有相似的功能,只是对应的业务场景不同而已。

那么我们能不能抽像出一套针对此业务场景的工具,而把具体业务实现交给业务方。

安排。

使用

本着不重复造轮子的原则,去搜了下开源项目,最终看上了 go-zero 里面的一个工具 mapreduce。 从文件名我们能看出来是什么了,可以自行 Google 这个名词。

使用很简单。我们通过它改造一下上面的代码:

package main

import (
	"fmt"
	"github.com/tal-tech/go-zero/core/mr"
	"time"
)

func main() {
	var userInfo *User
	var productList []Product
	_ = mr.Finish(func() (err error) {
		userInfo, err = getUser()
		return err
	}, func() (err error) {
		productList, err = getProductList()
		return err
	})
	fmt.Printf("用户信息:%+v\n", userInfo)
	fmt.Printf("商品信息:%+v\n", productList)
}
用户信息:&{Name:wuqinqiang Age:18}
商品信息:[{Title:SHib Price:10}]

是不是舒服多了。

但是这里还需要注意一点,假设你调用的其中一个服务错误,并且你 return err 对应的错误,那么其他调用的服务会被取消。 比如我们修改 getProductList 直接响应错误。

func getProductList() ([]Product, error) {
	return nil, errors.New("test error")
}
//打印
用户信息:<nil>
商品信息:[]

那么最终打印的时候连用户信息都会为空,因为出现一个服务错误,用户服务请求被取消了。

一般情况下,在请求服务错误的时候我们会有保底操作,一个服务错误不能影响其他请求的结果。 所以在使用的时候具体处理取决于业务场景。

源码

既然用了,那么就追下源码吧。

func Finish(fns ...func() error) error {
	if len(fns) == 0 {
		return nil
	}

	return MapReduceVoid(func(source chan<- interface{}) {
		for _, fn := range fns {
			source <- fn
		}
	}, func(item interface{}, writer Writer, cancel func(error)) {
		fn := item.(func() error)
		if err := fn(); err != nil {
			cancel(err)
		}
	}, func(pipe <-chan interface{}, cancel func(error)) {
		drain(pipe)
	}, WithWorkers(len(fns)))
}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
	_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
		reducer(input, cancel)
		drain(input)
		// We need to write a placeholder to let MapReduce to continue on reducer done,
		// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
		writer.Write(lang.Placeholder)
	}, opts...)
	return err
}

对于 MapReduceVoid函数,主要查看三个闭包参数。

  • 第一个 GenerateFunc 用于生产数据。
  • MapperFunc 读取生产出的数据,进行处理。
  • VoidReducerFunc 这里表示不对 mapper 后的数据做聚合返回。所以这个闭包在此操作几乎0作用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
	source := buildSource(generate) 
	return MapReduceWithSource(source, mapper, reducer, opts...)
}

func buildSource(generate GenerateFunc) chan interface{} {
	source := make(chan interface{})// 创建无缓冲通道
	threading.GoSafe(func() {
		defer close(source)
		generate(source) //开始生产数据
	})

	return source //返回无缓冲通道
}

buildSource函数中,返回一个无缓冲的通道。并开启一个 G 运行 generate(source),往无缓冲通道塞数据。 这个generate(source) 不就是一开始 Finish 传递的第一个闭包参数。

return MapReduceVoid(func(source chan<- interface{}) {
	// 就这个
		for _, fn := range fns {
			source <- fn
		}
	})

然后查看 MapReduceWithSource 函数,

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
	opts ...Option) (interface{}, error) {
	options := buildOptions(opts...)
	//任务执行结束通知信号
	output := make(chan interface{})
    //将mapper处理完的数据写入collector
    collector := make(chan interface{}, options.workers)
    // 取消操作信号
	done := syncx.NewDoneChan()
	writer := newGuardedWriter(output, done.Done())
	var closeOnce sync.Once
	var retErr errorx.AtomicError
	finish := func() {
		closeOnce.Do(func() {
			done.Close()
			close(output)
		})
	}
	cancel := once(func(err error) {
		if err != nil {
			retErr.Set(err)
		} else {
			retErr.Set(ErrCancelWithNil)
		}

		drain(source)
		finish()
	})

	go func() {
		defer func() {
			if r := recover(); r != nil {
				cancel(fmt.Errorf("%v", r))
			} else {
				finish()
			}
		}()
		reducer(collector, writer, cancel)
		drain(collector)
	}()
	// 真正从生成器通道取数据执行Mapper
	go executeMappers(func(item interface{}, w Writer) {
		mapper(item, w, cancel)
	}, source, collector, done.Done(), options.workers)

	value, ok := <-output
	if err := retErr.Load(); err != nil {
		return nil, err
	} else if ok {
		return value, nil
	} else {
		return nil, ErrReduceNoOutput
	}
}

这段代码挺长的,我们说下核心的点。我们看到使用一个G 调用 executeMappers 方法。

go executeMappers(func(item interface{}, w Writer) {
		mapper(item, w, cancel)
	}, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
	done <-chan lang.PlaceholderType, workers int) {
	var wg sync.WaitGroup
	defer func() {
		// 等待所有任务全部执行完毕
		wg.Wait()
		// 关闭通道
		close(collector)
	}()
   //根据指定数量创建 worker池
	pool := make(chan lang.PlaceholderType, workers) 
	writer := newGuardedWriter(collector, done)
	for {
		select {
		case <-done:
			return
		case pool <- lang.Placeholder:
			// 从buildSource() 返回的无缓冲通道取数据
			item, ok := <-input 
			// 当通道关闭,结束
			if !ok {
				<-pool
				return
			}

			wg.Add(1)
			// better to safely run caller defined method
			threading.GoSafe(func() {
				defer func() {
					wg.Done()
					<-pool
				}()
				//真正运行闭包函数的地方
               // func(item interface{}, w Writer) {
               //    mapper(item, w, cancel)
               //    }
				mapper(item, writer)
			})
		}
	}
}

具体的逻辑已备注,代码很容易懂。

一旦 executeMappers 函数返回,关闭 collector 通道,那么执行 reducer 不再阻塞。

go func() {
		defer func() {
			if r := recover(); r != nil {
				cancel(fmt.Errorf("%v", r))
			} else {
				finish()
			}
		}()
		reducer(collector, writer, cancel)
		//这里
		drain(collector)
	}()

这里的 reducer(collector, writer, cancel) 其实就是从 MapReduceVoid 传递的第三个闭包函数。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
	_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
		reducer(input, cancel)
		//这里
		drain(input)
		// We need to write a placeholder to let MapReduce to continue on reducer done,
		// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
		writer.Write(lang.Placeholder)
	}, opts...)
	return err
}

然后这个闭包函数又执行了 reducer(input, cancel),这里的 reducer 就是我们一开始解释过的 VoidReducerFunc,从 Finish() 而来

等等,看到上面三个地方的 drain(input)了吗?

// drain drains the channel.
func drain(channel <-chan interface{}) {
	// drain the channel
	for range channel {
	}
}

其实就是一个排空 channel 的操作,但是三个地方都对同一个 channel,也是让我费解。

还有更重要的一点。

go func() {
		defer func() {
			if r := recover(); r != nil {
				cancel(fmt.Errorf("%v", r))
			} else {
				finish()
			}
		}()
		reducer(collector, writer, cancel)
		drain(collector)
	}()

上面的代码,假如执行 reducerwriter 写入引发 panic,那么drain(collector) 会直接卡住。

不过作者已经修复了这个问题,直接把 drain(collector) 放入到 defer

具体 issues[1]。

到这里,关于 Finish 的源码也就结束了。感兴趣的可以看看其他源码。

很喜欢 go-zero 里的一些工具,但是往往用的一些工具并不独立, 依赖于其他文件包,导致明明只想使用其中一个工具却需要安装整个包。 所以最终的结果就是扒源码,创建无依赖库工具集,遵循 MIT 即可。

附录 [1] https://github.com/tal-tech/go-zero/issues/676

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 探索JAVA并发 - 如何优雅地取消线程任务

    一种常用的方法是在任务代码中加入一个“是否取消”的标志,任务定期去查看这个标志是否改变,如果被改变了就取消剩下的任务,此时如果想取消这个任务只需要修改它的标志,...

    acupt
  • 如何优雅地实现定时任务?go定时任务库cron详解

     默认上次任务没运行完,下次任务依然会运行(任务运行在goroutine里相互不干扰)

    hugo_lei
  • 如何优雅地实现分页查询

    分页功能是很常见的功能,特别是当数据量越来越大的时候,分页查询是必不可少的。实现分页功能有很多种方式,如果使用的ORM框架是mybatis的话,有开源的分页插件...

    Bug开发工程师
  • 如何优雅地实现高可用系统?

    织云平台团队
  • 如何优雅地处理重复请求(并发请求)

    你可能会想到的是,只要请求有唯一的请求编号,那么就能借用Redis做这个去重——只要这个唯一请求编号在redis存在,证明处理过,那么就认为是重复的

    用户2781897
  • 如何优雅地用Redis实现分布式锁

    在学习Java多线程编程的时候,锁是一个很重要也很基础的概念,锁可以看做是多线程情况下访问共享资源的一种线程同步机制。这是对于单进程应用而言的,即所有线程都在同...

    Bug开发工程师
  • 如何优雅地用Redis实现分布式锁

    在学习Java多线程编程的时候,锁是一个很重要也很基础的概念,锁可以看做是多线程情况下访问共享资源的一种线程同步机制。这是对于单进程应用而言的,即所有线程都在同...

    Bug开发工程师
  • 如何优雅地用Redis实现分布式锁

    什么是分布式锁 在学习Java多线程编程的时候,锁是一个很重要也很基础的概念,锁可以看做是多线程情况下访问共享资源的一种线程同步机制。这是对于单进程应用而言的,...

    Bug开发工程师
  • 如何优雅地实现Redis命令setbits与getbits

    在之前的文章《如何优雅地使用Redis之位图操作》和《再谈如何优雅地使用Redis之位图操作》中,笔者介绍了关于Redis位图操作的高级应用,其中就讲到了如何优...

    Bug开发工程师
  • 如何优雅地实现一个分屏滤镜

    本文通过编写一个通用的片段着色器,实现了抖音中的各种分屏滤镜。另外,还讲解了延时动态分屏滤镜的实现。

    glumes
  • Netty如何实现服务的优雅关闭

    最常见的,比如业务开发中,服务突然异常,刚进来的用户请求还在,通过优雅关闭,给他们 30s 时间继续执行,以免直接报错出去。

    JavaEdge
  • Java如何优雅地实现接口数据校验

    本篇文章给大家分享平时开发中总结的一点小技巧!在工作中写过Java程序的朋友都知道,目前使用Java开发服务最主流的方式就是通过Spring MVC定义一个Co...

    用户5927304
  • Angular 实践:如何优雅地发起和处理请求

    Tips: 本文实现重度依赖 ObservableInput,灵感来自同事 @Mengqi Zhang 实现的 asyncData 指令,但之前没有 Obser...

    灵雀云
  • 如何使用Go来实现优雅重启服务?

    一般服务器重启可以直接通过 kill 命令杀死进程,然后重新启动一个新的进程即可。但这种方法比较粗暴,有可能导致某些正在处理中的客户端请求失败,如果请求正在写数...

    用户7686797
  • 手写中间件之——并行框架(2 任务编排顺序如何选型和实现)

    如果大家仔细看了上一篇文章,可以看到该框架的难点和重点,主要有两点,分别是任务的顺序编排和任务结果的回调。

    天涯泪小武
  • 如何用 Serverless 优雅地实现图片艺术化应用

    本文将分享如何从零开始搭建一个基于腾讯云 Serverless 的图片艺术化应用! ? 项目已开源,完整代码见文末 线上 demo 预览: https://a...

    腾讯云serverless团队
  • 如何优雅地运用位运算实现产品需求

    在开始正文之前,我们先来说一下 Linux 的系统权限设计。在 Linux 系统中,为了保证文件的安全,对文件所有者、同组用户、其他用户的访问权限进行了分别管理...

    用户2781897
  • 微服务-高并发下接口如何做到优雅的限流

    通俗的来讲,一根管子往池塘注水,池塘底部有一个口子往外出水,当注水的速度过快时,池塘的水会溢出,此时,我们的做法换根小管子注水或者把注水管子的口堵住一半,这就是...

    阿伟
  • 如何优雅简洁地实现时钟翻牌器(支持JS/Vue/React)

    双十一剁手节过去了,大家应该在很多网页中看到了数字翻牌的效果吧,比如倒计时、数字增长等。相信很多人都已经自己独立实现过了,我也在网上看了一些demo,发现HTM...

    Nealyang

扫码关注云+社区

领取腾讯云代金券