前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >oklog/run 源码分析

oklog/run 源码分析

作者头像
golangLeetcode
发布2022-08-02 19:01:02
2450
发布2022-08-02 19:01:02
举报
文章被收录于专栏:golang算法架构leetcode技术php

waitgroup可以等待所有子协程结束,如何在一个协程结束后,终止所有协程呢,于是有了oklog/run,注册了运行函数和终止函数,如果第一个协程结束,就会运行其他协程的终止函数,终止其他协程。

github.com/oklog/run

1. 问题引入

oklog/run 包提供了一套非常简单、易用的 Go routine 编排框架。在介绍 oklog/run 前,我们先考虑以下问题:

假设我们有四个 Go routine 组件,如图所示,分别是运行一个状态机 sm.Run 、启动一个 HTTP 服务器、执行定时任务 cronJobs(sm) 读取状态机状态、和运行信号监听器。每个 Go routine 组件互相独立运行。

问题在于,我们如何将各个组件作为一个整体运行,并有序地结束?

对于每一个 Go routine 组件,我们都有相应的办法来执行结束操作。状态机通过 Context 对象,HTTP 服务器通过调用 Listener 的 Close 方法,定时任务和监听器通过 channel。当一个组件结束的时候,需要通知其他组件有序执行结束操作。这个问题的解决方法可以用 Actor 模型来描述。每个 Go routine 都是一个 actor,互相独立,互相之间只能通过 message 通信。oklog/run 包实现了 Actor 模型,能非常简洁的实现 Go routine 编排功能。

2. oklog/run 包介绍

oklog/run 包非常简单,只有一个类型,两个方法,共 60 行代码。其中 Group 是一组 actor,通过调用 Add 方法将 actor 添加到 Group 中。

代码语言:javascript
复制
type Group
func (g *Group) Add(execute func() error, interrupt func(error))
func (g *Group) Run() error
代码语言:javascript
复制
type Group struct {
	actors []actor
}

func (g *Group) Add(execute func() error, interrupt func(error)) {
	g.actors = append(g.actors, actor{execute, interrupt})
}

每个 actor 有两个方法:execute 和 interrupt。execute 完成 Go routine 的计算任务,interrupt 结束 Go routine 并退出。

代码语言:javascript
复制
type actor struct {
	execute   func() error
	interrupt func(error)
}

调用 Run 方法后会启动所有 Go routine(或者称为 actor),并等待第一个结束的 Go routine(无论正常退出或因为异常终止)。一旦捕获到第一个结束信号,会依次结束其他 Go routine 直到所有 Go routine 完全退出。

代码语言:javascript
复制
func (g *Group) Run() error {
	if len(g.actors) == 0 {
		return nil
	}

	// Run each actor.
	errors := make(chan error, len(g.actors))
	for _, a := range g.actors {
		go func(a actor) {
			errors <- a.execute()
		}(a)
	}

	// Wait for the first actor to stop.
	err := <-errors

	// Signal all actors to stop.
	for _, a := range g.actors {
		a.interrupt(err)
	}

	// Wait for all actors to stop.
	for i := 1; i < cap(errors); i++ {
		<-errors
	}

	// Return the original error.
	return err
}

3. 使用例子

下面例子定义了三个 actor,前两个 actor 一直等待。第三个 actor 在 3s 后结束退出。引起前两个 actor 退出。

代码语言:javascript
复制
package main

import (
	"fmt"
	"github.com/oklog/run"
	"time"
)

func main() {
	g := run.Group{}
	{
		cancel := make(chan struct{})
		g.Add(
			func() error {

				select {
				case <- cancel:
					fmt.Println("Go routine 1 is closed")
					break
				}

				return nil
			},
			func(error) {
				close(cancel)
			},
		)
	}
	{
		cancel := make(chan struct{})
		g.Add(
			func() error {

				select {
				case <- cancel:
					fmt.Println("Go routine 2 is closed")
					break
				}

				return nil
			},
			func(error) {
				close(cancel)
			},
		)
	}
	{
		g.Add(
			func() error {
				for i := 0; i <= 3; i++ {
					time.Sleep(1 * time.Second)
					fmt.Println("Go routine 3 is sleeping...")
				}
				fmt.Println("Go routine 3 is closed")
				return nil
			},
			func(error) {
				return
			},
		)
	}
	g.Run()
}

打印结果:

代码语言:javascript
复制
Go routine 3 is sleeping...
Go routine 3 is sleeping...
Go routine 3 is sleeping...
Go routine 3 is closed
Go routine 2 is closed
Go routine 1 is closed

4. oklog/run 在 Prometheus 中的使用

Prometheus 的组件间协调也是用了 oklog/run。这些组件有 服务发现组件(Scrape discovery manager)、采集组件(Scrape Manager)、配置加载组件(Reload handler)等

Finally, the server runs all components in an actor-like model, using github.com/oklog/oklog/pkg/group to coordinate the startup and shutdown of all interconnected actors. Multiple channels are used to enforce ordering constraints, such as not enabling the web interface before the storage is ready and the initial configuration file load has happened.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • waitgroup可以等待所有子协程结束,如何在一个协程结束后,终止所有协程呢,于是有了oklog/run,注册了运行函数和终止函数,如果第一个协程结束,就会运行其他协程的终止函数,终止其他协程。
  • github.com/oklog/run
    • 1. 问题引入
      • 2. oklog/run 包介绍
        • 3. 使用例子
          • 4. oklog/run 在 Prometheus 中的使用
          相关产品与服务
          Prometheus 监控服务
          Prometheus 监控服务(TencentCloud Managed Service for Prometheus,TMP)是基于开源 Prometheus 构建的高可用、全托管的服务,与腾讯云容器服务(TKE)高度集成,兼容开源生态丰富多样的应用组件,结合腾讯云可观测平台-告警管理和 Prometheus Alertmanager 能力,为您提供免搭建的高效运维能力,减少开发及运维成本。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档