Logstash 6.6.2版本下载
https://www.elastic.co/cn/downloads/past-releases/logstash-6-6-2
官方文档
https://www.elastic.co/guide/en/logstash/6.6/first-event.html
上传文件后解压
[root@hadoop01 software]# tar -zxf logstash-6.6.2.tar.gz -C ../install/
java 1.8环境
java -version
java version "1.8.0_261"
启动:进入logstash/bin目录
logstash-6.6.2]# bin/logstash -e 'input { stdin { } } output { stdout {} }'
测试:输入文本
Logstash将时间戳和IP地址信息添加到消息中
停止:
CTRL-D
参考之前写过的kafka
https://cloud.tencent.com/developer/article/1757036
启动zk
启动kafka
创建topic
kafka_2.13-2.6.0]# bin/kafka-topics.sh --create --zookeeper 192.168.137.121:2181 --replication-factor 1 --partitions 1 --topic logstash_k
查看主题
bin/kafka-topics.sh --zookeeper 192.168.137.121:2181 --describe
// 可查看已创建的topic列表
bin/kafka-topics.sh --zookeeper 192.168.137.121:2181 --list
// 可具体指定topic
bin/kafka-topics.sh --zookeeper 192.168.137.121:2181 --describe --topic logstash_k
测试kafka环境(写入消费正常?)
生产:product向broker写入数据
kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --broker-list 192.168.137.121:9092 --topic logstash_k
消费:consumer从topic的partition消费数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic logstash_k --from-beginning
创建配置文件
logstash-6.6.2]# touch logstash.conf
[root@hadoop01 logstash-6.6.2]# vim logstash.conf
input{
stdin{}
}
output{
kafka{
topic_id => "logstash_k"
bootstrap_servers => "192.168.137.121:9092" # kafka的地址
batch_size => 5
}
stdout{
codec => rubydebug
}
}
效果图
logstash数据写到kafka中
kafka消费到的数据
input{
stdin{}
}
output{
kafka{
topic_id => "odeon_test_tymiao"
bootstrap_servers => "," # kafka的地址
jaas_path => "/opt/install/logstash-XXX/odeon_jass.config"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "SCRAM-SHA-256"
codec => "json"
}
}