vector 官方的介绍如下:
vector是使用rust编写的高性能可观测性数据管道,使组织能够控制其可观测性数据。收集、转换您的所有日志、指标和跟踪,并将其路由到您今天需要的任何供应商以及您明天可能需要的任何其他供应商。 Vector 可以在您需要的地方(而不是在供应商最方便的地方)实现显着的成本降低、新颖的数据丰富和数据安全。开源,比任何替代方案快 10 倍。
市面上,vector的同类产品:filebeat、ilogtail、fluentd、logstash
fluentd和logstash的性能不太给力,具体指标可以参考网上其他资料。ilogtail的性能也很强悍,改天有空再开一篇。
vector 的数据流转模型
vector的架构很有特点(背压、缓冲、端到端ACK等),具体可参考这篇 https://vector.dev/docs/about/under-the-hood/architecture/
生产级别大规模的数据处理流: json文件 --> vector 1 --> kafka --> vector 2 --> es
小规模的数据采集,也可以不用kafka: json文件 --> vector 1 --> es
vector的安装很简单,可以rpm包安装也可以二进制安装,不是本文的重点,忽略。
我们下面演示的这种是小规模的场景下的使用案例:
1 创建vector缓存目录和日志文件路径
mkdir -pv /var/lib/vector
2 编写主配置文件
$ cat main.toml
data_dir = "/var/lib/vector"
[api]
enabled = true
address = "127.0.0.1:8686"
3 编写日志到kafka的配置文件
$ cat file2es.toml
[sources.app_json_log]
type = "file"
include = [ "/home/software/vector/*.json" ]
glob_minimum_cooldown_ms = 10000 # 日志文件发现的间隔 10s
ignore_older = 86400 # 1 day
max_line_bytes = 102400 # 单行记录超过102KB则丢弃
read_from = "beginning"
[transforms.app_json_log_parser]
inputs = [ "app_json_log" ]
type = "remap"
drop_on_error = false
source = """
. = parse_json!(.message)
del(.exportable)
.washer_ts = now()
.agent_id = "${HOSTNAME}"
"""
# 这里在parse_json时候做了点自定义配置
#1、移除我用不到的字段
#2、加了个washer_ts 字段,值为当前时间戳,便于评估vector数据清洗链路的耗时
#3、加了个agent_id字段,用于标识这个日志是从哪个机器采集的
[sinks.es_cluster]
inputs = [ "app_json_log_parser" ]
type = "elasticsearch"
endpoints = ["http://192.168.31.181:9200"]
[sinks.es_cluster.batch]
"batch.max_events" = 1000
"batch.timeout_secs" = 2
[sinks.es_cluster.buffer]
type = "disk" # 这个值可为内存或磁盘
max_size = 536870976 # 当vector写ES失败是,在本机开辟有512MB磁盘空间用以缓存待发送的数据
when_full = "block" # 当512MB空间写满后,vector处于阻塞状态,也就是不继续采集下游的原始日志
[sinks.es_cluster.bulk]
index = "vector-%Y.%m.%d"
4 校验配置文件
校验全部配置文件
./bin/vector validate config/*.toml
或者校验指定的配置文件
./bin/vector validate config/file2es.toml
√ Loaded ["config/file2es.toml"]
√ Component configuration
√ Health check "es_cluster"
--------------------------------
Validated
5 前台启动
./bin/vector -c config/main.toml -c config/file2es.toml
6 性能测试
场景:100w条json记录,每条记录 1024bytes。 vector取得后,直接写到ES中。
1、生成100w条记录,写到 test_log.json 文件中,可以看到日志文件体积为978MB
-rw-r--r-- 1 root root 978M 2023-12-16 12:39 test_log.json
2、启动vector进程
$ ./bin/vector -c config/main.toml -c config/file2es.toml
2023-12-16T04:40:15.558619Z INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=info,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-12-16T04:40:15.565948Z INFO vector::app: Loading configs. paths=["config/file2es.toml", "config/main.toml"]
2023-12-16T04:40:16.751600Z INFO vector::topology::running: Running healthchecks.
2023-12-16T04:40:16.751751Z INFO vector: Vector has started. debug="false" version="0.34.0" arch="x86_64" revision="c909b66 2023-11-07 15:07:26.748571656"
2023-12-16T04:40:16.752450Z INFO source{component_kind="source" component_id=app_json_log component_type=file}: vector::sources::file: Starting file server. include=["/home/software/vector/*.json"] exclude=["/home/software/vector/mth-*.json"]
2023-12-16T04:40:16.753378Z INFO source{component_kind="source" component_id=app_json_log component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data.
2023-12-16T04:40:16.753852Z INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground
2023-12-16T04:40:16.754494Z INFO source{component_kind="source" component_id=app_json_log component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/home/software/vector/test_log.json
2023-12-16T04:40:16.755392Z INFO vector::topology::builder: Healthcheck passed.
从vetor控制台日志看到启动时间为:12:40:15 (UTC时间04:40:15,转成东八区时间也就是 12:40:15 )
3、 稍等片刻,去Kibana界面查询日志明细。 从kibana界面上查到的最后一条记录的入库时间 12:40:36
可以粗略估算 vector从采集到入库耗时为 12:40:36 - 12:40:15 = 21秒
可以看到 vector 的性能还是很强的。
vector在transform阶段,还支持很多有意思的功能(例如lua、log_to_metric)。在sinks阶段,还可以同时把消息发送到kafka、es、s3等多种存储中,具体可以看官方文档。
下面是我从生产截的图:
可以vector的washer_ts比原始日志差了20s,可以粗略认为整体elk日志链路的延迟为20s(实际上生产前还需要多次测试)
TIPS:对于大规模场景下,一般是第一层的vector将各自机器上的业务日志采集并简单处理后发送到kafka,然后由第二层的vector(单机或集群)消费kafka数据,并写到ES集群中。
大致的配置如下:
1、json日志到kafka的配置文件:
$ cat file2_kafka.toml
[sources.app_json_log]
type = "file"
include = [ "/home/software/vector/*.json" ]
exclude = [ "/home/software/vector/mth-*.json" ]
glob_minimum_cooldown_ms = 10000 # 日志文件发现的间隔 10s
ignore_older = 86400 # 1 day
max_line_bytes = 102400 # 单行记录超过102KB则丢弃
read_from = "beginning"
[transforms.app_json_log_parser]
inputs = [ "app_json_log" ]
type = "remap"
drop_on_error = false
source = """
. = parse_json!(.message)
del(.exportable)
.washer_ts1 = now()
.agent_id = "${HOSTNAME}"
"""
[sinks.kafka]
type = "kafka" # required
inputs = ["app_json_log_parser"] # required
bootstrap_servers = "127.0.0.1:9092"
compression = "lz4" # optional, default
topic = "vector_apps" # required
encoding.codec = "json" # required
healthcheck.enabled = true
2、kafka到es阶段的配置文件:
$ cat kafka2es.toml
data_dir = "/var/lib/vector"
[sources.log2kafka]
type = "kafka"
bootstrap_servers = "127.0.0.1:9092"
group_id = "consumer-group-name"
topics = [
"^(vector_apps|vector_slowlog)-.+",
]
commit_interval_ms = 5000
auto_offset_reset = "largest"
[transforms.app_json_log_parser2]
inputs = [ "log2kafka" ]
type = "remap"
drop_on_error = false
source = """
. = parse_json!(.message)
del(.exportable)
.washer_ts2 = now()
"""
[sinks.es2]
inputs = [ "app_json_log_parser2" ]
type = "elasticsearch"
endpoints = ["http://192.168.31.181:9200"]
[sinks.es2.batch]
"batch.max_events" = 1000
"batch.timeout_secs" = 2
[sinks.es2.buffer]
type = "disk"
max_size = 536870976
when_full = "block"
[sinks.es2.bulk]
index = "vector-new-%Y.%m.%d"
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。