前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >milvus启动源码分析

milvus启动源码分析

原创
作者头像
melodyshu
发布2024-04-10 16:17:45
1100
发布2024-04-10 16:17:45
举报
文章被收录于专栏:milvus数据库milvus数据库

milvus启动源码分析

版本:v2.3.2

入口:

代码语言:shell
复制
cmd\main.go

代码如下:

代码语言:go
复制
func main() {
	......
	if idx > 0 {
		......
	} else {
        // 重点分析这里
		milvus.RunMilvus(os.Args)
	}
}

os.Args是一个[]string类型。

代码语言:shell
复制
bin/milvus run standalone

os.Args0是程序的路径,后面的元素则是传递给程序的参数。

os.Args1:run

os.Args2:standalone

RunMilvus

代码语言:shell
复制
cmd\milvus\milvus.go

milvus.RunMilvus(os.Args)相当于是个壳子,一个简洁的程序入口。

代码语言:go
复制
func RunMilvus(args []string) {
	if len(args) < 2 {
		fmt.Fprintln(os.Stderr, usageLine)
		return
	}
	cmd := args[1]
	flags := flag.NewFlagSet(args[0], flag.ExitOnError)
	flags.Usage = func() {
		fmt.Fprintln(os.Stderr, usageLine)
	}

	var c command
	switch cmd {
	case RunCmd:
		c = &run{}
	case StopCmd:
		c = &stop{}
	case DryRunCmd:
		c = &dryRun{}
	case MckCmd:
		c = &mck{}
	default:
		c = &defaultCommand{}
	}

	c.execute(args, flags)
}

在这里使用了flag包,实现了命令行参数解析。

command是一个接口,只有一个方法execute()。

代码语言:go
复制
type command interface {
	execute(args []string, flags *flag.FlagSet)
}

这个方法有2个入参:args和flags。

command有5个实现:

run、stop、mck、dryRun、defaultCommand

switch判断给变量c赋予什么命令实现,这里是run。

这个接口设计是程序可以传入run、stop、mck、dryRun等参数,每种命令都为了实现一定功能,因此抽象一个execute()执行方法。

args作为execute()的参数。run后面还有其它组件参数(standalone、querycoord等)。

execute

分析run命令的execute()方法。

代码语言:go
复制
func (c *run) execute(args []string, flags *flag.FlagSet) {
	......
	//这里serverType的值为standalone
	serverType := args[2]
    // roles是结构体MilvusRoles
	roles := GetMilvusRoles(args, flags)
	......
	roles.Run()
}

GetMilvusRoles()返回结构体类型MilvusRoles。用来决定milvus启动什么组件。

代码语言:go
复制
// MilvusRoles decides which components are brought up with Milvus.
type MilvusRoles struct {
	EnableRootCoord  bool `env:"ENABLE_ROOT_COORD"`
	EnableProxy      bool `env:"ENABLE_PROXY"`
	EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"`
	EnableQueryNode  bool `env:"ENABLE_QUERY_NODE"`
	EnableDataCoord  bool `env:"ENABLE_DATA_COORD"`
	EnableDataNode   bool `env:"ENABLE_DATA_NODE"`
	EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"`
	EnableIndexNode  bool `env:"ENABLE_INDEX_NODE"`

	Local    bool
	Alias    string
	Embedded bool

	closed chan struct{}
	once   sync.Once
}

roles.Run()启动milvus组件。根据roles里面变量的值来判断启动什么组件。

Run

代码语言:shell
复制
cmd\roles\roles.go

源码如下:

代码语言:go
复制
// Run Milvus components.
func (mr *MilvusRoles) Run() {
	......

	// only standalone enable localMsg
    // standalone模式
	if mr.Local {
		if err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode); err != nil {
			log.Error("Failed to set deploy mode: ", zap.Error(err))
		}

		if mr.Embedded {
			......
		} else {
            // 读取配置文件参数进行初始化赋值
			paramtable.Init()
		}
        // 配置都存储在变量params,是一个结构体类型ComponentParam
		params := paramtable.Get()
		if params.EtcdCfg.UseEmbedEtcd.GetAsBool() {
			......
		}
		paramtable.SetRole(typeutil.StandaloneRole)
	} else {
		......
	}

	......

	var rootCoord, queryCoord, indexCoord, dataCoord component
	var proxy, dataNode, indexNode, queryNode component
	if mr.EnableRootCoord {
		rootCoord = mr.runRootCoord(ctx, local, &wg)
	}

	if mr.EnableProxy {
		proxy = mr.runProxy(ctx, local, &wg)
	}

	if mr.EnableQueryCoord {
		queryCoord = mr.runQueryCoord(ctx, local, &wg)
	}

	if mr.EnableQueryNode {
		queryNode = mr.runQueryNode(ctx, local, &wg)
	}

	if mr.EnableDataCoord {
		dataCoord = mr.runDataCoord(ctx, local, &wg)
	}

	if mr.EnableDataNode {
		dataNode = mr.runDataNode(ctx, local, &wg)
	}

	if mr.EnableIndexCoord {
		indexCoord = mr.runIndexCoord(ctx, local, &wg)
	}

	if mr.EnableIndexNode {
		indexNode = mr.runIndexNode(ctx, local, &wg)
	}

	wg.Wait()
	......
}

component是一个接口,有如下实现:

DataCoord、DataNode、IndexCoord、IndexNode、Proxy、QueryCoord、QueryNode、RootCoord

代码语言:go
复制
type component interface {
	healthz.Indicator
	Run() error
	Stop() error
}

组件有启动Run(()和停止Stop()方法。

以启动rootCoord为例:

代码语言:go
复制
rootCoord = mr.runRootCoord(ctx, local, &wg)
代码语言:go
复制
func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
	wg.Add(1)
	return runComponent(ctx, localMsg, wg, components.NewRootCoord, metrics.RegisterRootCoord)
}

runComponent是一个通用函数,做了一个包裹,用来启动各组件。

代码语言:go
复制
func runComponent[T component](ctx context.Context,
	localMsg bool,
	runWg *sync.WaitGroup,
	creator func(context.Context, dependency.Factory) (T, error),
	metricRegister func(*prometheus.Registry),
) component {
    // role在这里就是组件
	var role T

	sign := make(chan struct{})
    // 在协程里面启动组件
	go func() {
		factory := dependency.NewFactory(localMsg)
		var err error
        // 返回的是一个component的具体实现。
		role, err = creator(ctx, factory)
		......
        // 启动component
		if err := role.Run(); err != nil {
			panic(err)
		}
		runWg.Done()
	}()
    ......
	return role
}

components.NewRootCoords是一个函数,函数作为入参,替换creator。

总体启动流程设计分析

命令格式:

代码语言:shell
复制
(base) root@db01:/mnt/milvus# bin/milvus
Usage:

milvus run [server type] [flags]
	Start a Milvus Server.
	Tips: Only the server type is 'mixture', flags about starting server can be used.
[flags]
	-rootcoord 'true'
		Start the rootcoord server.
	-querycoord 'true'
		Start the querycoord server.
	-indexcoord 'true'
		Start the indexcoord server.
	-datacoord 'true'
		Start the datacoord server.
	-alias ''
		Set alias


milvus stop [server type] [flags]
	Stop a Milvus Server.
[flags]
	-alias ''
		Set alias


milvus mck run [flags]
	Milvus data consistency check.
	Tips: The flags are optional.
[flags]
	-etcdIp ''
		Ip to connect the ectd server.
	-etcdRootPath ''
		The root path of operating the etcd data.
	-minioAddress ''
		Address to connect the minio server.
	-minioUsername ''
		The username to login the minio server.
	-minioPassword ''
		The password to login the minio server.
	-minioUseSSL 'false'
		Whether to use the ssl to connect the minio server.
	-minioBucketName ''
		The bucket to operate the data in it

milvus mck cleanTrash [flags]
	Clean the back inconsistent data
	Tips: The flags is the same as its of the 'milvus mck [flags]'


[server type]
	indexnode
	datacoord
	datanode
	standalone
	rootcoord
	proxy
	querycoord
	querynode
	mixture

1.根据这个命令格式来设计代码。

bin/milvus run|stop|mck

通过Args1判断执行的是哪种命令。使用switch实现。

每种命令都有execute()方法来具体执行逻辑。因此抽象出一个接口command,接口有一个execute()方法。

command的具体实现就有run、stop、mck等。

2.run命令后面接server type,分别启动不同的组件类型。

Args2:server type

这些组件可以抽象出一个接口component,接口有方法Run()、Stop(),用来启动和停止组件。

serverType参数从命令行传入进来需要如何处理?

常规的思路就是判断serverType是哪一种组件,然后启动相应组件。伪代码如下:

代码语言:go
复制
switch serverType {
   case "rootcoord":
    启动rootcoord()
   case "datacoord":
    启动datacoord()
   case "querycoord":
    启动querycoord()
   case "standalone":
    启动rootcoord()
    启动datacoord()
    启动querycoord()
   ......
}

这样设计会发现有组件启动冗余的代码。那如何设计才能不写重复代码?milvus是如何设计的。

milvus设计了一个结构体用来记录启动哪个组件。

代码语言:go
复制
// MilvusRoles decides which components are brought up with Milvus.
type MilvusRoles struct {
	EnableRootCoord  bool `env:"ENABLE_ROOT_COORD"`
	EnableProxy      bool `env:"ENABLE_PROXY"`
	EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"`
	EnableQueryNode  bool `env:"ENABLE_QUERY_NODE"`
	EnableDataCoord  bool `env:"ENABLE_DATA_COORD"`
	EnableDataNode   bool `env:"ENABLE_DATA_NODE"`
	EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"`
	EnableIndexNode  bool `env:"ENABLE_INDEX_NODE"`
    ......
}

根据传入的serverType参数来填充这个结构体。

代码语言:go
复制
switch serverType {
	case typeutil.RootCoordRole:
		role.EnableRootCoord = true
	case typeutil.ProxyRole:
		role.EnableProxy = true
	case typeutil.QueryCoordRole:
		role.EnableQueryCoord = true
	case typeutil.QueryNodeRole:
		role.EnableQueryNode = true
	case typeutil.DataCoordRole:
		role.EnableDataCoord = true
	case typeutil.DataNodeRole:
		role.EnableDataNode = true
	case typeutil.IndexCoordRole:
		role.EnableIndexCoord = true
	case typeutil.IndexNodeRole:
		role.EnableIndexNode = true
	case typeutil.StandaloneRole, typeutil.EmbeddedRole:
		role.EnableRootCoord = true
		role.EnableProxy = true
		role.EnableQueryCoord = true
		role.EnableQueryNode = true
		role.EnableDataCoord = true
		role.EnableDataNode = true
		role.EnableIndexCoord = true
		role.EnableIndexNode = true
		role.Local = true
		role.Embedded = serverType == typeutil.EmbeddedRole
	case RoleMixture:
		......
	default:
		......
	}

把多次启动代码转化为变量赋值重复的代码,相对来说代码结构变清晰了。

代码语言:go
复制
var rootCoord, queryCoord, indexCoord, dataCoord component
	var proxy, dataNode, indexNode, queryNode component
	if mr.EnableRootCoord {
		rootCoord = mr.runRootCoord(ctx, local, &wg)
	}

	if mr.EnableProxy {
		proxy = mr.runProxy(ctx, local, &wg)
	}

	if mr.EnableQueryCoord {
		queryCoord = mr.runQueryCoord(ctx, local, &wg)
	}

	if mr.EnableQueryNode {
		queryNode = mr.runQueryNode(ctx, local, &wg)
	}

	if mr.EnableDataCoord {
		dataCoord = mr.runDataCoord(ctx, local, &wg)
	}

	if mr.EnableDataNode {
		dataNode = mr.runDataNode(ctx, local, &wg)
	}

	if mr.EnableIndexCoord {
		indexCoord = mr.runIndexCoord(ctx, local, &wg)
	}

	if mr.EnableIndexNode {
		indexNode = mr.runIndexNode(ctx, local, &wg)
	}

3.每个组件都要启动,可以抽取一个方法runComponent()用来启动不同的组件。

代码语言:go
复制
func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
	wg.Add(1)
	return runComponent(ctx, localMsg, wg, components.NewRootCoord, metrics.RegisterRootCoord)
}

NewRootCoord是一个函数,用来创建rootCoord结构体。在runComponent里调用结构体的Run()方法。

堆栈:

代码语言:go
复制
milvus.RunMilvus()(cmd\main.go)
  |--c.execute()(cmd\milvus\milvus.go)
    |--execute()(cmd\milvus\run.go)
      |--roles.Run()(cmd\roles\roles.go)
        |--mr.runRootCoord()(同上)
          |--runComponent()(同上)
            |--role.Run(同上)
              |--Run()(comd\components\root_coord.go)

从堆栈可以很清晰的看出调用逻辑。

入口是main.go,然后调用milvus的run指令,然后调用roles(组件的抽象成层),最终调用components。

package调用方向:main->milvus->roles->components

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • milvus启动源码分析
    • 入口:
      • RunMilvus
        • execute
          • Run
            • 总体启动流程设计分析
            相关产品与服务
            向量数据库
            腾讯云向量数据库(Tencent Cloud VectorDB)是一款全托管的自研企业级分布式数据库服务,专用于存储、检索、分析多维向量数据。该数据库支持多种索引类型和相似度计算方法,单索引支持千亿级向量规模,可支持百万级 QPS 及毫秒级查询延迟。腾讯云向量数据库不仅能为大模型提供外部知识库,提高大模型回答的准确性,还可广泛应用于推荐系统、自然语言处理等 AI 领域。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档