前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang 源码分析:minio(part I)路由

golang 源码分析:minio(part I)路由

作者头像
golangLeetcode
发布2022-08-03 13:52:02
9500
发布2022-08-03 13:52:02
举报
文章被收录于专栏:golang算法架构leetcode技术php

MinIO的命令行启动只有2个命令,一个是server、一个是gateway,分别用于启动服务和网关,而整个MinIO的启动是从main.go文件开始的

引入了两个包

代码语言:javascript
复制
_ "github.com/minio/minio/internal/init"
_  "github.com/minio/minio/cmd/gateway

看下对应的init函数:

internal/init/init.go

internal/init/init_darwin_amd64.go

代码语言:javascript
复制
 cpuid.CPU.Disable(cpuid.AVX512F

然而,在gateway里面引入了几个常用的实现

cmd/gateway/gateway.go

代码语言:javascript
复制
      _ "github.com/minio/minio/cmd/gateway/nas"
      _ "github.com/minio/minio/cmd/gateway/azure"
      _ "github.com/minio/minio/cmd/gateway/s3"
      _ "github.com/minio/minio/cmd/gateway/hdfs"
      _ "github.com/minio/minio/cmd/gateway/gcs

以s3为例

cmd/gateway/s3/gateway-s3.go

代码语言:javascript
复制
func init() {
  const s3GatewayTemplate = `NAME:
  {{.HelpName}} - {{.Usage}}
   
   
   minio.RegisterGatewayCommand(cli.Command{
    Name:               minio.S3BackendGateway,
    Usage:              "Amazon Simple Storage Service (S3)",
    Action:             s3GatewayMain,
    CustomHelpTemplate: s3GatewayTemplate,
    HideHelpCommand:    true,
  })

调用了RegisterGatewayCommand将对应的命令注册为gatewayCmd的子命令。而对应的处理函数为:

代码语言:javascript
复制
func s3GatewayMain(ctx *cli.Context)
   minio.StartGateway(ctx, &S3{
    host:  args.First(),
    debug: env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn,
  })

调用了StartGateway方法,它的实现在cmd/gateway-main.go

代码语言:javascript
复制
func StartGateway(ctx *cli.Context, gw Gateway) 
      router := mux.NewRouter().SkipClean(true).UseEncodedPath()
      registerSTSRouter(router)
      registerAdminRouter(router, false)
      registerAPIRouter(router)
      httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(router)), getCert)
      newAllSubsystems()
      buckets, err := newObject.ListBuckets(GlobalContext)
      globalConsoleSrv, err = initConsoleServer()
      (globalConsoleSrv.Serve()

注册了STS,Admin和API等router,然后起了一个httpserver,最后调用Serve方法开启端口监听。

sts路由注册函数实现在cmd/sts-handlers.go

代码语言:javascript
复制
func registerSTSRouter(router *mux.Router)
    stsRouter.Methods(http.MethodPost).MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
    ctypeOk := wildcard.MatchSimple("application/x-www-form-urlencoded*", r.Header.Get(xhttp.ContentType))
    noQueries := len(r.URL.RawQuery) == 0
    return ctypeOk && noQueries
  }).HandlerFunc(httpTraceAll(sts.AssumeRoleWithSSO))
    
    stsRouter.Methods(http.MethodPost).HandlerFunc(httpTraceAll(sts.AssumeRoleWithClientGrants)).
    Queries(stsAction, clientGrants).
    Queries(stsVersion, stsAPIVersion).
    Queries(stsToken, "{Token:.*}")

这里定义了sts常见的一些接口

代码语言:javascript
复制
    const (
  // STS API version.
  stsAPIVersion             = "2011-06-15"
  stsVersion                = "Version"
  stsAction                 = "Action"
  stsPolicy                 = "Policy"

admin路由定义了一系列后台操作的接口 cmd/admin-router.go

代码语言:javascript
复制
func registerAdminRouter(router *mux.Router, enableConfigOps bool)
      adminRouter.Methods(http.MethodPost).Path(adminVersion+"/service").HandlerFunc(gz(httpTraceAll(adminAPI.ServiceHandler))).Queries("action", "{action:.*}")

apirouter 定义了我们真正操作对象存储的接口cmd/api-router.go

代码语言:javascript
复制
func registerAPIRouter(router *mux.Router) 
      gz, err := gzhttp.NewWrapper(gzhttp.MinSize(1000), gzhttp.CompressionLevel(gzip.BestSpeed))
      routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
      
      router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))

以"/{object:.+}" 为例子,对应的handlerFunc其实被middleware包裹了很多层:cmd/handler-api.go

代码语言:javascript
复制
    func maxClients(f http.HandlerFunc) http.HandlerFunc 
      f.ServeHTTP(w, r)

github.com/klauspost/compress@v1.13.6/gzhttp/gzip.go

代码语言:javascript
复制
func NewWrapper(opts ...option) (func(http.Handler) http.HandlerFunc, error) 
      h.ServeHTTP(gwcn, r)

cmd/handler-utils.go

代码语言:javascript
复制
func httpTraceAll(f http.HandlerFunc) http.HandlerFunc 
      f.ServeHTTP(w, r)

最后调用了headObjectHandler这个接口cmd/object-handlers.go

代码语言:javascript
复制
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request) 
      objectAPI := api.ObjectAPI()
      api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
      api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)

对应于s3,它的实现在:cmd/s3-zip-handlers.go

代码语言:javascript
复制
func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) 
      _, err = getObjectInfo(ctx, bucket, zipPath, opts)
      file, err := zipindex.FindSerialized(zipInfo, object)

cmd/object-api-interface.go

代码语言:javascript
复制
type ObjectLayer interface {
  // Locking operations on object.
  NewNSLock(bucket string, objects ...string) RWLocker
  GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)

可以看到最终调用的是minio的SDK,去请求远程的s3服务cmd/gateway/s3/gateway-s3.go

代码语言:javascript
复制
func (l *s3Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) 
      oi, err := l.Client.StatObject(ctx, bucket, object, miniogo.StatObjectOptions{
    ServerSideEncryption: opts.ServerSideEncryption,
  })

github.com/minio/minio-go/v7@v7.0.15/core.go

代码语言:javascript
复制
func (c Core) StatObject(ctx context.Context, bucketName, objectName string, opts StatObjectOptions) (ObjectInfo, error) {
  return c.statObject(ctx, bucketName, objectName, opts)
}

接着,我们回到main.go,main函数实现很简单:

代码语言:javascript
复制
func main() {
  minio.Main(os.Args)
}

cmd/main.go

代码语言:javascript
复制
func Main(args []string)
        if err := newApp(appName).Run(args); err != nil {

创建了一个app对象

代码语言:javascript
复制
func newApp(name string) *cli.App 
    commandsTree := trie.NewTrie()
    registerCommand := func(command cli.Command)
    commands = append(commands, command)
    commandsTree.Insert(command.Name)
    
    findClosestCommands := func(command string) []string {
    for _, value := range commandsTree.Walk(commandsTree.Root()) {
    registerCommand(serverCmd)
    registerCommand(gatewayCmd)
    app := cli.NewApp()

其中父子命令以前缀树的形式存储:trie/trie.go

代码语言:javascript
复制
func NewTrie() *Trie {
  return &Trie{
    root: newNode(),
    size: 0,
  }
}

app对象定义在cli包里cli/app.go

代码语言:javascript
复制
type App struct {
  // The name of the program. Defaults to path.Base(os.Args[0])
  Name string  
代码语言:javascript
复制
func NewApp() *App {

可以看到在newApp方法里注册了serverCmd和gatewayCmd,首先看下serverCmd,代码位置:cmd/server-main.go

代码语言:javascript
复制
var serverCmd = cli.Command{
  Name:   "server",
  Usage:  "start object storage server",
  Flags:  append(ServerFlags, GlobalFlags...),
  Action: serverMain,
  CustomHelpTemplate:

对应的处理方法是:

代码语言:javascript
复制
func serverMain(ctx *cli.Context)
      bitrotSelfTest()
      erasureSelfTest()
      compressSelfTest()
      globalConsoleSys.SetNodeName(globalLocalNodeName)
      newAllSubsystems()
        globalNotificationSys = NewNotificationSys(globalEndpoints)
        globalBucketMetadataSys = NewBucketMetadataSys()
        globalBucketMetadataSys.Reset()
        globalBucketMonitor = bandwidth.NewMonitor(GlobalContext, totalNodeCount())
        globalConfigSys = NewConfigSys()
        globalIAMSys = NewIAMSys()
        globalPolicySys = NewPolicySys()
        globalLifecycleSys = NewLifecycleSys()
        globalBucketSSEConfigSys = NewBucketSSEConfigSys()
        globalBucketObjectLockSys = NewBucketObjectLockSys()
        globalBucketQuotaSys = NewBucketQuotaSys()
        globalBucketVersioningSys = NewBucketVersioningSys()
        globalBucketVersioningSys.Reset()
        globalBucketTargetSys = NewBucketTargetSys()
        globalTierConfigMgr = NewTierConfigMgr()
      checkUpdate(getMinioMode())
      setMaxResources()
        sys.GetMaxThreads()
        sys.GetMaxOpenFileLimit()
         sys.GetMaxMemoryLimit()
      handler, err := configureServerHandler(globalEndpoints)
      httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(handler)), getCert)
      setHTTPServer(httpServer)
      newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
      initBackgroundExpiry(GlobalContext, newObject)
       err = initServer(GlobalContext, newObject); err != nil
      go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
      initDataScanner(GlobalContext, newObject)
      initBackgroundReplication(GlobalContext, newObject)
      initBackgroundTransition(GlobalContext, newObject)
      globalTierJournal, err = initTierDeletionJournal(GlobalContext)
      cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
      setCacheObjectLayer(cacheAPI)
      globalConsoleSrv, err = initConsoleServer()
      globalConsoleSrv.Serve()

核心思想是注册路由,最后启动http server,而对应的gatewayCmd的实现在

cmd/gateway-main.go

代码语言:javascript
复制
gatewayCmd = cli.Command{
    Name:            "gateway",
    Usage:           "start object storage gateway",
    Flags:           append(ServerFlags, GlobalFlags...),
    HideHelpCommand: true,
  }

可以看到,它并没有对应的处理方法,很奇怪对吧,那是应为对于不同的对象存储服务,它是以子命令的形式注册进来的,注册的位置在init函数中,也就是前面介绍的注册逻辑。注册函数如下:

代码语言:javascript
复制
func RegisterGatewayCommand(cmd cli.Command) error {
  cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...)
  gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
  return nil
}

注册完路由会启动一个httpserver,minio对httpserver 进行了简单的包装,代码位置是 internal/http/server.go

代码语言:javascript
复制
func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificateFunc) *Server

路由用的是现成的路由包:github.com/gorilla/mux@v1.8.0/mux.go

代码语言:javascript
复制
func NewRouter() *Router {
  return &Router{namedRoutes: make(map[string]*Route)}
}

serverCmd的路由在cmd/routers.go,同样注册了STS,admin和api路由:

代码语言:javascript
复制
func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) 
    registerAdminRouter(router, true)
    registerSTSRouter(router)
    registerAPIRouter(router)
    router.Use(globalHandlers...)

以api router为例看下具体实现:

代码语言:javascript
复制
func registerAPIRouter(router *mux.Router)
    routers = append(routers, apiRouter.MatcherFunc(func(r *http.Request, match *mux.RouteMatch) bool
    routers = append(routers, apiRouter.Host("{bucket:.+}."+domainName).Subrouter())
    routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
    
    router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))

cmd/object-handlers.go

代码语言:javascript
复制
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
      api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
      api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)

cmd/s3-zip-handlers.go

代码语言:javascript
复制
func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) 
      getObjectInfo := objectAPI.GetObjectInfo
      _, err = getObjectInfo(ctx, bucket, zipPath, opts)

cmd/object-api-interface.go

代码语言:javascript
复制
type ObjectLayer interface {
  // Locking operations on object.
  NewNSLock(bucket string, objects ...string) RWLocker

整体路由serverCmd和gatewayCmd实现上是非常相似的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
命令行工具
腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档