前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用fluent bit+ClickHouse 实现K8s日志采集

使用fluent bit+ClickHouse 实现K8s日志采集

作者头像
灵雀云
发布2021-12-28 16:26:33
2.4K0
发布2021-12-28 16:26:33
举报

准备工作和了解

Fluent bit是一个用C写成的插件式、轻量级、多平台开源日志收集工具。它允许从不同的源收集数据并发送到多个目的地。完全兼容docker和kubernetes生态环境。

img

上图就是对它工作流程的全局概述,它通过输入、转换、过滤、缓冲、路由到输出而完成日志的收集。

通过对FluentBit input和output插件的配置可以实现从收集不同渠道的日志输出到目标渠道中。

fluent bit 本身是C语言编写,扩展插件有一定的难度。官方提供实现了官方提供了fluent-bit-go,可以实现采用go语言来编写插件,目前只支持output的编写。

使用FluentBit采集K8s容器日志,需要在FluentBit的conf文件中完成以下input配置(这里配置了docker中运行的容器log文件位置,fluent会采集文件中的日志并自动完成简单的过滤清洗操作):

代码语言:javascript
复制
[INPUT]
 Name              tail #这里使用tail插件,具有与 tail -f shell 命令类似的行为,下面的配置项会因为这里选择的插件不同而不同
 Tag               kube.*  #读取的行的标记
 Path              /var/log/containers/*.log #通过使用通用通配符指定特定日志文件或多个日志文件的路径。
 Parser            docker #指定解析器的名称 
 DB                /var/log/flb_kube.db  #指定数据库文件以跟踪受监视的文件和偏移量
 Mem_Buf_Limit     5MB    #设置 Tail 插件可以使用的内存限制。
 Skip_Long_Lines   On     #当被监控的文件由于一行很长而达到缓冲容量时,默认停止监控
 Refresh_Interval  10 //刷新被监控的文件列表的时间间隔,以秒为单位

outPut配置根据自己编写的output插件情况完成配置(编写了一个ClickHouse插件):

代码语言:javascript
复制
[OUTPUT]
 Name            clickhouse #插件名
 Match           * #匹配范围

目前fluent bit 官方没有支持clickhouse 的output 插件,需要自己开发。

将output插件打包为OS包合并fluent-Bit一起打成docker镜像,通过挂载configMap配置文件方式配置fluent-Bit在k8s中以容器方式运行,在每个node节点上都运行一个pod,fluent-Bit通过配置文件中配置的日志采集位置对日志进行采集,经过处理后采用配置的output插件将数据存储到ClickHouse中。

根据用户需要用户可以通过编辑output插件完成对日志对接其他数据库或中间件输出(比如mysql、redis等)

以下附开发的支持clickhouse 的output 插件代码:

FluentBit会调用用户编写的FLBPluginRegister(插件注册,标识插件的名称,需和fluent-Bit配置中的OUTPUT.Name对应)、FLBPluginInit(插件初始化)、FLBPluginFlush(插件的输出逻辑,可以简单处理日志数据,完成输出操作)、FLBPluginExit(插件的退出)方法完成日志数据的输出。

代码语言:javascript
复制
package main

import (
   "C"
   "database/sql"
   "fmt"
   "os"
   "strconv"
   "sync"
   "time"
   "unsafe"
   "github.com/fluent/fluent-bit-go/output"
   "github.com/kshvakov/clickhouse"
   klog "k8s.io/klog"
)

var (
   client *sql.DB

   database  string
   table     string
   batchSize int

   insertSQL = "INSERT INTO %s.%s(date, cluster, namespace, app, pod_name, container_name, host, log, ts) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"

   rw     sync.RWMutex
   buffer = make([]Log, 0)
)

const (
   DefaultWriteTimeout string = "20"
   DefaultReadTimeout  string = "10"

   DefaultBatchSize int = 1024
)

type Log struct {
   Cluster   string
   Namespace string
   App       string
   Pod       string
   Container string
   Host      string
   Log       string
   Ts        time.Time
}

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
   return output.FLBPluginRegister(ctx, "clickhouse", "Clickhouse Output Plugin.!")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

   var host string
   if v := os.Getenv("CLICKHOUSE_HOST"); v != "" {
      host = v
   } else {
      klog.Error("you must set host of clickhouse!")
      return output.FLB_ERROR
   }

   var user string
   if v := os.Getenv("CLICKHOUSE_USER"); v != "" {
      user = v
   } else {
      klog.Error("you must set user of clickhouse!")
      return output.FLB_ERROR
   }

   var password string
   if v := os.Getenv("CLICKHOUSE_PASSWORD"); v != "" {
      password = v
   } else {
      klog.Error("you must set password of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_DATABASE"); v != "" {
      database = v
   } else {
      klog.Error("you must set database of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_TABLE"); v != "" {
      table = v
   } else {
      klog.Error("you must set table of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_BATCH_SIZE"); v != "" {
      size, err := strconv.Atoi(v)
      if err != nil {
         klog.Infof("you set the default bacth_size: %d", DefaultBatchSize)
         batchSize = DefaultBatchSize
      }
      batchSize = size
   } else {
      klog.Infof("you set the default bacth_size: %d", DefaultBatchSize)
      batchSize = DefaultBatchSize
   }

   var writeTimeout string
   if v := os.Getenv("CLICKHOUSE_WRITE_TIMEOUT"); v != "" {
      writeTimeout = v
   } else {
      klog.Infof("you set the default write_timeout: %s", DefaultWriteTimeout)
      writeTimeout = DefaultWriteTimeout
   }

   var readTimeout string
   if v := os.Getenv("CLICKHOUSE_READ_TIMEOUT"); v != "" {
      readTimeout = v
   } else {
      klog.Infof("you set the default read_timeout: %s", DefaultReadTimeout)
      readTimeout = DefaultReadTimeout
   }

   dsn := "tcp://" + host + "?username=" + user + "&password=" + password + "&database=" + database + "&write_timeout=" + writeTimeout + "&read_timeout=" + readTimeout

   db, err := sql.Open("clickhouse", dsn)
   if err != nil {
      klog.Error("connecting to clickhouse: %v", err)
      return output.FLB_ERROR
   }

   if err := db.Ping(); err != nil {
      if exception, ok := err.(*clickhouse.Exception); ok {
         klog.Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
      } else {
         klog.Errorf("Failed to ping clickhouse: %v", err)
      }
      return output.FLB_ERROR
   }
   // ==
   client = db

   return output.FLB_OK
}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
   rw.Lock()
   defer rw.Unlock()
   if err := client.Ping(); err != nil {
      if exception, ok := err.(*clickhouse.Exception); ok {
         klog.Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
      } else {
         klog.Errorf("Failed to ping clickhouse: %v", err)
      }
      return output.FLB_ERROR
   }
   var ret int
   var timestampData interface{}
   var mapData map[interface{}]interface{}
   dec := output.NewDecoder(data, int(length))

   for {
      ret, timestampData, mapData = output.GetRecord(dec)
      if ret != 0 {
         break
      }
      var timestamp time.Time
      switch t := timestampData.(type) {
      case output.FLBTime:
         timestamp = timestampData.(output.FLBTime).Time
      case uint64:
         timestamp = time.Unix(int64(t), 0)
      default:
         timestamp = time.Now()
      }

      flattenData, err := Flatten(mapData, "", UnderscoreStyle)
      if err != nil {
         break
      }

      log := Log{}
      for k, v := range flattenData {
         value := ""
         switch t := v.(type) {
         case string:
            value = t
         case []byte:
            value = string(t)
         default:
            value = fmt.Sprintf("%v", v)
         }

         switch k {
         case "cluster":
            log.Cluster = value
         case "kubernetes_namespace_name":
            log.Namespace = value
         case "kubernetes_labels_app":
            log.App = value
         case "kubernetes_labels_k8s-app":
            log.App = value
         case "kubernetes_pod_name":
            log.Pod = value
         case "kubernetes_container_name":
            log.Container = value
         case "kubernetes_host":
            log.Host = value
         case "log":
            log.Log = value
         }

      }

      log.Ts = timestamp
      buffer = append(buffer, log)
   }

   // sink data
   if len(buffer) < batchSize {
      return output.FLB_OK
   }

   sql := fmt.Sprintf(insertSQL, database, table)

   tx, err := client.Begin()
   if err != nil {
      klog.Errorf("begin transaction failure: %s", err.Error())
      return output.FLB_ERROR
   }

   // build statements
   smt, err := tx.Prepare(sql)
   if err != nil {
      klog.Errorf("prepare statement failure: %s", err.Error())
      return output.FLB_ERROR
   }
   for _, l := range buffer {
      _, err = smt.Exec(l.Ts, l.Cluster, l.Namespace, l.App, l.Pod, l.Container, l.Host,
         l.Log, l.Ts)

      if err != nil {
         klog.Errorf("statement exec failure: %s", err.Error())
         return output.FLB_ERROR
      }
   }

   // commit and record metrics
   if err = tx.Commit(); err != nil {
      klog.Errorf("commit failed failure: %s", err.Error())
      return output.FLB_ERROR
   }

   buffer = make([]Log, 0)

   return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
   return output.FLB_OK
}

func main() {
}

ClickHouse的数据结构

建表sql

这里的表结构可以根据需要修改,具体的入库操作在output插件中进行开发。

代码语言:javascript
复制
CREATE DATABASE IF NOT EXISTS scmp;
CREATE TABLE IF NOT EXISTS scmp.logs(
    date Date DEFAULT toDate(0),
    cluster String,namespace String,
    app String,
    pod_name String,
    container_name String,
    host String,log String,
    ts DateTime) 
ENGINE = MergeTree() PARTITION BY toYYYYMMDD(date) 
ORDER BY (cluster, namespace, app, pod_name, container_name, host,ts);

表结构

img

部署对应K8s资源

挂载的配置文件:

代码语言:javascript
复制
apiVersion: v1
kind: ConfigMap
metadata:
  name: k8s-log-agent-config
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
     Flush         1
     Log_Level     error
     Daemon        off
     Parsers_File  parsers.conf
     HTTP_Server   On
     HTTP_Listen   0.0.0.0
     HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-kubernetes.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        Parser            docker
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10

  filter-kubernetes.conf: |
    [FILTER]
     Name                kubernetes
     Match               *
     Kube_URL            https://kubernetes.default.svc.cluster.local:443
     Merge_Log           On
     Annotations         Off
     Kube_Tag_Prefix     kube.var.log.containers.
     Merge_Log_Key       log_processed

    [FILTER]
     Name                modify
     Match               *
     Set  cluster  ${CLUSTER_NAME}
  output-kubernetes.conf: |
    [OUTPUT]
     Name            clickhouse
     Match           *
  parsers.conf: |
    [PARSER]
     Name   apache
     Format regex
     Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   apache2
     Format regex
     Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   apache_error
     Format regex
     Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
     Name   nginx
     Format regex
     Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   json
     Format json
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name         docker
     Format       json
     Time_Key     time
     Time_Format  %Y-%m-%dT%H:%M:%S.%L
     Time_Keep    On

    [PARSER]
     Name        syslog
     Format      regex
     Regex       ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
     Time_Key    time
     Time_Format %b %d %H:%M:%S

资源权限控制:

代码语言:javascript
复制
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
    name: k8s-log-agent-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: k8s-log-agent-read
subjects:
  - kind: ServiceAccount
    name: k8s-log-agent
    namespace: kube-system

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-log-agent-read
rules:
  - apiGroups: [""]
    resources:
      - namespaces
      - pods
    verbs: ["get", "list", "watch"]

---


apiVersion: v1
kind: ServiceAccount
metadata:
  name:k8s-log-agent
  namespace: kube

在各个node上启动容器:

代码语言:javascript
复制
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: k8s-log-agent
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: k8s-log-agent
  template:
    metadata:
      labels:
        k8s-app: k8s-log-agent
        version: v1
        kubernetes.io/cluster-service: "true"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "2020"
        prometheus.io/path: /api/v1/metrics/prometheus
    spec:
      containers:
        - name: fluent-bit
          image: iyacontrol/fluent-bit-ck:1.2.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2020
          resources:
            limits:
              cpu: 200m
              memory: 200Mi
            requests:
              cpu: 200m
              memory: 200Mi
          env:
            - name: CLUSTER_NAME
              value: "clickhouse"
            - name: CLICKHOUSE_HOST
              value: "10.1.62.62:9150"
            - name: CLICKHOUSE_USER
              value: "oms"
            - name: CLICKHOUSE_PASSWORD
              value: "EBupt123"
            - name: CLICKHOUSE_DATABASE
              value: "scmp"
            - name: CLICKHOUSE_TABLE
              value: "logs"
            - name: NODENAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: k8s-log-agent-config
              mountPath: /fluent-bit/etc/
      terminationGracePeriodSeconds: 10
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: k8s-log-agent-config
        configMap:
          name: k8s-log-agent-config
      serviceAccountName: k8s-log-agent
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule

数据库存储结果

img

img

fluent简单性能测试

测试准备

FLB input插件配置:

代码语言:javascript
复制
Name              Tail  #使用tail模式
Buffer_Chunk_Size 32k   #设置初始缓冲区大小以读取文件数据 默认为32
Buffer_Max_Siz    32k   #设置每个被监控文件的缓冲区大小限制 默认为32
Mem_Buf_Limit     5MB   #设置 Tail 插件在将数据附加到引擎时可以使用的内存限制。如果达到限制,则暂停;当数据被刷新时,它会恢复。 无默认值
Skip_Long_Lines   On    #当一行日志长度超过Buffer_Max_Size的设置时否停止监控该文件,设置为on为跳过长行并继续处理适合缓冲区大小的其他行 默认off为停止监控该文件
Refresh_Interval  10    #重新获取一次日志的时间间隔 默认60
Read_from_Head    False #对于开始时新发现的文件(没有数据库偏移/位置),从文件的头部读取内容,而不是尾部 默认False
Rotate_Wait       5      #指定在刷新某些挂起数据时监控文件一次的额外时间 默认5
DB                /var/log/flb_kube.db  #指定数据库文件以记录受监控的文件和偏移量 
DB.sync           normal    #设置默认同步 (I/O) 方法 默认normal
DB.locking        false     #指定数据库只能由 Fluent Bit 访问 默认false
DB.journal_mode   WAL       #设置数据库的日志模式 默认wal
exit_on_eof       false     #当读取文件时,它会在到达文件末尾时立即退出 默认false

FLB filter配置:

代码语言:javascript
复制
Name                kubernetes #使用tail模式
Match               * #匹配规则
Kube_URL            https://kubernetes.default.svc.cluster.local:443 #k8s API 服务器端点
Merge_Log           On   #启用后,它会检查日志字段内容是否为 JSON 字符串映射,如果是,则将映射字段附加为日志结构的一部分。
Annotations         On   #在额外的元数据中包含 Kubernetes 资源注释 默认为on
Kube_Tag_Prefix     kube.var.log.containers. #当源记录来自 Tail 输入插件时,此选项指定 Tail 配置中使用的前缀
Merge_Log_Key       log_processed #启用 Merge_Log 后,过滤器会尝试假定传入消息中的日志字段是 JSON 字符串消息,并在映射中日志字段的同一级别对其进行结构化表示,设置key后从原始日志内容中提取的所有新结构化字段都将插入到新键下

FLB运行环境cpu和mem均未设置上限

测试开始

分别记录FLB空载、日志打印频率为5000条/秒、1000条/秒、500条/秒、200条/秒、50条/秒、1条/秒几个状态10分钟内cpu和内存的变化范围、记录各条件下开始记录日志到开始出现滞留的时间,出现滞留后一段时间的日志滞留数目。

测试直接使用htop命令进行

3分钟后滞留条数在10秒产生的日志数目内没有被记录

img

测试中发现了比较有意思的点是当日志打印频率在1000条/秒和500条/秒两个范围时,cpu的波动情况比较稳定,不会超过10个%,但是在其他状况,5000条/秒和100条/秒50条/秒几个状态下,cpu的波动范围很大,最高值都可以达到52,最低值可以达到35,这种特殊值出现的次数比1000条/秒和500条/秒出现的次数要多。

结合之前多次测试的情况,也出现了在1000条/秒时cpu的稳定区间要比500条/秒低的情况。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云原生技术社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ClickHouse的数据结构
    • 建表sql
      • 表结构
      • 部署对应K8s资源
      • 数据库存储结果
      • fluent简单性能测试
        • 测试准备
          • 测试开始
          相关产品与服务
          容器镜像服务
          容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档