前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊promtail的positions

聊聊promtail的positions

作者头像
code4it
发布2021-02-08 22:18:40
4160
发布2021-02-08 22:18:40
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下promtail的positions

Positions

loki/pkg/promtail/positions/positions.go

代码语言:javascript
复制
type Positions interface {
    // GetString returns how far we've through a file as a string.
    // JournalTarget writes a journal cursor to the positions file, while
    // FileTarget writes an integer offset. Use Get to read the integer
    // offset.
    GetString(path string) string
    // Get returns how far we've read through a file. Returns an error
    // if the value stored for the file is not an integer.
    Get(path string) (int64, error)
    // PutString records (asynchronously) how far we've read through a file.
    // Unlike Put, it records a string offset and is only useful for
    // JournalTargets which doesn't have integer offsets.
    PutString(path string, pos string)
    // Put records (asynchronously) how far we've read through a file.
    Put(path string, pos int64)
    // Remove removes the position tracking for a filepath
    Remove(path string)
    // SyncPeriod returns how often the positions file gets resynced
    SyncPeriod() time.Duration
    // Stop the Position tracker.
    Stop()
}

Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法

positions

loki/pkg/promtail/positions/positions.go

代码语言:javascript
复制
// Positions tracks how far through each file we've read.
type positions struct {
    logger    log.Logger
    cfg       Config
    mtx       sync.Mutex
    positions map[string]string
    quit      chan struct{}
    done      chan struct{}
}

func (p *positions) Stop() {
    close(p.quit)
    <-p.done
}

func (p *positions) PutString(path string, pos string) {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    p.positions[path] = pos
}

func (p *positions) Put(path string, pos int64) {
    p.PutString(path, strconv.FormatInt(pos, 10))
}

func (p *positions) GetString(path string) string {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    return p.positions[path]
}

func (p *positions) Get(path string) (int64, error) {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    pos, ok := p.positions[path]
    if !ok {
        return 0, nil
    }
    return strconv.ParseInt(pos, 10, 64)
}

func (p *positions) Remove(path string) {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    p.remove(path)
}

func (p *positions) remove(path string) {
    delete(p.positions, path)
}

func (p *positions) SyncPeriod() time.Duration {
    return p.cfg.SyncPeriod
}

positions定义了logger、cfg、mtx、positions、quit、done属性;它实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除

New

loki/pkg/promtail/positions/positions.go

代码语言:javascript
复制
// New makes a new Positions.
func New(logger log.Logger, cfg Config) (Positions, error) {
    positionData, err := readPositionsFile(cfg, logger)
    if err != nil {
        return nil, err
    }

    p := &positions{
        logger:    logger,
        cfg:       cfg,
        positions: positionData,
        quit:      make(chan struct{}),
        done:      make(chan struct{}),
    }

    go p.run()
    return p, nil
}

New方法会通过readPositionsFile读取positionData创建positions,然后异步执行p.run()

run

loki/pkg/promtail/positions/positions.go

代码语言:javascript
复制
func (p *positions) run() {
    defer func() {
        p.save()
        level.Debug(p.logger).Log("msg", "positions saved")
        close(p.done)
    }()

    ticker := time.NewTicker(p.cfg.SyncPeriod)
    for {
        select {
        case <-p.quit:
            return
        case <-ticker.C:
            p.save()
            p.cleanup()
        }
    }
}

func (p *positions) save() {
    if p.cfg.ReadOnly {
        return
    }
    p.mtx.Lock()
    positions := make(map[string]string, len(p.positions))
    for k, v := range p.positions {
        positions[k] = v
    }
    p.mtx.Unlock()

    if err := writePositionFile(p.cfg.PositionsFile, positions); err != nil {
        level.Error(p.logger).Log("msg", "error writing positions file", "error", err)
    }
}

func (p *positions) cleanup() {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    toRemove := []string{}
    for k := range p.positions {
        // If the position file is prefixed with journal, it's a
        // JournalTarget cursor and not a file on disk.
        if strings.HasPrefix(k, "journal-") {
            continue
        }

        if _, err := os.Stat(k); err != nil {
            if os.IsNotExist(err) {
                // File no longer exists.
                toRemove = append(toRemove, k)
            } else {
                // Can't determine if file exists or not, some other error.
                level.Warn(p.logger).Log("msg", "could not determine if log file "+
                    "still exists while cleaning positions file", "error", err)
            }
        }
    }
    for _, tr := range toRemove {
        p.remove(tr)
    }
}

run方法通过time.NewTicker(p.cfg.SyncPeriod)来触发执行p.save()及p.cleanup();save方法将positions持久化到文件;cleanup方法遍历p.positions,从内存中移除文件不存在的position

readPositionsFile

loki/pkg/promtail/positions/positions.go

代码语言:javascript
复制
func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {

    cleanfn := filepath.Clean(cfg.PositionsFile)
    buf, err := ioutil.ReadFile(cleanfn)
    if err != nil {
        if os.IsNotExist(err) {
            return map[string]string{}, nil
        }
        return nil, err
    }

    var p File
    err = yaml.UnmarshalStrict(buf, &p)
    if err != nil {
        // return empty if cfg option enabled
        if cfg.IgnoreInvalidYaml {
            level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)
            return map[string]string{}, nil
        }

        return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)
    }

    // p.Positions will be nil if the file exists but is empty
    if p.Positions == nil {
        p.Positions = map[string]string{}
    }

    return p.Positions, nil
}

readPositionsFile方法从文件读取位置到p.Positions

writePositionFile

loki/pkg/promtail/positions/positions.go

代码语言:javascript
复制
func writePositionFile(filename string, positions map[string]string) error {
    buf, err := yaml.Marshal(File{
        Positions: positions,
    })
    if err != nil {
        return err
    }

    target := filepath.Clean(filename)
    temp := target + "-new"

    err = ioutil.WriteFile(temp, buf, os.FileMode(positionFileMode))
    if err != nil {
        return err
    }

    return os.Rename(temp, target)
}

writePositionFile方法将positions写入文件

小结

promtail的Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法;positions实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除。

doc

  • promtail
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Positions
  • positions
  • New
  • run
  • readPositionsFile
  • writePositionFile
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档