A Look at dapr's fswatcher

code4it
Published 2021-03-24 17:29:19
Included in the column: 码匠的流水账

This article takes a look at dapr's fswatcher.

fswatcher

dapr/pkg/fswatcher/fswatcher.go

import (
    "context"
    "strings"
    "time"

    "github.com/fsnotify/fsnotify"
    "github.com/pkg/errors"
)

func Watch(ctx context.Context, dir string, eventCh chan<- struct{}) error {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return errors.Wrap(err, "failed to create watcher")
    }
    defer watcher.Close()

    if err := watcher.Add(dir); err != nil {
        return errors.Wrap(err, "watcher error")
    }

LOOP:
    for {
        select {
        // watch for events
        case event := <-watcher.Events:
            if event.Op == fsnotify.Create || event.Op == fsnotify.Write {
                if strings.Contains(event.Name, dir) {
                    // give time for other updates to occur
                    time.Sleep(time.Second * 1)
                    eventCh <- struct{}{}
                }
            }
        case <-watcher.Errors:
            break LOOP
        case <-ctx.Done():
            break LOOP
        }
    }
    return nil
}

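Watch creates an fsnotify watcher and adds dir to it, then runs a for loop around a select. When an event's Op equals fsnotify.Create or fsnotify.Write and event.Name contains dir, it sleeps for one second and then sends a notification on eventCh. The loop exits, and Watch returns nil, when the watcher reports an error or ctx is done.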
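
One detail in the loop above may be easier to see in a small sketch: Watch compares event.Op with == instead of testing individual bits. The code below is only an illustration, not dapr's or fsnotify's code. Op and its constants are local stand-ins that mirror how fsnotify v1.4.9 declares its bitmask, and isCreateOrWriteExact and hasCreateOrWrite are hypothetical helper names. It shows that an exact comparison skips an event carrying more than one bit, while a bit test still matches it.

```go
package main

import "fmt"

// Op is a local stand-in for fsnotify.Op: a bitmask in which each
// operation occupies one bit (assumption: same layout as fsnotify v1.4.9).
type Op uint32

const (
	Create Op = 1 << iota
	Write
	Remove
	Rename
	Chmod
)

// isCreateOrWriteExact mirrors the comparison used in Watch.
func isCreateOrWriteExact(op Op) bool {
	return op == Create || op == Write
}

// hasCreateOrWrite is a hypothetical alternative that tests the bits.
func hasCreateOrWrite(op Op) bool {
	return op&(Create|Write) != 0
}

func main() {
	for _, op := range []Op{Create, Write, Write | Chmod, Remove} {
		fmt.Printf("op=%05b exact=%v bittest=%v\n",
			op, isCreateOrWriteExact(op), hasCreateOrWrite(op))
	}
}
```

Whether combined bits actually occur depends on the platform backend, so this is a point to check rather than a confirmed problem.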

Add

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

// Add starts watching the named file or directory (non-recursively).
func (w *Watcher) Add(name string) error {
    w.mu.Lock()
    w.externalWatches[name] = true
    w.mu.Unlock()
    _, err := w.addWatch(name, noteAllEvents)
    return err
}

Add sets externalWatches[name] to true and then calls addWatch(name, noteAllEvents).

addWatch

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
    var isDir bool
    // Make ./name and name equivalent
    name = filepath.Clean(name)

    w.mu.Lock()
    if w.isClosed {
        w.mu.Unlock()
        return "", errors.New("kevent instance already closed")
    }
    watchfd, alreadyWatching := w.watches[name]
    // We already have a watch, but we can still override flags.
    if alreadyWatching {
        isDir = w.paths[watchfd].isDir
    }
    w.mu.Unlock()

    if !alreadyWatching {
        fi, err := os.Lstat(name)
        if err != nil {
            return "", err
        }

        // Don't watch sockets.
        if fi.Mode()&os.ModeSocket == os.ModeSocket {
            return "", nil
        }

        // Don't watch named pipes.
        if fi.Mode()&os.ModeNamedPipe == os.ModeNamedPipe {
            return "", nil
        }

        // Follow Symlinks
        // Unfortunately, Linux can add bogus symlinks to watch list without
        // issue, and Windows can't do symlinks period (AFAIK). To  maintain
        // consistency, we will act like everything is fine. There will simply
        // be no file events for broken symlinks.
        // Hence the returns of nil on errors.
        if fi.Mode()&os.ModeSymlink == os.ModeSymlink {
            name, err = filepath.EvalSymlinks(name)
            if err != nil {
                return "", nil
            }

            w.mu.Lock()
            _, alreadyWatching = w.watches[name]
            w.mu.Unlock()

            if alreadyWatching {
                return name, nil
            }

            fi, err = os.Lstat(name)
            if err != nil {
                return "", nil
            }
        }

        watchfd, err = unix.Open(name, openMode, 0700)
        if watchfd == -1 {
            return "", err
        }

        isDir = fi.IsDir()
    }

    const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
    if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
        unix.Close(watchfd)
        return "", err
    }

    if !alreadyWatching {
        w.mu.Lock()
        w.watches[name] = watchfd
        w.paths[watchfd] = pathInfo{name: name, isDir: isDir}
        w.mu.Unlock()
    }

    if isDir {
        // Watch the directory if it has not been watched before,
        // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)
        w.mu.Lock()

        watchDir := (flags&unix.NOTE_WRITE) == unix.NOTE_WRITE &&
            (!alreadyWatching || (w.dirFlags[name]&unix.NOTE_WRITE) != unix.NOTE_WRITE)
        // Store flags so this watch can be updated later
        w.dirFlags[name] = flags
        w.mu.Unlock()

        if watchDir {
            if err := w.watchDirectoryFiles(name); err != nil {
                return "", err
            }
        }
    }
    return name, nil
}

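For a path that is not yet watched, addWatch calls os.Lstat(name), skips sockets and named pipes, resolves symlinks, and opens the path with unix.Open(name, openMode, 0700). It then registers the file descriptor with kqueue using registerAdd and records it in w.watches and w.paths. If the path is a directory and the watchDir condition holds, it also calls watchDirectoryFiles.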

watchDirectoryFiles

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

// watchDirectoryFiles to mimic inotify when adding a watch on a directory
func (w *Watcher) watchDirectoryFiles(dirPath string) error {
    // Get all files
    files, err := ioutil.ReadDir(dirPath)
    if err != nil {
        return err
    }

    for _, fileInfo := range files {
        filePath := filepath.Join(dirPath, fileInfo.Name())
        filePath, err = w.internalWatch(filePath, fileInfo)
        if err != nil {
            return err
        }

        w.mu.Lock()
        w.fileExists[filePath] = true
        w.mu.Unlock()
    }

    return nil
}

watchDirectoryFiles reads the entries of dirPath, calls internalWatch on each one, and marks the returned path in fileExists.

internalWatch

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, error) {
    if fileInfo.IsDir() {
        // mimic Linux providing delete events for subdirectories
        // but preserve the flags used if currently watching subdirectory
        w.mu.Lock()
        flags := w.dirFlags[name]
        w.mu.Unlock()

        flags |= unix.NOTE_DELETE | unix.NOTE_RENAME
        return w.addWatch(name, flags)
    }

    // watch file to mimic Linux inotify
    return w.addWatch(name, noteAllEvents)
}

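For a directory, internalWatch adds NOTE_DELETE and NOTE_RENAME to whatever flags are already stored for it in dirFlags and passes the result to addWatch. For a regular file it calls addWatch with noteAllEvents.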
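
The flag handling in internalWatch and the watchDir expression in addWatch fit together, which a small sketch may make clearer. Everything here is a stand-in: the three constants are assumed to mirror the kqueue NOTE_DELETE, NOTE_WRITE, and NOTE_RENAME values on BSD-like systems, and subdirFlags and shouldWatchChildren are hypothetical names for the two expressions in the listings.

```go
package main

import "fmt"

// Stand-in values for three kqueue fflags (assumption: these mirror
// NOTE_DELETE, NOTE_WRITE and NOTE_RENAME on BSD-like systems).
const (
	noteDelete uint32 = 0x1
	noteWrite  uint32 = 0x2
	noteRename uint32 = 0x20
)

// subdirFlags mirrors how internalWatch derives the flags for a
// subdirectory: existing flags are kept, delete and rename are added.
func subdirFlags(existing uint32) uint32 {
	return existing | noteDelete | noteRename
}

// shouldWatchChildren mirrors the watchDir expression in addWatch.
func shouldWatchChildren(flags, storedFlags uint32, alreadyWatching bool) bool {
	return flags&noteWrite == noteWrite &&
		(!alreadyWatching || storedFlags&noteWrite != noteWrite)
}

func main() {
	// A subdirectory discovered while scanning its parent.
	f := subdirFlags(0)
	fmt.Printf("flags=%#x watch children=%v\n", f, shouldWatchChildren(f, 0, false))

	// The same subdirectory later added with a write flag.
	fmt.Printf("watch children=%v\n", shouldWatchChildren(noteWrite|f, f, true))
}
```

Under these assumptions, a subdirectory found during the scan has no write flag, so its own children are not watched. This matches the non-recursive behavior documented on Add.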

Summary

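dapr's fswatcher uses an fsnotify watcher to monitor a directory and runs a for loop around a select. When it sees an fsnotify.Create or fsnotify.Write event whose event.Name contains dir, it sleeps for one second and then notifies eventCh.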

doc

  • dapr

Originally published 2021-03-08 on the 码匠的流水账 WeChat official account.
