前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >读猿码系列——3. 从filebeat和go-stash深入日志收集及处理(filebeat篇)

读猿码系列——3. 从filebeat和go-stash深入日志收集及处理(filebeat篇)

作者头像
才浅Coding攻略
发布2022-12-12 18:08:24
5180
发布2022-12-12 18:08:24
举报
文章被收录于专栏:才浅coding攻略才浅coding攻略

阿巩

伙伴们,端午节快乐哦!

提到容器的日志采集,在实际生产开发流程中,我们通常是先自己封装的日志库,然后走 filebeat + kafka + logstash + es这个完整的日志收集处理流程。关于kafka和es的资料网上比较多,这两块我们暂且不细看。go-satsh是logstash 的 Go 语言替代版,是go-zero生态中的一个组件,这部分我们将在go-satsh篇介绍。事不宜迟,日拱一卒,我们开始吧!

filebeat官方文档:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

filebeat项目地址:

https://github.com/elastic/beats/tree/main/filebeat

在基于elk的日志系统中,filebeat几乎是其中必不可少的一个组件,也是云原生时代下足够轻量级和高性能的容器日志采集工具。filebeat归属于beats项目,beats项目的设计初衷是为了采集各类的数据,和其他beats一样都基于libbeat库实现。其中,libbeat是一个提供公共功能的库,它实现了内存缓存队列memqueue、几种output日志发送客户端,数据的过滤处理processor等通用功能,filebeat只需要实现日志文件的读取等和日志相关的逻辑即可。

filebeat目录组织如下:

代码语言:javascript
复制
├── autodiscover        # 包含filebeat的autodiscover适配器(adapter),当autodiscover发现新容器时创建对应类型的输入
├── beater              # 包含与libbeat库交互相关的文件
├── channel             # 包含filebeat输出到pipeline相关的文件
├── config              # 包含filebeat配置结构和解析函数
├── fileset             # 包含module和fileset相关的结构
├── harvester           # 包含Harvester接口定义、Reader接口及实现等
├── input               # 包含所有输入类型的实现(比如: log, stdin, syslog)
├── inputsource         # 在syslog输入类型中用于读取tcp或udp syslog
├── module              # 包含各module和fileset配置
├── modules.d           # 包含各module对应的日志路径配置文件,用于修改默认路径
├── processor           # 用于从容器日志的事件字段source中提取容器id
├── registrar           # 包含Registrar结构和方法
└── ...

在filebeat的入口函数main.go中作者提到了几个执行的基本模块:

  • input:找到配置的日志文件,启动harvester;
  • harvester: 读取文件,发送至spooler;
  • spooler: 缓存日志数据,直到可以发送至publisher;
  • publisher: 发送日志至后端,同时通知registrar;
  • registrar: 记录日志文件被采集的状态。

在filebeat中日志被采集经过以下流程:首先找到日志文件——>读取日志文件——>将数据存放到缓存队列中——>通知消费者到缓存队列中消费日志数据——>消费者获取日志数据发送到管道中供client读取——>从消费者发送的管道读日志数据调用client.Publish批量发送日志。

接下来我们展开来看上述流程。首先对于日志文件的采集和生命周期的管理,filebeat抽象出一个crawler结构体,其数据结构如下(filebeat/beate/crawler.go):

代码语言:javascript
复制
type crawler struct {
    log             *logp.Logger
    inputs          map[uint64]cfgfile.Runner // 包含所有输入的runner
    inputConfigs    []*conf.C+
    wg              sync.WaitGroup
    inputsFactory   cfgfile.RunnerFactory
    modulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatDone        chan struct{}
}

启动filebeat后crawler会根据配置创建,然后遍历并运行每个input。在每个input运行逻辑中,采用linux glob的规则(而非正则)来根据配置获取匹配的日志文件。

代码语言:javascript
复制
for _, inputConfig := range c.inputConfigs {
    err := c.startInput(pipeline, inputConfig)
    if err != nil {
        return fmt.Errorf("starting input failed: %w", err)
    }
}
代码语言:javascript
复制
for _, path := range p.config.Paths {
    matches, err := filepath.Glob(path)
    if err != nil {
        logger.Errorf("glob(%s) failed: %v", path, err)
        continue
    }...
}

其中log类型Input结构体数据结构如下(filebeat/input/log/input.go):

代码语言:javascript
复制
// Input contains the input and its config
type Input struct {
    cfg                 *conf.C
    logger              *logp.Logger
    config              config
    states              *file.States
    harvesters          *harvester.Registry
    outlet              channel.Outleter
    stateOutlet         channel.Outleter
    done                chan struct{}
    numHarvesters       atomic.Uint32
    meta                map[string]string
    stopOnce            sync.Once
    fileStateIdentifier file.StateIdentifier
}

匹配到需要采集的日志文件后,filebeat会对每个文件启动harvester goroutine,在该goroutine中不停的读取日志,并发送给内存缓存队列memqueue。harvester模块对应一个输入源,是收集数据的实际工作者。在配置中,一个具体的Input可以包含多个输入源(harvester)。

代码语言:javascript
复制
func (h *Harvester) Run() error {
    ...
    for {
        ...
        message, err := h.reader.Next()
        if err != nil {
            switch err {
            case ErrFileTruncate:
                logger.Info("File was truncated. Begin reading file from offset 0.")
                h.state.Offset = 0
                filesTruncated.Add(1)
            case ErrRemoved:
                logger.Info("File was removed. Closing because close_removed is enabled.")
            case ErrRenamed:
                logger.Info("File was renamed. Closing because close_renamed is enabled.")
            case ErrClosed:
                logger.Info("Reader was closed. Closing.")
            case io.EOF:
                logger.Info("End of file reached. Closing because close_eof is enabled.")
            case ErrInactive:
                logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive)
            default:
                logger.Errorf("Read line error: %v", err)
            }
            return nil
        }
        ...
        // Stop harvester in case of an error
        if !h.onMessage(forwarder, state, message, startingOffset) {
            return nil
        }
        ...
    }
}

我们看到harvester.reader.Next()方法会不停读取日志,如果没有异常返回,则发送日志数据到缓存队列中。其中h.onMessage()方法中完成更新状态及发送事件:

代码语言:javascript
复制
err := forwarder.Send(beat.Event{
    Timestamp: timestamp,
    Fields:    fields,
    Meta:      meta,
    Private:   state,
})    

harvester结构体数据结构如下(filebeat/input/log/harvester.go):

代码语言:javascript
复制
// Harvester contains all harvester related data
type Harvester struct {
    logger *logp.Logger

    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    doneWg   *sync.WaitGroup
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(file.State) bool

    metrics *harvesterProgressMetrics

    onTerminate func()
}

之后的流程是将数据存放到缓存队列中,缓存队列bufferingEventLoop数据结构如下:

代码语言:javascript
复制
type bufferingEventLoop struct {
    broker *broker

    buf       *batchBuffer
    flushList flushList
    eventCount int

    unackedEventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // ack handling
    pendingACKs chanList

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}    

bufferingEventLoop的run()方法同样是被放到一个无限循环中,这里可以看做一个日志事件调度器。之前提到的harvester goroutine不断读取日志,并发送给内存缓存队列memqueue,即日志数据转换成的事件会被发送到bufferingEventLoop的PushRequest channel中。

被发送进来的事件被放入pushChan,并触发执行handleInsert方法,将

数据添加到bufferingEventLoop的buf中,buf即是实际缓存日志数据的队列。

代码语言:javascript
复制
func (l *bufferingEventLoop) run() {
    broker := l.broker

    for {
        var pushChan chan pushRequest
        if l.eventCount < l.maxEvents {
            pushChan = l.broker.pushChan
        }

        var getChan chan getRequest
        if !l.flushList.empty() {
            getChan = l.broker.getChan
        }

        var schedACKs chan chanList
        if !l.pendingACKs.empty() {
            schedACKs = l.broker.scheduledACKs
        }

        select {
        case <-broker.done:
            return

        case req := <-pushChan: 
            l.handleInsert(&req)

        case req := <-l.broker.cancelChan: 
            l.handleCancel(&req)

        case req := <-getChan:
            l.handleGetRequest(&req)

        case schedACKs <- l.pendingACKs:
            l.pendingACKs = chanList{}

        case count := <-l.broker.ackChan:
            l.handleACK(count)

        case req := <-l.broker.metricChan: 
            l.handleMetricsRequest(&req)

        case <-l.idleC:
            l.idleC = nil
            l.timer.Stop()
            if l.buf.length() > 0 {
                l.flushBuffer()
            }
        }
    }
}

当flushList不为空时触发req := <-getChan,执行handleGetRequest方法,获取到消费者的response channel,并通知消费者到缓存队列中消费日志数据,其中核心逻辑是这句:

代码语言:javascript
复制
req.responseChan <- getResponse{acker.ackChan, entries}

消费的逻辑在libeat/publisher/pipeline/consumer.go中:

代码语言:javascript
复制
func (c *eventConsumer) run() {
    queueReader := makeQueueReader()
    go func() {
        queueReader.run(log)
    }()
outerLoop:
    for {
        if queueBatch == nil && !pendingRead {
            pendingRead = true
            queueReader.req <- queueReaderRequest{
                queue:      c.queue,
                retryer:    c,
                batchSize:  target.batchSize,
                timeToLive: target.timeToLive,
            }
        }
        ...
        var outputChan chan publisher.Batch
        if active != nil {
            outputChan = target.ch
        }
        select {
        case outputChan <- active:
            if len(retryBatches) > 0 {
                c.observer.eventsRetry(len(active.Events()))
                retryBatches = retryBatches[1:]
            } else {
                queueBatch = nil
            }

        case target = <-c.targetChan:

        case queueBatch = <-queueReader.resp:
            pendingRead = false

        case req := <-c.retryChan:
            ...
            retryBatches = append(retryBatches, req.batch)

        case <-c.done:
            break outerLoop
        }
    }
}

消费者consumer从Broker中获取日志数据,然后发送至out的channel中被output client发送代码如下(libbeat/publisher/queue/memqueue/broker.go)

代码语言:javascript
复制
func (b *broker) Get(count int) (queue.Batch, error) {
    responseChan := make(chan getResponse, 1)
    select {
    case <-b.done:
        return nil, io.EOF
    case b.getChan <- getRequest{
        entryCount: count, responseChan: responseChan}:
    }
    resp := <-responseChan
    return &batch{
        queue:   b,
        entries: resp.entries,
        ackChan: resp.ackChan,
    }, nil
}

getRequest和getResponse的结构如下。其中getResponse里包含了日志的数据,而getRequest包含了一个发送至消费者的channel。在之前bufferingEventLoop缓冲队列的handleGetRequest方法中接收到的参数为getRequest,里面包含了consumer请求的getResponse channel。

代码语言:javascript
复制
type getRequest struct {
    entryCount   int              
    responseChan chan getResponse 
}

type getResponse struct {
    ackChan chan batchAckMsg
    entries []queueEntry
}
代码语言:javascript
复制
type queueEntry struct {
    event  interface{}
    client clientState
}

从消费者发送的管道中读日志数据调用client.Publish批量发送日志。libbeat的outputs下包含了kafka、logstash、redis、es等等几种client,它们均实现了client接口,其中最重要是实现Publish接口,将日志发送出去。

代码语言:javascript
复制
type Client interface {
    Close() error
    Publish(context.Context, publisher.Batch) error
    String() string
}

例如kafka下的client.go中Publish接口实现如下:

代码语言:javascript
复制
func (c *client) Publish(_ context.Context, batch publisher.Batch) error {
    events := batch.Events()
    c.observer.NewBatch(len(events))

    ref := &msgRef{
        client: c,
        count:  int32(len(events)),
        total:  len(events),
        failed: nil,
        batch:  batch,
    }

    ch := c.producer.Input()
    for i := range events {
        d := &events[i]
        msg, err := c.getEventMessage(d)
        if err != nil {
            c.log.Errorf("Dropping event: %+v", err)
            ref.done()
            c.observer.Dropped(1)
            continue
        }

        msg.ref = ref
        msg.initProducerMessage()
        ch <- &msg.msg
    }

    return nil
}

本篇中我们从源码层面了解了日志文件是如何被filbebeat发现又是如何被采集的整个流程。在go-stash篇中,将介绍有着logstash 5倍的吞吐性能,并且通过一个可执行文件便可部署的处理工具。

参考:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

https://cloud.tencent.com/developer/article/1367784

https://zhuanlan.zhihu.com/p/72912085

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 才浅coding攻略 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档