对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。它的容量和处理能力可以弹性扩展,并提供多种存储类型供选择,全面优化存储成本。MinIO 除了直接作为对象存储使用,还可以作为云上对象存储服务的网关层,无缝对接到 Amazon S3、Microsoft Azure。在学习 minio 的源码之前,先阅读一下 minio 的客户端 mc 和 golang sdk minio-go:
https://github.com/minio/minio-go
https://github.com/minio/mc
1,MC
mc是在 minio-go的基础上做了命令行的包装,常用的命令如下:
ls 列出文件和文件夹。
mb 创建一个存储桶或一个文件夹。
cat 显示文件和对象内容。
pipe 将一个STDIN重定向到一个对象或者文件或者STDOUT。
share 生成用于共享的URL。
cp 拷贝文件和对象。
mirror 给存储桶和文件夹做镜像。
find 基于参数查找文件。
diff 对两个文件夹或者存储桶比较差异。
rm 删除文件和对象。
events 管理对象通知。
watch 监听文件和对象的事件。
policy 管理访问策略。
session 为cp命令管理保存的会话。
config 管理mc配置文件。
update 检查软件更新。
version 输出版本信息。
当然,入口函数还是main.go:
// main is the program entry point of the mc binary. It does nothing itself
// except forward the raw command-line arguments (os.Args) to mc.Main.
func main() {
mc.Main(os.Args)
}
对应的每一个命令的实现在cmd目录下,由于mc的命令有自己特殊的模板,所以它没有用常用的cobra,而是自己定义了一套minio/cli,首先我们看下Main函数,它定义在cmd/main.go中
func Main(args []string)
mainComplete()
defer profile.Start(profile.CPUProfile, profile.ProfilePath(mustGetProfileDir())).Stop()
probe.Init()
if err := registerApp(appName).Run(args); err != nil
mainComplete定义在cmd/auto-complete.go
func mainComplete() error
for _, cmd := range appCmds {
if cmd.Hidden {
continue
}
complCmds[cmd.Name] = cmdToCompleteCmd(cmd, "")
}
mcComplete := complete.Command{
Sub: complCmds,
GlobalFlags: complFlags,
}
complete.New(filepath.Base(os.Args[0]), mcComplete).Run()
它遍历appCmds,把其中未隐藏(Hidden为false)的命令转换成自动补全用的子命令并注册到mc的补全器上;appCmds是子命令列表,定义在cmd/main.go中:
// appCmds is the list of cli.Command values that make up mc's sub-commands
// (alias, ls, mb, cp, tree, admin, ...). mainComplete ranges over this slice
// to build the shell-completion entries, skipping commands marked Hidden.
var appCmds = []cli.Command{
aliasCmd,
lsCmd,
mbCmd,
rbCmd,
cpCmd,
mirrorCmd,
catCmd,
headCmd,
pipeCmd,
shareCmd,
findCmd,
sqlCmd,
statCmd,
mvCmd,
treeCmd,
duCmd,
retentionCmd,
legalHoldCmd,
diffCmd,
rmCmd,
versionCmd,
ilmCmd,
encryptCmd,
eventCmd,
watchCmd,
undoCmd,
anonymousCmd,
policyCmd,
tagCmd,
replicateCmd,
adminCmd,
configCmd,
updateCmd,
}
probe定义在pkg/probe/probe.go
func Init() {
_, file, _, _ := runtime.Caller(1)
rootPath = filepath.Dir(file)
其中的Run方法是minio/cli框架提供的接口,App和Command各有一个Run方法,分别定义在下面两个文件中:
minio/cli@v1.22.0/app.go
func (a *App) Run(arguments []string) (err error)
a.Setup()
c := a.Command(name)
if c != nil {
return c.Run(context)
}
minio/cli@v1.22.0/command.go
func (c Command) Run(ctx *Context) (err error)
err = HandleAction(c.Action, context)
下面以tree命令为例,看下实现的细节: cmd/tree-main.go
var treeCmd = cli.Command{
Name: "tree",
Usage: "list buckets and objects in a tree format",
Action: mainTree,
OnUsageError: onUsageError,
Before: setGlobalsFromContext,
Flags: append(treeFlags, globalFlags...),
CustomHelpTemplate
对应执行的命令是mainTree
func mainTree(cliCtx *cli.Context) error
args, depth, includeFiles, timeRef := parseTreeSyntax(ctx, cliCtx)
if e := doTree(ctx, targetURL, timeRef, 1, false, "", depth, includeFiles); e != nil
clnt, err := newClientFromAlias(targetAlias, targetURL)
e := doList(ctx, clnt, true, false, false, timeRef, false);
在doTree方法里初始化了一个client,调用了对应的list接口:
func doTree(ctx context.Context, url string, timeRef time.Time, level int, leaf bool, branchString string, depth int, includeFiles bool) error
clnt, err := newClientFromAlias(targetAlias, targetURL)
show := func(end bool) error
for content := range clnt.List(ctx, ListOptions{Recursive: false, TimeRef: timeRef, ShowDir: DirFirst})
Client是一个客户端接口,定义在cmd/client.go:
type Client interface {
// Common operations
Stat(ctx context.Context, opts StatOptions) (content *ClientContent, err *probe.Error)
List(ctx context.Context, opts ListOptions) <-chan *ClientContent
它对应有两个具体实现,一个是本地文件系统(fsClient),一个是s3(S3Client):
cmd/client-fs.go
func (f *fsClient) List(ctx context.Context, opts ListOptions) <-chan *ClientContent
go f.listRecursiveInRoutine(contentCh, opts.WithMetadata)
go f.listDirOpt(contentCh, opts.Incomplete, opts.WithMetadata, opts.ShowDir)
go f.listInRoutine(contentCh, opts.WithMetadata)
递归列举时通过xfilepath.Walk方法遍历目录,对每个条目回调visitFS方法,供上层做树状渲染展示:
func (f *fsClient) listRecursiveInRoutine(contentCh chan *ClientContent, isMetadata bool)
visitFS := func(fp string, fi os.FileInfo, e error) error
e := xfilepath.Walk(dirName, visitFS)
cmd/client-s3.go
func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContent
c.versionedList(ctx, contentCh, opts)
c.unversionedList(ctx, contentCh, opts)
func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)
b, o := c.url2BucketAndObject()
func (c *S3Client) url2BucketAndObject() (bucketName, objectName string)
buckets, err := c.api.ListBuckets(ctx)
for _, bucket := range buckets {
contentCh <- c.bucketInfo2ClientContent(bucket)
for objectVersion := range c.listVersions(ctx, bucket.Name, "",
opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers)
c.unversionedList(ctx, contentCh, opts)
func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)
c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts)
c.listIncompleteInRoutine(ctx, contentCh, opts)
c.listRecursiveInRoutine(ctx, contentCh, opts)
c.listInRoutine(ctx, contentCh, opts)
func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)
buckets, err := c.api.ListBuckets(ctx)
for _, bucket := range buckets {
for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive)
func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)
buckets, err := c.api.ListBuckets(ctx)
for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1)
func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInfo
return c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers)
c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys})
c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys})
最终都是调用了SDK中的对应方法:
minio/minio-go/v7@v7.0.16-0.20211108161804-a7a36ee131df/api-list.go
func (c *Client) ListObjects(ctx context.Context, bucketName string, opts ListObjectsOptions) <-chan ObjectInfo
// ListIncompleteUploads returns a receive-only channel of ObjectMultipartInfo
// for the given bucket and object prefix. It is a thin exported wrapper: all
// arguments (ctx, bucketName, objectPrefix, recursive) are passed unchanged
// to the unexported c.listIncompleteUploads, whose channel is returned as-is.
func (c *Client) ListIncompleteUploads(ctx context.Context, bucketName, objectPrefix string, recursive bool) <-chan ObjectMultipartInfo {
return c.listIncompleteUploads(ctx, bucketName, objectPrefix, recursive)
}
我们看下另外一个ls命令,实现也是类似的:
cmd/ls-main.go
var lsCmd = cli.Command{
Name: "ls",
Usage: "list buckets and objects",
Action: mainList,
OnUsageError: onUsageError,
Before: setGlobalsFromContext,
Flags: append(lsFlags, globalFlags...),
CustomHelpTemplate:
func mainList(cliCtx *cli.Context) error
if e := doList(ctx, clnt, isRecursive, isIncomplete, isSummary, timeRef, withOlderVersions); e != nil
cmd/ls.go
func doList(ctx context.Context, clnt Client, isRecursive, isIncomplete, isSummary bool, timeRef time.Time, withOlderVersions bool) error
for content := range clnt.List(ctx, ListOptions{
Recursive: isRecursive,
Incomplete: isIncomplete,
TimeRef: timeRef,
WithOlderVersions: withOlderVersions || !timeRef.IsZero(),
WithDeleteMarkers: true,
ShowDir: DirNone,
})
2,SDK:minio-go
首先我们看下sdk是如何使用的:
1,创建client对象:
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: useSSL,
})
2,创建bucket,或者确认bucket是否存在:
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location})
exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)
3,上传文件(创建对象):
info, err := minioClient.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{ContentType: contentType})
client对象定义在api.go
// Client is the SDK's client object, the type returned by New. It groups the
// parsed endpoint, the credential providers, the underlying *http.Client,
// tracing settings, region and bucket-lookup configuration, and the hash
// factories used when building requests.
type Client struct {
// Standard options.
// Parsed endpoint url provided by the user.
endpointURL *url.URL
// Holds various credential providers.
credsProvider *credentials.Credentials
// Custom signerType value overrides all credentials.
overrideSignerType credentials.SignatureType
// User supplied.
appInfo struct {
appName string
appVersion string
}
// Indicate whether we are using https or not
secure bool
// Needs allocation.
httpClient *http.Client
bucketLocCache *bucketLocationCache
// Advanced functionality.
isTraceEnabled bool
traceErrorsOnly bool
traceOutput io.Writer
// S3 specific accelerated endpoint.
s3AccelerateEndpoint string
// Region endpoint
region string
// Random seed.
random *rand.Rand
// lookup indicates type of url lookup supported by server. If not specified,
// default to Auto.
lookup BucketLookupType
// Factory for MD5 hash functions.
md5Hasher func() md5simd.Hasher
// Presumably the matching factory for SHA-256 hash functions; it shares
// the md5simd.Hasher return type with md5Hasher. TODO confirm.
sha256Hasher func() md5simd.Hasher
// NOTE(review): meaning of the values is not shown here; the name suggests
// a health/online flag for the endpoint. Confirm against the code using it.
healthStatus int32
}
func New(endpoint string, opts *Options) (*Client, error)
这个文件下还定义了一个executeMethod方法,这个方法是所有http请求的入口:
func (c *Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error)
bodyCloser, ok := metadata.contentBody.(io.Closer)
for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter)
req, err = c.newRequest(ctx, method, metadata)
res, err = c.do(req)
do方法简单包装了http client的do方法:
func (c *Client) do(req *http.Request) (resp *http.Response, err error)
resp, err = c.httpClient.Do(req)
api-put-bucket.go里定义了创建bucket的方法:
// MakeBucket is the exported entry point for creating a bucket. It performs
// no work of its own: ctx, bucketName and opts are handed unchanged to the
// unexported c.makeBucket, and that call's error is returned to the caller.
func (c *Client) MakeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {
return c.makeBucket(ctx, bucketName, opts)
}
func (c *Client) makeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error)
err = c.doMakeBucket(ctx, bucketName, opts.Region, opts.ObjectLocking)
最终调用了上述executeMethod方法:
func (c *Client) doMakeBucket(ctx context.Context, bucketName string, location string, objectLockEnabled bool) (err error)
createBucketConfigBytes, err = xml.Marshal(createBucketConfig)
reqMetadata.contentMD5Base64 = sumMD5Base64(createBucketConfigBytes)
resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
BucketExists方法定义在api-stat.go
func (c *Client) BucketExists(ctx context.Context, bucketName string) (bool, error)
resp, err := c.executeMethod(ctx, http.MethodHead, requestMetadata{
bucketName: bucketName,
contentSHA256Hex: emptySHA256Hex,
})
FPutObject定义在:api-put-object-file-context.go
func (c *Client) FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts PutObjectOptions) (info UploadInfo, err error)
fileReader, err := os.Open(filePath)
fileStat, err := fileReader.Stat()
fileSize := fileStat.Size()
return c.PutObject(ctx, bucketName, objectName, fileReader, fileSize, opts)
api-put-object.go
func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
opts PutObjectOptions) (info UploadInfo, err error)
return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
根据大小和url的类型确定上传方式,可以整体也可以分片,还可以流式
func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error)
if size > int64(maxMultipartPutObjectSize)
if s3utils.IsGoogleEndpoint(*c.endpointURL) {
return c.putObject(ctx, bucketName, objectName, reader, size, opts)
if c.overrideSignerType.IsV2() {
if size >= 0 && size < int64(partSize) || opts.DisableMultipart {
return c.putObject(ctx, bucketName, objectName, reader, size, opts)
}
return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
}
return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
上面第一个判断里用到的maxMultipartPutObjectSize是一个常量,表示对象最大允许5T大小
api-put-object-streaming.go
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error)
return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error)
resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
api-put-object-multipart.go
func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
opts PutObjectOptions) (info UploadInfo, err error)
info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
return c.putObject(ctx, bucketName, objectName, reader, size, opts)
分片上传先根据分片大小计算出分片数目,然后创建上传的id,接着循环调用uploadPart逐个上传分片,最后调用completeMultipartUpload合并分片:
func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error)
totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
for partNumber <= totalPartsCount {
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error)
resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error)
resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
除了基本的上传,在examples下面还定义了一些其他的接口的使用例子:
examples/minio/listen-notification.go
minioClient.ListenNotification(context.Background(), "PREFIX", "SUFFIX", []string{
"s3:BucketCreated:*",
"s3:BucketRemoved:*",
"s3:ObjectCreated:*",
"s3:ObjectAccessed:*",
"s3:ObjectRemoved:*",
})
examples/minio/listenbucketnotification.go
minioClient.ListenBucketNotification(context.Background(), "YOUR-BUCKET", "PREFIX", "SUFFIX", []string{
"s3:ObjectCreated:*",
"s3:ObjectAccessed:*",
"s3:ObjectRemoved:*",
})
examples/minio/putobjectsnowball.go
minioClient.ListObjects(context.Background(), YOURBUCKET, lopts)
examples/minio/getbucketreplicationmetrics.go
s3Client.TraceOn(os.Stderr)
m, err := s3Client.GetBucketReplicationMetrics(context.Background(), "bucket")
api-bucket-notification.go
func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info
func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info
resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
contentSHA256Hex: emptySHA256Hex,
})
以上就是mc和sdk的源码,整体来说就是对minio的接口做了一层httpclient 的封装,加了一些参数校验的逻辑。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!