前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自动监控文件并上传S3对象存储服务器 | Golang

自动监控文件并上传S3对象存储服务器 | Golang

作者头像
ZGGSONG
发布2022-09-23 11:21:15
1.2K0
发布2022-09-23 11:21:15
举报
文章被收录于专栏:日志

前言

需求:

  • 监控目录下文件变动
  • 上传文件至S3服务器

本地平台:Windows 10 专业版 21H2 (19044.1826)、开发语言:go1.18.3 windows/amd64

监控目录下文件变动使用 github.com/fsnotify/fsnotify 上传测试服务器使用 Minio 进行测试

实现

监控文件生成

根据仓库中的示例代码也可以实现

代码语言:javascript
复制
package main

import (
    "log"

    "github.com/fsnotify/fsnotify"
)

func main() {
    // Create new watcher.
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close()

    // Start listening for events.
    go func() {
        for {
            select {
            case event, ok := <-watcher.Events:
                if !ok {
                    return
                }
                log.Println("event:", event)
                if event.Op == fsnotify.Write {
                    log.Println("modified file:", event.Name)
                }
            case err, ok := <-watcher.Errors:
                if !ok {
                    return
                }
                log.Println("error:", err)
            }
        }
    }()

    // Add a path.
    err = watcher.Add("/tmp")
    if err != nil {
        log.Fatal(err)
    }

    // Block main goroutine forever.
    <-make(chan struct{})
}

监控文件生成时有两种监测结果(winodws):

  • 复制文件是触发一次Create事件,两次Write事件
  • 直接创建文件(cmd&代码)触发一次Create事件,一次Write事件

经过测试Windows上是这样,Linux就比较一致,不论怎么创建文件,都是一次Create事件+一次Write事件

这个示例只能解决监控当前目录下的内容,子目录下的内容无法监控

解决:在监控到创建了目录以后,把新创建的目录加入到监控目录中去

代码语言:javascript
复制
func StartWatch(dir string) {
    watch, _ := fsnotify.NewWatcher()
    w := Watch{
        watch: watch,
    }
    w.watchEx(dir)
    log.Println("开始监控目录: ", dir, "...")
    select {}
}

func (w *Watch) watchEx(dir string) {
    //通过Walk来遍历目录下的所有子目录
    err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
        //这里判断是否为目录,只需监控目录即可 || 目录下的文件也在监控范围内,不需要我们一个一个加
        if info.IsDir() {
            path, err := filepath.Abs(path)
            if err != nil {
                return err
            }
            err = w.watch.Add(path)
            if err != nil {
                return err
            }
        }
        return nil
    })
    if err != nil {
        log.Println("监控失败 : ", err.Error())
        return
    }
    go w.watchExec()
}

func (w *Watch) watchExec() {
    for {
        select {
        case ev := <-w.watch.Events:
            {
                if ev.Op&fsnotify.Create == fsnotify.Create {
                    fmt.Println("创建文件 : ", ev.Name)
                    //获取新创建文件的信息,如果是目录,则加入监控中
                    file, err := os.Stat(ev.Name)
                    if err == nil && file.IsDir() {
                        w.watch.Add(ev.Name)
                        fmt.Println("添加监控 : ", ev.Name)
                    }
                }
                if ev.Op&fsnotify.Write == fsnotify.Write {
                    fmt.Println("写入文件 : ", ev.Name)
                }
                if ev.Op&fsnotify.Remove == fsnotify.Remove {
                    fmt.Println("删除文件 : ", ev.Name)
                    //如果删除文件是目录,则移除监控
                    fi, err := os.Stat(ev.Name)
                    if err == nil && fi.IsDir() {
                        w.watch.Remove(ev.Name)
                        fmt.Println("删除监控 : ", ev.Name)
                    }
                }
                if ev.Op&fsnotify.Rename == fsnotify.Rename {
                    //如果重命名文件是目录,则移除监控 ,注意这里无法使用os.Stat来判断是否是目录了
                    //因为重命名后,go已经无法找到原文件来获取信息了,所以简单粗爆直接remove
                    fmt.Println("重命名文件 : ", ev.Name)
                    w.watch.Remove(ev.Name)
                }
                if ev.Op&fsnotify.Chmod == fsnotify.Chmod {
                    fmt.Println("修改权限 : ", ev.Name)
                }
            }
        case err := <-w.watch.Errors:
            {
                log.Errorln("监控目录出错: ", err)
                return
            }
        }
    }
}

type Watch struct {
    watch *fsnotify.Watcher
}

上传S3服务器

  1. 上传之前得先有一台S3对象存储服务器,这里我直接就使用 Minio 镜像进行搭建

Minio新版和旧版还是有出入的,搭建以及后续维护和旧版差别比较大,而且网上各类教程主要针对旧版,方便后续排错,我是直接安装旧版

代码语言:javascript
复制
docker pull minio/minio:RELEASE.2021-06-17T00-10-46Z

docker run -d -p 9000:9000 --restart=always --name minioDemo\
  -e "MINIO_ACCESS_KEY=admin" \
  -e "MINIO_SECRET_KEY=admin123." \
  -v D:/docker/minio/data:/data \
  -v D:/docker/minio/config:/root/.minio \
  minio/minio:RELEASE.2021-06-17T00-10-46Z server /data
  1. 我这边使用的是 https://github.com/aws/aws-sdk-go-v2 的SDK,基本参照 官方文档 进行开发,整体难度也不高,主要是在创建私有S3服务的Client 创建上有点坑,我也记录写下来过 详情查看

过了段时间再看,发现官方的文档整理了,好多东西都没了,主要的代码Github也没有了 另一个官方文档

如下代码片段仅供参考(仅实现了上传对象及Tag)

代码语言:javascript
复制
func uploadHandler(ctx context.Context, path, keyName, tags string) error {
    endpoint := global.GLO_CONF.S3EndPoint
    accessKey := global.GLO_CONF.S3AccessKey
    secretKey := global.GLO_CONF.S3SecretKey
    bucket := global.GLO_CONF.S3Bucket

    client, err := util.GetClient(ctx, endpoint, accessKey, secretKey)
    if err != nil {
        return errors.New("创建S3连接请求失败, " + err.Error())
    }

    if err = util.UPutObject(ctx, client, path, bucket, keyName); err != nil {
        return err
    }
    if err = util.UPutTag(ctx, client, bucket, keyName, tags); err != nil {
        return err
    }
    return nil
}
代码语言:javascript
复制
func GetClient(ctx context.Context, endpoint, accessKey, secretKey string) (*s3.Client, error) {
    cfg, err := config.LoadDefaultConfig(
        ctx,
        config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
        config.WithEndpointResolverWithOptions(
            aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                return aws.Endpoint{URL: endpoint}, nil
            })),
        config.WithRegion("us-east-1"),
    )
    if err != nil {
        return nil, err
    }

    client := s3.NewFromConfig(cfg, func(o *s3.Options) {
        o.UsePathStyle = true
        o.EndpointOptions.DisableHTTPS = true
    })
    return client, nil
}

type S3PutObjectAPI interface {
    PutObject(ctx context.Context,
        params *s3.PutObjectInput,
        optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

func PutFile(c context.Context, api S3PutObjectAPI, input *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
    return api.PutObject(c, input)
}

type S3PutTaggingAPI interface {
    PutObjectTagging(ctx context.Context,
        params *s3.PutObjectTaggingInput,
        optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
}

func putTag(c context.Context, api S3PutTaggingAPI, input *s3.PutObjectTaggingInput) (*s3.PutObjectTaggingOutput, error) {
    return api.PutObjectTagging(c, input)
}

// UPutObject 上传对象
func UPutObject(ctx context.Context, client *s3.Client, path, bucket, key string) error {
    file, err := os.Open(path)
    if err != nil {
        return errors.New("上传对象时打开文件失败, " + err.Error())
    }
    defer file.Close()

    input := &s3.PutObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
        Body:   file,
    }
    _, err = PutFile(ctx, client, input)
    if err != nil {
        return errors.New("上传对象时发生错误, " + err.Error())
    }
    return nil
}

// UPutTag 上传标签
func UPutTag(ctx context.Context, client *s3.Client, bucket, key string, tags map[string]string) error {
    input := &s3.PutObjectTaggingInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    }
    index := 0
    for k, v := range tags {
        input.Tagging.TagSet[index] = types.Tag{Key: aws.String(k), Value: aws.String(v)}
        index++
    }
    _, err := putTag(ctx, client, input)
    if err != nil {
        return errors.New("上传标签时发生错误, " + err.Error())
    }
    return nil
}

后续

在实际实现过程中发现大文件在生成是比较耗时,而且创建文件的方式多种多样,一方面可以从监控的角度去解决,另一方面(我的方式),通过数据库的方式,这样刚好解决了上传超时或失败后的重传问题,多个协程之间通过 channel 来解决通信问题,最后代码主体结构变成了如下:

代码语言:javascript
复制
//init
...
  
//开启文件监控
go util.InitWatch(listeningPath)

//每隔INTERVAL 自动检测上传给对象存储服务器
go func() {
  for {
    timer := time.NewTimer(time.Second * time.Duration(global.GLO_CONF.Interval))
    <-timer.C
    core.UploadServe(global.GLO_DB)
  }
}()

//每隔INTERVAL 转发完成信息
go func() {
  for {
    timer := time.NewTimer(time.Second * time.Duration(global.GLO_CONF.Interval))
    <-timer.C
    core.ScadaMesServe(global.GLO_DB)
  }
}()

//删除数据库废弃数据, 超时时间: 30天
go func() {
  for {
    // 检查频率: 24小时
    timer := time.NewTimer(time.Hour * time.Duration(24))
    <-timer.C
    core.CleanServe(global.GLO_DB)
  }
}()

//阻塞等待文件变化并保存至数据库
for {
  select {
    case path := <-global.GLO_CH_FILE:
    go core.Add2Cache(path, kmap)
    case reqByte := <-global.GLO_CH_REQ:
    go core.UpdateCache(reqByte)
  }
}

当然也有别的方式,只是我没想到

参考

本文作者:ZGGSONG

本文链接:https://www.zggsong.cn/archives/listen_file_upload_s3_with_golang.html

版权声明:本站所有未注明转载的文章均为原创,并采用CC BY-NV-SA 4.0授权协议,转载请注明来源

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 实现
    • 监控文件生成
      • 上传S3服务器
      • 后续
      • 参考
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档