前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >日志从Kafka到Loki的N种方式​

日志从Kafka到Loki的N种方式​

作者头像
云原生小白
发布2021-05-13 09:58:32
2.7K0
发布2021-05-13 09:58:32
举报
文章被收录于专栏:Loki

最近群里有小伙伴有说到自己的日志存储路径先是从客户端到Kafka,再通过消费kafka到ElasticSearch。现在要将ES换成Loki面临需要同时支持Kafka和Loki插件的工具。小白查了下当前市面上满足需求且足够可靠的工具分别为FluentdLogstash以及Vector

  • Fluentd

CNCF已毕业的云原生日志采集客户端。与kubernetes结合比较紧密,插件丰富且有大厂背书。不足之处在于受ruby限制,在日志量大(建议使用FluentBit)时本身的资源消耗不小。

  • Logstash

ELK栈中老牌的日志采集和聚合工具,使用广泛且插件丰富,不足之处在于资源消耗整体比较高,单机日志并发处理效率不高。

  • Vector

刚开源不久的轻量级日志客户端,产品集成度比较高,资源消耗极低。不足之处就是当下产品似乎没有还没有广泛的最佳实践。

官方性能报告 https://vector.dev/#performance

以下是vector分别对上述产品做的一个性能测试对比,大家可以参考下:

Test

Vector

FluentBit

FluentD

Logstash

TCP to Blackhole

86mib/s

64.4mib/s

27.7mib/s

40.6mib/s

File to TCP

76.7mib/s

35mib/s

26.1mib/s

3.1mib/s

Regex Parsing

13.2mib/s

20.5mib/s

2.6mib/s

4.6mib/s

TCP to HTTP

26.7mib/s

19.6mib/s

<1mib/s

2.7mib/s

TCP to TCP

69.9mib/s

67.1mib/s

3.9mib/s

10mib/s

那么接下来进入主题吧,当我们需要将Kafka里的日志存进Loki时,我们有哪些方法实现,先来看个简单的?

Vector

Vector内部已经集成好了kafka和loki方法,我们只需下载vector和配置就能直接用起来。

安装vector

代码语言:javascript
复制
curl --proto '=https' --tlsv1.2 -sSf https://sh.vector.dev | sh

或者你可以直接使用docker镜像

代码语言:javascript
复制
docker pull timberio/vector:0.10.0-alpine
vector配置
代码语言:javascript
复制
[sources.in]
  bootstrap_servers = "<kafka地址>"
  group_id = "<消费组id>"
  topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"]  \\topic名字,支持正则
  type = "kafka"

[sinks.out]
  endpoint = "http://<loki地址>" 
  inputs = ["in"]      \\source.in 
  type = "loki"
  labels.key = "value"  \\自定义key
  labels.key = "{{ event_field }}" \\event的动态值

关于vector-loki更多参数可以参考:https://vector.dev/docs/reference/sinks/loki/

Fluentd

Input - fluent-plugin-kafka

fluent-plugin-kafka插件是fluent的官方处理kafka的插件,可同时用于input和output两个阶段。它的安装方式如下:

代码语言:javascript
复制
gem install fluent-plugin-kafka

当它用于input阶段时,这时fluentd就会作为一个kafka的消费者,从指定的topic中取出消息并做相关处理,它的配置如下:

代码语言:javascript
复制
<source>
  @type kafka

  brokers <kafka borker地址>
  topics <topic信息,多个topic用逗号分隔>
  format <日志处理格式,默认是json,支持text|json|ltsv|msgpack>
  message_key <日志key,仅用于text格式的日志>
  add_prefix <添加fluentd的tag前缀>
  add_suffix <添加fluentd的tag后缀>
</source>

如果你想指定从不同topic的偏移量开始消费消息的话,就需要如下配置:

代码语言:javascript
复制
<source>
  @type kafka

  brokers  <kafka borker地址>
  format   <日志处理格式,默认是json,支持text|json|ltsv|msgpack>
  <topic>
    topic     <单个topic名>
    partition <partition号>
    offset    <从offset开始>
  </topic>
  <topic>
    topic     <单个topic名>
    partition <partition号>
    offset    <从offset开始>
  </topic>
</source>

熟悉fluentd的同学可能知道,在fluentd中是以tag名来处理pipeline的,默认情况下kafka插件会用topic名来做你tag名,如果你想做一些全局的filter可以添加tag前缀/后缀来全局匹配实现。

Output - fluent-plugin-grafana-loki

fluent-plugin-grafana-loki是grafana lab贡献的一个从fluentd发送日志到loki的插件。之前小白在《Loki和Fluentd的那点事儿》里介绍过,这里不过多展开。

配置直接从以前的文章中copy过来,主要的区别在于tag的匹配,参考如下:

代码语言:javascript
复制
<match $kafka.topic>  \\此处为kafka的topic
  @type loki
  @id loki.output
  url "http://loki:3100"
  <label>
    key1          \\如果你的日志json格式,那么你可以将需要提取
    key2          \\的字段作为你的loki labels
  </label>
  <buffer label>
    @type file
    path /var/log/fluentd-buffers/loki.buffer
    flush_mode interval
    flush_thread_count 4
    flush_interval 3s
    retry_type exponential_backoff
    retry_wait 2s
    retry_max_interval 60s
    retry_timeout 12h
    chunk_limit_size 8M
    total_limit_size 5G
    queued_chunks_limit_size 64
    overflow_action drop_oldest_chunk
  </buffer>
</match>

Logstash

Input - logstash-input-kafka

logstash-input-kafka是elastic官方提供的kafka消费端插件,对于input阶段的配置也比较简单。

代码语言:javascript
复制
input {
    kafka {
        bootstrap_servers => "<kafka borker地址>"
        topics => "<topic名字>"
        codec => "<日志类型,默认plain>"
        tags => "<tag标识,方便后面处理>"
        }
    }

更多的参数参考https://www.elastic.co/guide/en/logstash/7.10/plugins-inputs-kafka.html

Output - logstash-output-loki

logstash-output-loki也是由grafana lab贡献的用于处理往loki输出的logstash插件。安装时直接执行下列命令即可:

代码语言:javascript
复制
logstash-plugin install logstash-output-loki

logstash输出到loki的参数不多,小白捡主要的几个说明如下:

代码语言:javascript
复制
output {
  loki {
    url => "http://<loki地址>/loki/api/v1/push"
    batch_size => 112640 #单次push的日志包大小
    retries => 5
    min_delay => 3
    max_delay => 500
    message_field => "<日志消息行的key,改key由logstas传递过来,默认为message>"
  }
}

总结

以上三个工具均没有做filter和解析,仅仅只是充当管道将日志从kafka里转存到loki,实际环境可能比较复杂,需要对日志做进一步分析。不过从小白的体验来看vector对于日志从kafka到loki的配置算是比较简单直接,fluentd和logstash整体差不多,就看大家自己的顺手程度了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云原生小白 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Vector
    • 安装vector
      • vector配置
  • Fluentd
    • Input - fluent-plugin-kafka
      • Output - fluent-plugin-grafana-loki
      • Logstash
        • Input - logstash-input-kafka
          • Output - logstash-output-loki
          • 总结
          相关产品与服务
          Grafana 服务
          Grafana 服务(TencentCloud Managed Service for Grafana,TCMG)是腾讯云基于社区广受欢迎的开源可视化项目 Grafana ,并与 Grafana Lab 合作开发的托管服务。TCMG 为您提供安全、免运维 Grafana 的能力,内建腾讯云多种数据源插件,如 Prometheus 监控服务、容器服务、日志服务 、Graphite 和 InfluxDB 等,最终实现数据的统一可视化。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档