前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊klog的Flush

聊聊klog的Flush

作者头像
code4it
发布2021-01-12 14:53:40
1.5K0
发布2021-01-12 14:53:40
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下klog的Flush

Flush

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// Flush flushes all pending log I/O.
func Flush() {
    logging.lockAndFlushAll()
}

Flush方法执行的是logging.lockAndFlushAll()

init

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// init sets up the defaults and runs flushDaemon.
func init() {
    logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
    logging.setVState(0, nil, false)
    logging.logDir = ""
    logging.logFile = ""
    logging.logFileMaxSizeMB = 1800
    logging.toStderr = true
    logging.alsoToStderr = false
    logging.skipHeaders = false
    logging.addDirHeader = false
    logging.skipLogHeaders = false
    logging.oneOutput = false
    go logging.flushDaemon()
}

klog的init方法异步协程执行logging.flushDaemon()

logging.flushDaemon()

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
    for range time.NewTicker(flushInterval).C {
        l.lockAndFlushAll()
    }
}

flushDaemon方法range新建ticker的channel,然后执行l.lockAndFlushAll()

lockAndFlushAll

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
    l.mu.Lock()
    l.flushAll()
    l.mu.Unlock()
}

lockAndFlushAll使用lock执行flushAll

flushAll

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
const (
    infoLog severity = iota
    warningLog
    errorLog
    fatalLog
    numSeverity = 4
)

// flushAll flushes all the logs and attempts to "sync" their data to disk.
// l.mu is held.
func (l *loggingT) flushAll() {
    // Flush from fatal down, in case there's trouble flushing.
    for s := fatalLog; s >= infoLog; s-- {
        file := l.file[s]
        if file != nil {
            file.Flush() // ignore error
            file.Sync()  // ignore error
        }
    }
}

flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法

flushSyncWriter

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// flushSyncWriter is the interface satisfied by logging destinations.
type flushSyncWriter interface {
    Flush() error
    Sync() error
    io.Writer
}

type Writer interface {
    Write(p []byte) (n int, err error)
}

flushSyncWriter接口定义了Flush、Sync方法,内嵌了io.Writer接口

redirectBuffer

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// redirectBuffer is used to set an alternate destination for the logs
type redirectBuffer struct {
    w io.Writer
}

func (rb *redirectBuffer) Sync() error {
    return nil
}

func (rb *redirectBuffer) Flush() error {
    return nil
}

func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {
    return rb.w.Write(bytes)
}

redirectBuffer内嵌了io.Writer,其Write方法通过io.Writer来写;其Sync及Flush方法都为空操作

syncBuffer

k8s.io/klog/v2@v2.4.0/klog.go

代码语言:javascript
复制
// syncBuffer joins a bufio.Writer to its underlying file, providing access to the
// file's Sync method and providing a wrapper for the Write method that provides log
// file rotation. There are conflicting methods, so the file cannot be embedded.
// l.mu is held for all its methods.
type syncBuffer struct {
    logger *loggingT
    *bufio.Writer
    file     *os.File
    sev      severity
    nbytes   uint64 // The number of bytes written to this file
    maxbytes uint64 // The max number of bytes this syncBuffer.file can hold before cleaning up.
}

func (sb *syncBuffer) Sync() error {
    return sb.file.Sync()
}

func (sb *syncBuffer) Write(p []byte) (n int, err error) {
    if sb.nbytes+uint64(len(p)) >= sb.maxbytes {
        if err := sb.rotateFile(time.Now(), false); err != nil {
            sb.logger.exit(err)
        }
    }
    n, err = sb.Writer.Write(p)
    sb.nbytes += uint64(n)
    if err != nil {
        sb.logger.exit(err)
    }
    return
}

syncBuffer定义了logger、file、sev、nbytes、maxbytes属性,内嵌了*bufio.Writer;其Sync方法执行的是*os.File.Sync;其Flush方法执行的是*bufio.Writer.Flush

Flush

/usr/local/go/src/bufio/bufio.go

代码语言:javascript
复制
type Writer struct {
    err error
    buf []byte
    n   int
    wr  io.Writer
}

// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
    if b.err != nil {
        return b.err
    }
    if b.n == 0 {
        return nil
    }
    n, err := b.wr.Write(b.buf[0:b.n])
    if n < b.n && err == nil {
        err = io.ErrShortWrite
    }
    if err != nil {
        if n > 0 && n < b.n {
            copy(b.buf[0:b.n-n], b.buf[n:b.n])
        }
        b.n -= n
        b.err = err
        return err
    }
    b.n = 0
    return nil
}

*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法

rotateFile

代码语言:javascript
复制
// rotateFile closes the syncBuffer's file and starts a new one.
// The startup argument indicates whether this is the initial startup of klog.
// If startup is true, existing files are opened for appending instead of truncated.
func (sb *syncBuffer) rotateFile(now time.Time, startup bool) error {
    if sb.file != nil {
        sb.Flush()
        sb.file.Close()
    }
    var err error
    sb.file, _, err = create(severityName[sb.sev], now, startup)
    if err != nil {
        return err
    }
    if startup {
        fileInfo, err := sb.file.Stat()
        if err != nil {
            return fmt.Errorf("file stat could not get fileinfo: %v", err)
        }
        // init file size
        sb.nbytes = uint64(fileInfo.Size())
    } else {
        sb.nbytes = 0
    }
    sb.Writer = bufio.NewWriterSize(sb.file, bufferSize)

    if sb.logger.skipLogHeaders {
        return nil
    }

    // Write header.
    var buf bytes.Buffer
    fmt.Fprintf(&buf, "Log file created at: %s\n", now.Format("2006/01/02 15:04:05"))
    fmt.Fprintf(&buf, "Running on machine: %s\n", host)
    fmt.Fprintf(&buf, "Binary: Built with %s %s for %s/%s\n", runtime.Compiler, runtime.Version(), runtime.GOOS, runtime.GOARCH)
    fmt.Fprintf(&buf, "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg\n")
    n, err := sb.file.Write(buf.Bytes())
    sb.nbytes += uint64(n)
    return err
}

syncBuffer.rotateFile方法会设置其Writer为bufio.NewWriterSize(sb.file, bufferSize),底层writer为syncBuffer的file

小结

klog的init方法异步协程执行logging.flushDaemon(),它内部执行的是l.lockAndFlushAll();Flush方法是执行l.lockAndFlushAll();l.lockAndFlushAll()方法使用lock执行flushAll;flushAll方法从fatalLog开始递减到infoLog级别挨个执行l.file[s]的Flush及Sync方法;对于redirectBuffer,其Flush及Sync方法为空操作;对于syncBuffer,其Sync方法执行的是*os.File.Sync;其Flush方法执行的是*bufio.Writer.Flush,*bufio.Writer.Flush方法执行的是底层io.Writer的Write方法,即syncBuffer的file的Write方法。

doc

  • klog
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flush
  • init
  • logging.flushDaemon()
  • lockAndFlushAll
  • flushAll
  • flushSyncWriter
  • redirectBuffer
  • syncBuffer
    • Flush
      • rotateFile
      • 小结
      • doc
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档