本教程中使用的ELK和filebeat
版本是 6.2.4
关于elk的配置参考我之前的一篇文章,不在累述: elk安装地址: https://jjlu521016.github.io/2018/05/01/springboot-logback-log4j-elk.html#2-elk%E9%85%8D%E7%BD%AE
参考我之前的文章、把对应的配置改成单机即可:
我本地zookeeper配置了环境变量
zkServer.sh start
启动kafka
# bin/kafka-server-start.sh config/server.properties
创建topic
# bin/kafka-topics.sh --create --zookeeper 192.168.188.110:2181 --replication-factor 1 --partitions 1 --topic test
创建生产者
# bin/kafka-console-producer.sh --broker-list 192.168.188.110:9092 --topic test
创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.188.110:2181 --topic test --from-beginning
此时生产者输入的内容可以在消费者输出,证明kafka调用通了。
参考: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html vi logstash_agent.conf
input {
kafka {
bootstrap_servers => "192.168.188.110:9092"
#topic id
topics => "test"
codec => "json"
}
}
filter {
json {
source => "message"
remove_field => "message"
}
}
output {
elasticsearch{
hosts => ["192.168.188.110:9200"]
index => "kafka-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
启动elk之后,在kafka生产者输入一条信息之后logstash打印出对应的内容,说明集成成功了 。在kibana设置对应的日志。
解压
tar -zxvf filebeat-6.2.3-linux-x86_64.tar.gz
配置filebeat.yml
filebeat.prospectors:
# 这里面配置实际的日志路径
- type: log
enabled: true
paths:
- /var/log/test1.log
fields:
log_topics: test1
- type: log
enabled: true
paths:
- /var/log/test2.log
fields:
log_topics: test2
output.kafka:
enabled: true
hosts: ["10.112.101.90:9092"]
topic: '%{[fields][log_topics]}'
运行
./filebeat -e -c filebeat.yml
把logstash停止,修改配置文件,添加filebeat配置的topic。重启logstash。打开kibana刷新页面,看到filebeat收集到的日志。
注意: 在logstash中,当input里面有多个kafka输入源时,client_id => “xxx”必须添加且需要不同,否则会报错javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0。
input {
kafka {
group_id => "es1"
}
kafka {
client_id => "es2"
}
}