前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >vector 数据采集工具的使用

vector 数据采集工具的使用

原创
作者头像
保持热爱奔赴山海
修改2023-12-19 14:57:27
4170
修改2023-12-19 14:57:27
举报
文章被收录于专栏:饮水机管理员饮水机管理员

vector 官方的介绍如下:

vector是使用rust编写的高性能可观测性数据管道,使组织能够控制其可观测性数据。收集、转换您的所有日志、指标和跟踪,并将其路由到您今天需要的任何供应商以及您明天可能需要的任何其他供应商。 Vector 可以在您需要的地方(而不是在供应商最方便的地方)实现显着的成本降低、新颖的数据丰富和数据安全。开源,比任何替代方案快 10 倍。

市面上,vector的同类产品:filebeat、ilogtail、fluentd、logstash

fluentd和logstash的性能不太给力,具体指标可以参考网上其他资料。ilogtail的性能也很强悍,改天有空再开一篇。

vector 的数据流转模型

pipeline
pipeline

vector的架构很有特点(背压、缓冲、端到端ACK等),具体可参考这篇 https://vector.dev/docs/about/under-the-hood/architecture/

生产级别大规模的数据处理流: json文件 --> vector 1 --> kafka --> vector 2 --> es

小规模的数据采集,也可以不用kafka: json文件 --> vector 1 --> es

vector的安装很简单,可以rpm包安装也可以二进制安装,不是本文的重点,忽略。

我们下面演示的这种是小规模的场景下的使用案例:

1 创建vector缓存目录和日志文件路径

代码语言:shell
复制
mkdir -pv /var/lib/vector

2 编写主配置文件

代码语言:shell
复制
$ cat main.toml 
data_dir = "/var/lib/vector"

[api]
enabled = true
address = "127.0.0.1:8686"

3 编写日志到kafka的配置文件

代码语言:shell
复制
$ cat file2es.toml 
[sources.app_json_log]
type = "file"
include = [ "/home/software/vector/*.json" ] 
glob_minimum_cooldown_ms = 10000  # 日志文件发现的间隔 10s 
ignore_older = 86400  # 1 day
max_line_bytes = 102400   # 单行记录超过102KB则丢弃
read_from = "beginning"

[transforms.app_json_log_parser]
inputs = [ "app_json_log" ]
type = "remap"
drop_on_error = false
source = """
. = parse_json!(.message)
del(.exportable)
.washer_ts = now()
.agent_id = "${HOSTNAME}"
"""
# 这里在parse_json时候做了点自定义配置
#1、移除我用不到的字段
#2、加了个washer_ts 字段,值为当前时间戳,便于评估vector数据清洗链路的耗时
#3、加了个agent_id字段,用于标识这个日志是从哪个机器采集的

[sinks.es_cluster]
inputs = [ "app_json_log_parser" ]
type = "elasticsearch"
endpoints = ["http://192.168.31.181:9200"]

  [sinks.es_cluster.batch]
  "batch.max_events" = 1000
  "batch.timeout_secs" = 2

  [sinks.es_cluster.buffer]
  type = "disk"  # 这个值可为内存或磁盘
  max_size = 536870976  # 当vector写ES失败是,在本机开辟有512MB磁盘空间用以缓存待发送的数据
  when_full = "block"  # 当512MB空间写满后,vector处于阻塞状态,也就是不继续采集下游的原始日志

  [sinks.es_cluster.bulk]
  index = "vector-%Y.%m.%d"

4 校验配置文件

代码语言:shell
复制
校验全部配置文件
./bin/vector validate config/*.toml  


或者校验指定的配置文件
./bin/vector validate config/file2es.toml
√ Loaded ["config/file2es.toml"]
√ Component configuration
√ Health check "es_cluster"
--------------------------------
                       Validated

5 前台启动

代码语言:shell
复制
./bin/vector -c config/main.toml -c config/file2es.toml

6 性能测试

代码语言:shell
复制
场景:100w条json记录,每条记录 1024bytes。 vector取得后,直接写到ES中。

1、生成100w条记录,写到 test_log.json 文件中,可以看到日志文件体积为978MB
-rw-r--r-- 1 root root 978M 2023-12-16 12:39 test_log.json

2、启动vector进程
$ ./bin/vector -c config/main.toml -c config/file2es.toml
2023-12-16T04:40:15.558619Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=info,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-12-16T04:40:15.565948Z  INFO vector::app: Loading configs. paths=["config/file2es.toml", "config/main.toml"]
2023-12-16T04:40:16.751600Z  INFO vector::topology::running: Running healthchecks.
2023-12-16T04:40:16.751751Z  INFO vector: Vector has started. debug="false" version="0.34.0" arch="x86_64" revision="c909b66 2023-11-07 15:07:26.748571656"
2023-12-16T04:40:16.752450Z  INFO source{component_kind="source" component_id=app_json_log component_type=file}: vector::sources::file: Starting file server. include=["/home/software/vector/*.json"] exclude=["/home/software/vector/mth-*.json"]
2023-12-16T04:40:16.753378Z  INFO source{component_kind="source" component_id=app_json_log component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data.
2023-12-16T04:40:16.753852Z  INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground
2023-12-16T04:40:16.754494Z  INFO source{component_kind="source" component_id=app_json_log component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/home/software/vector/test_log.json
2023-12-16T04:40:16.755392Z  INFO vector::topology::builder: Healthcheck passed.

从vetor控制台日志看到启动时间为:12:40:15 (UTC时间04:40:15,转成东八区时间也就是 12:40:15 )


3、	稍等片刻,去Kibana界面查询日志明细。 从kibana界面上查到的最后一条记录的入库时间 12:40:36
可以粗略估算 vector从采集到入库耗时为 12:40:36 - 12:40:15  = 21秒
kibana
kibana

可以看到 vector 的性能还是很强的。

vector在transform阶段,还支持很多有意思的功能(例如lua、log_to_metric)。在sinks阶段,还可以同时把消息发送到kafka、es、s3等多种存储中,具体可以看官方文档。

下面是我从生产截的图:

vector
vector

可以vector的washer_ts比原始日志差了20s,可以粗略认为整体elk日志链路的延迟为20s(实际上生产前还需要多次测试)

TIPS:对于大规模场景下,一般是第一层的vector将各自机器上的业务日志采集并简单处理后发送到kafka,然后由第二层的vector(单机或集群)消费kafka数据,并写到ES集群中。

大致的配置如下:

代码语言:shell
复制
1、json日志到kafka的配置文件:
$ cat file2_kafka.toml 
[sources.app_json_log]
type = "file"
include = [ "/home/software/vector/*.json" ] 
exclude = [ "/home/software/vector/mth-*.json" ] 
glob_minimum_cooldown_ms = 10000  # 日志文件发现的间隔 10s 
ignore_older = 86400  # 1 day
max_line_bytes = 102400   # 单行记录超过102KB则丢弃
read_from = "beginning"

[transforms.app_json_log_parser]
inputs = [ "app_json_log" ]
type = "remap"
drop_on_error = false
source = """
. = parse_json!(.message)
del(.exportable)
.washer_ts1 = now()
.agent_id = "${HOSTNAME}"
"""

[sinks.kafka]
  type = "kafka" # required
  inputs = ["app_json_log_parser"] # required
  bootstrap_servers = "127.0.0.1:9092"
  compression = "lz4" # optional, default
  topic = "vector_apps" # required
  encoding.codec = "json" # required
  healthcheck.enabled = true




2、kafka到es阶段的配置文件:
$ cat kafka2es.toml 
data_dir = "/var/lib/vector"

[sources.log2kafka]
type = "kafka"
bootstrap_servers = "127.0.0.1:9092"
group_id = "consumer-group-name"
topics = [ 
  "^(vector_apps|vector_slowlog)-.+",
]
commit_interval_ms = 5000
auto_offset_reset = "largest"

[transforms.app_json_log_parser2]
inputs = [ "log2kafka" ]
type = "remap"
drop_on_error = false
source = """
. = parse_json!(.message)
del(.exportable)
.washer_ts2 = now()
"""


[sinks.es2]
inputs = [ "app_json_log_parser2" ]
type = "elasticsearch"
endpoints = ["http://192.168.31.181:9200"]

  [sinks.es2.batch]
  "batch.max_events" = 1000
  "batch.timeout_secs" = 2

  [sinks.es2.buffer]
  type = "disk"
  max_size = 536870976
  when_full = "block"

  [sinks.es2.bulk]
  index = "vector-new-%Y.%m.%d"

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档