前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang任务队列machinery使用与源码剖析(一)

Golang任务队列machinery使用与源码剖析(一)

原创
作者头像
netkiddy
修改2019-09-12 15:33:22
9.6K3
修改2019-09-12 15:33:22
举报
文章被收录于专栏:非典型程序猿非典型程序猿

导语

异步任务,是每一位开发者都遇到过的技术名词,在任何一个稍微复杂的后台系统中,异步任务总是无法避免的,而任务队列由于其松耦合、易扩展的特性,成为了实现异步任务的可靠保证。

背景

当用户的一次请求事件发生,可能是某种数据的重复数查询,抑或是某批人群的覆盖率统计,展现到用户的是几行数字,但在透视到后端逻辑中,简单的这可能是一次mysql的联表查询或者elasticsearch的聚合,但更多情况下,是附带了一系列复杂的数据交互或者耗时的逻辑计算。当后端这种发生多次数据交互任务的情况一旦存在,为了实现每一次任务的可靠执行以及前端响应速度,任务队列的存在意义就凸显了。

场景与功能

任务队列有着广泛的适应场景:

  • 大批量的计算任务。如大量数据插入,通过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提高系统鲁棒性,提高系统并发度;
  • 数据预处理。定期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提高查询请求的响应速度;
  • 错误重试功能。为了提高系统的可用性,当函数处理出现错误时,我们希望可以给予其重试的机会,增强系统的可用性。

适用于任务队列的场景还有很多,同样,不同语言也有着自己著名的任务队列系统,众所周知的如python下的celery,PHP中laraval框架的Queues,都是使用度十分广泛的任务队列系统。

我们项目的技术栈为golang,因此,在我们go为基础的微服务框架中,需要存在一个类型于celery或者laraval中的任务队列系统,在经过了一系列筛选后,我们采用了machinery作为我们的任务队列系统。machinery,一个第三方开源的基于分布式消息分发的异步任务队列,有着以下这些特性:

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB

当前machinery在v1 stable版本,可以通过go get github.com/RichardKnop/machinery/v1获取。

架构设计

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:

其中:

  • Server:业务模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
  • Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
  • Worker:工作进程,负责消费者功能,处理具体的任务;
  • Backend:后端存储,用于存储任务执行状态的数据;

在本篇文章中,我们将对上述几个模块进行详细讲解。

Broker

machinery的broker支持多种存储介质:Redis,AMQP和SQS,本篇文章中,我们将以redis来详细介绍,其他类型的存储介质,在实现细节上由于介质的API支持不一可能略有不同,但machinery具体暴露接口类似,有兴趣的读者可以详细再阅读相关源码。

machinery的Broker实现了以下这几种接口,我们将重点介绍起着关键作用的接口:

代码语言:javascript
复制
GetConfig() *config.Config
SetRegisteredTaskNames(names []string)
IsTaskRegistered(name string) bool
StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
StopConsuming()
Publish(task *tasks.Signature) error
GetPendingTasks(queue string) ([]*tasks.Signature, error)

Broker启动和停止

当我们使用machinery时,在启动服务之后,StartConsuming()函数将以阻塞轮询的方式去Broker中获取任务并消费处理。而当服务停止之后,StopConsuming()函数将会等待一系列go程结束,以实现gracefully stop。

详细来看StartConsumin()函数,具体源码如下(不相关的代码细节已经省略)。

代码语言:javascript
复制
func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error) {
	...
  // 获取任务go程
	go func() {
                ...
		for {
			select {
			case <-b.stopReceivingChan:
				return
			case <-timer.C:
				if concurrencyAvailable() {
					task, err := b.nextTask(b.cnf.DefaultQueue)
					if err != nil {
						timer.Reset(timerDuration)
						continue
					}
					deliveries <- task
				}
         //并发控制逻辑
				if concurrencyAvailable() {
					timer.Reset(0)//设置timer为0,立即继续消费任务
				} else {
					timer.Reset(timerDuration)//重置timer,等待duration后再尝试消费
				}
			}
		}
	}()

  // 获取延时任务go程
	go func() {
                ...
		for {
			select {
			case <-b.stopDelayedChan:
				return
			default:
				task, err := b.nextDelayedTask(redisDelayedTasksKey)
				if err != nil {
					continue
				}

				signature := new(tasks.Signature)
				decoder := json.NewDecoder(bytes.NewReader(task))
				decoder.UseNumber()
				if err := decoder.Decode(signature); err != nil {
					log.ERROR.Print(NewErrCouldNotUnmarshaTaskSignature(task, err))
				}

				if err := b.Publish(signature); err != nil {
					log.ERROR.Print(err)
				}
			}
		}
	}()
  //执行任务消费
	if err := b.consume(deliveries, pool, concurrency, taskProcessor); err != nil {
		return b.retry, err
	}
        ...
}

其中,

参数consumerTag在AMQP作为Broker时有意义;

参数concurrency用来实现任务并发调度的控制。

Broker任务获取

在StartComsuming()中,分别启动了两个go程来并行处理任务,因为针对延时任务和普通任务,machinery将任务存放于两个不同的rediskey中。

  • 对于普通任务,使用nextTask()函数用来从broker中获取任务,在redis作为broker时,machinery使用了LIST类型来存储任务,而nextTask()中使用了BLPOP来阻塞式的读取任务*1
  • 对于延时任务,使用nextDelayTask()函数从redis中的ZSET中,根据score来优先获取最近的任务(score为ETA的对应的unixnano值)。

具体来看nextTask()函数和nextDelayTask()函数,如下列出:

代码语言:javascript
复制
// BLPOP出LIST中的数据
func (b *RedisBroker) nextTask(queue string) (result []byte, err error) {
	conn := b.open()
	defer conn.Close()

	items, err := redis.ByteSlices(conn.Do("BLPOP", queue, 1))
	if err != nil {
		return []byte{}, err
	}

	if len(items) != 2 {
		return []byte{}, redis.ErrNil
	}

	result = items[1]

	return result, nil
}
代码语言:javascript
复制
// 结合WATCH,从ZSET中获取score最小的
func (b *RedisBroker) nextDelayedTask(key string) (result []byte, err error) {
   ...
   for {
      time.Sleep(time.Duration(pollPeriod) * time.Millisecond)
      if _, err = conn.Do("WATCH", key); err != nil {
         return
      }

      now := time.Now().UTC().UnixNano()

      items, err = redis.ByteSlices(conn.Do(
         "ZRANGEBYSCORE",
         key,
         0,
         now,
         "LIMIT",
         0,
         1,
      ))
      if err != nil {
         return
      }
      if len(items) != 1 {
         err = redis.ErrNil
         return
      }

      conn.Send("MULTI")
      conn.Send("ZREM", key, items[0])
      reply, err = conn.Do("EXEC")
      if err != nil {
         return
      }

      if reply != nil {
         result = items[0]
         break
      }
   }

   return
}

*1 特别需要注意的是,由于云服务的盛行,当下的云服务基本上都涵盖了redis服务,且提供了主备方案和集群方案等,但是不论时云服务或者时公司内部的redis服务,对BLPOP的支持可能会受限,这时候我们需要更改nextTask()函数中的BLPOP为LPOP来适应:

代码语言:javascript
复制
/*
* modified at 20180717
* use LPOP instead of BLPOP, cause L5 redis does not support BLPOP
**/
// nextTask pops next available task from the default queue
func (b *Broker) nextTask(queue string) (result []byte, err error) {
   conn := b.open()
   defer conn.Close()

   item, err := redis.Bytes(conn.Do("LPOP", queue))
   if err != nil {
      return []byte{}, err
   }
   result = item

   return result, nil
}

Broker任务查看

在redis作为Broker时,machinery还提供了一个额外的接口实现(其他接口Broker存储介质未对该接口进行实现)GetPendingTasks()。顾名思义,GetPendingTasks()可以用来查看当前任务队列中处理pending状态,在等待被处理的任务的详细信息。

GetPendingTasks()函数,更多的可以理解为,是作者提供的“接口糖”,方便离线的对任务队列中的任务进行查看,当然,machinery中使用的几种第三方队列作为Broker,基本上都是支持这类数据的单独查看的。

代码语言:javascript
复制
func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
	...
	dataBytes, err := conn.Do("LRANGE", queue, 0, 10)
	if err != nil {
		return nil, err
	}
	results, err := redis.ByteSlices(dataBytes, err)
	if err != nil {
		return nil, err
	}

	taskSignatures := make([]*tasks.Signature, len(results))
	for i, result := range results {
		signature := new(tasks.Signature)
		decoder := json.NewDecoder(bytes.NewReader(result))
		decoder.UseNumber()
		if err := decoder.Decode(signature); err != nil {
			return nil, err
		}
		taskSignatures[i] = signature
	}
	return taskSignatures, nil
}

Broker任务发布

Publish()接口是实现任务发布的函数,将在后续篇幅在对任务做介绍时再单独详细介绍。

Backend

Backend,同样是任务队列不可或缺的一部分,其作用主要是用来存储任务的执行结果的,machinery中支持Redis, Memcache, AMQP, MongoDB四种类型的存储介质来实现Backend。

machinery的Backend,根据其自身的功能特性,实现了以下这几种接口,与Broker类似,我们将重点介绍几个关键的接口(同样,以下接口是不同类型的Backend的实现的接口超集,并不是Redis作为介质时都有的):

代码语言:javascript
复制
// Workflow相关接口

InitGroup(groupUUID string, taskUUIDs []string) error
GroupCompleted(groupUUID string, groupTaskCount int) (bool, error)
GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error)
TriggerChord(groupUUID string) (bool, error)

// 任务状态设置接口
SetStatePending(signature *tasks.Signature) error
SetStateReceived(signature *tasks.Signature) error
SetStateStarted(signature *tasks.Signature) error
SetStateRetry(signature *tasks.Signature) error
SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error
SetStateFailure(signature *tasks.Signature, err string) error
GetState(taskUUID string) (*tasks.TaskState, error)

// Purging stored stored tasks states and group meta data
PurgeState(taskUUID string) error
PurgeGroupMeta(groupUUID string) error

Workflow相关接口

我们可以看到,第一批接口有Group和Chord相关的字眼,这就是我们在一开始提到的machinery中Workflow机制。Workflow极大的使能了任务队列的功能,使得machinery更加得心应手。关于Workflow的知识,我们将在下面的篇幅中详细介绍,这儿仅仅简单的介绍这几个接口的功能。

InitGroup(),顾名思义,在创建一个Group任务;

GroupCompleted(),检查一个Group中所有的任务是否都执行完毕;

GroupTaskStates(),返回一个Group中,所有任务的状态

TriggerChord(),当Group中任务全部执行完毕后,触发Chrod任务

Backend任务状态

machinery中将任务的状态进行了很详细的划分,通过接口我们就可以看到,machinery支持了以下几种任务中间态:

  • Pending,任务到达Broker
  • Received,任务从Broker中读取成功
  • Started,任务开始执行
  • Retry,任务需要重试
  • Success,任务执行成功
  • Failure,任务执行失败

下面简单列出源码中设置状态接口的使用:

代码语言:javascript
复制
// SetStatePending updates task state to PENDING
func (b *RedisBackend) SetStatePending(signature *tasks.Signature) error {
   taskState := tasks.NewPendingTaskState(signature)
   return b.updateState(taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *RedisBackend) SetStateReceived(signature *tasks.Signature) error {
   taskState := tasks.NewReceivedTaskState(signature)
   return b.updateState(taskState)
}
...

Worker

Worker负责了任务队列的执行单元,是任务队列中处理任务的关键元素,也是因此,Worker的接口很少,很直接:

代码语言:javascript
复制
Launch() 
LaunchAsync(errorsChan chan<- error)
Quit()
Process(signature *tasks.Signature)

Worker启动和停止

Worker启动是通过Launch()启动了一个进程,去订阅默认的任务队列,并且处理收到的任务。LaunchAsync()是Launch()的非阻塞版本,而通过Launch()中的代码,我们发现,其实就是调用了LaunchAsync()。

在LaunchAsync()中,通过开启一个go程,实现了非阻塞式的调用了Broker的StartConsuming()函数。

代码语言:javascript
复制
func (worker *Worker) Launch() error {
   errorsChan := make(chan error)
   worker.LaunchAsync(errorsChan)
   return <-errorsChan
}

// Launch()的非阻塞调用
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
   ...
   // broker消费者go程,同时负责与broker的断开重连等
   go func() {
      for {
         retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)

         if retry {
            if worker.errorHandler != nil {
               worker.errorHandler(err)
            } else {
               log.WARNING.Printf("Broker failed with error: %s", err)
            }
         } else {
            errorsChan <- err // stop the goroutine
            return
         }
      }
   }()
   ...
   }
}

Worker停止是通过Quit()函数来实现,其调用了Broker的StopConsuming()函数,以实现gracefully stop。

代码语言:javascript
复制
// Quit tears down the running worker process
func (worker *Worker) Quit() {
   worker.server.GetBroker().StopConsuming()
}

Worker处理

Worker中的Process()函数,将会处理在Broker中的待处理任务,并且负责了任务回调的触发功能。Process()函数的任务流程主要是:

任务检测->任务获取->任务预处理->Tracing处理->任务执行

代码语言:javascript
复制
func (worker *Worker) Process(signature *tasks.Signature) error {
   ...
   //根据任务名,获取注册任务
   taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
   if err != nil {
      return nil
   }

   // 更新任务状态 Received
   if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
      return fmt.Errorf("Set state received error: %s", err)
   }

   // 任务预处理,预防任务出错,导致后面影响worker的运行
   task, err := tasks.New(taskFunc, signature.Args)
   if err != nil {
      worker.taskFailed(signature, err)
      return err
   }

   // tracing处理   
   taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
   tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
   task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

   // 更新任务状态 Started
   if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
      return fmt.Errorf("Set state started error: %s", err)
   }

   // 任务执行
   results, err := task.Call()
   if err != nil {
      // If a tasks.ErrRetryTaskLater was returned from the task,
      // retry the task after specified duration
      retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
      if ok {
         return worker.retryTaskIn(signature, retriableErr.RetryIn())
      }

      // Otherwise, execute default retry logic based on signature.RetryCount
      // and signature.RetryTimeout values
      if signature.RetryCount > 0 {
         return worker.taskRetry(signature)
      }

      return worker.taskFailed(signature, err)
   }

   return worker.taskSucceeded(signature, results)
}

machinery中,主要是通过反射实现了任务执行,具体的执行方式,在获取了函数之后与普通的反射无异,详细的介绍在后续篇幅介绍。关于任务执行之后的处理,有可能三种处理:

  • 任务执行成功

taskSucceeded(),是在一个任务被成功执行后调用,主要负责更新任务状态、触发回调函数或者chord任务中的回调函数(前提是该task是chrod的分组任务中的最后一个任务),关于chord任务,在后面关于Workflow模式中将会详细介绍。

代码语言:javascript
复制
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
	// 更新任务状态
	if err := worker.server.GetBackend().SetStateSuccess(signature, taskResults); err != nil {
		return fmt.Errorf("Set state success error: %s", err)
	}
        ...
	// 回调任务
	for _, successTask := range signature.OnSuccess {
        // 当immutable为false时,传递参数
		if signature.Immutable == false {
			// Pass results of the task to success callbacks
			for _, taskResult := range taskResults {
				successTask.Args = append(successTask.Args, tasks.Arg{
					Type:  taskResult.Type,
					Value: taskResult.Value,
				})
			}
		}
		worker.server.SendTask(successTask)
	}
        ...
	// 触发chord任务的回掉函数
	shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
	if err != nil {
		return fmt.Errorf("Trigger chord error: %s", err)
	}
        ...
	// 针对group任务的返回值做参数传递
	for _, taskState := range taskStates {
		if !taskState.IsSuccess() {
			return nil
		}

		if signature.ChordCallback.Immutable == false {
			for _, taskResult := range taskState.Results {
				signature.ChordCallback.Args = append(signature.ChordCallback.Args, tasks.Arg{
					Type:  taskResult.Type,
					Value: taskResult.Value,
				})
			}
		}
	}
	// 发送chord任务
	_, err = worker.server.SendTask(signature.ChordCallback)
	if err != nil {
		return err
	}

	return nil
}
  • 任务执行失败

taskFailed(),是在一个任务执行失败(完全失败,即重试也失败)后调用。需要负责更新任务状态,并触发OnError回调函数。

代码语言:javascript
复制
func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error {
   // 任务状态更新 Failure
   if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil {
      return fmt.Errorf("Set state failure error: %s", err)
   }

   ...
   // Trigger error callbacks
   for _, errorTask := range signature.OnError {
      // Pass error as a first argument to error callbacks
      args := append([]tasks.Arg{{
         Type:  "string",
         Value: taskErr.Error(),
      }}, errorTask.Args...)
      errorTask.Args = args
      worker.server.SendTask(errorTask)
   }

   return nil
}
  • 任务重试

关于任务重试,machinery中提供了两种方式来实现。

第一种,machinery中通过设置任务的RetryCount和RetryTimeout参数来实现。

第二种,通过返回一个ErrRetryTaskLater类型的值来制定。

由于任务重试,需要依赖于对machinery中任务数据结构的了解,我们将在之后详细介绍。

本篇文章主要介绍了任务队列的背景与说明,同时介绍了machinery的设计结构,并详细的介绍了machinery中的每个具体模块的功能与源码实现。更多关于machinery的源码实现和功能介绍,将在下一篇继续介绍。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导语
  • 背景
  • 场景与功能
  • 架构设计
  • Broker
    • Broker启动和停止
      • Broker任务获取
        • Broker任务查看
          • Broker任务发布
          • Backend
            • Workflow相关接口
              • Backend任务状态
              • Worker
                • Worker启动和停止
                  • Worker处理
                  相关产品与服务
                  云数据库 SQL Server
                  腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档