Integrating Flume with Kafka:
1. Install and deploy Kafka
This walkthrough uses Kafka 1.0.0, which was the latest stable release at the time of writing.
Kafka is built for multiple versions of Scala. This only matters if you use Scala yourself and want a build that matches your Scala version; otherwise any build should work (the download page recommends 2.11).
tar zxvf kafka_2.12-1.0.0.tgz
mv kafka_2.12-1.0.0 /usr/local/kafka2.12
....
# nohup bin/kafka-server-start.sh config/server.properties &
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
Created topic "test".
# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Input: my test
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
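Because the broker is started in the background with nohup, the topic and producer commands can run before it is actually listening. A minimal sketch of a wait loop, assuming bash (for its /dev/tcp redirection) and the localhost:9092 address used above; `wait_for_port` is a hypothetical helper, not part of Kafka:

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [TIMEOUT_SECONDS]
# Retry a TCP connection once per second until it succeeds or the timeout expires.
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-30} waited=0
  # The subshell opens the connection on fd 3 and closes it again on exit.
  while ! (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
    waited=$((waited + 1))
    if [ "$waited" -ge "$timeout" ]; then
      return 1   # gave up: nothing is listening on HOST:PORT
    fi
    sleep 1
  done
  return 0
}
```

For example, `wait_for_port localhost 9092 60 && bin/kafka-topics.sh --list --zookeeper localhost:2181` only lists topics once the broker port accepts connections.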
2. Install and deploy Flume
Download Flume:
Apache Flume is distributed under the Apache License, version 2.0
File | Archive | MD5 | SHA1 | Signature |
---|---|---|---|---|
Apache Flume binary (tar.gz) | apache-flume-1.8.0-bin.tar.gz | apache-flume-1.8.0-bin.tar.gz.md5 | apache-flume-1.8.0-bin.tar.gz.sha1 | apache-flume-1.8.0-bin.tar.gz.asc |
Apache Flume source (tar.gz) | apache-flume-1.8.0-src.tar.gz | apache-flume-1.8.0-src.tar.gz.md5 | apache-flume-1.8.0-src.tar.gz.sha1 | apache-flume-1.8.0-src.tar.gz.asc |
It is essential that you verify the integrity of the downloaded files using the PGP or MD5 signatures. Please read Verifying Apache HTTP Server Releases for more information on why you should verify our releases.
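The integrity check can be sketched as follows once the archive has been downloaded. This assumes GNU coreutils `sha1sum` is installed and that you copy the expected digest by hand from the published .sha1 file (its exact layout is not assumed here); `verify_sha1` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# verify_sha1 FILE EXPECTED_HEX
# Succeeds only if the SHA-1 digest of FILE equals EXPECTED_HEX.
verify_sha1() {
  local file=$1 expected=$2 actual
  actual=$(sha1sum "$file" 2>/dev/null | awk '{print $1}')
  # Published digests are sometimes upper case; sha1sum prints lower case.
  expected=$(printf '%s' "$expected" | tr 'A-F' 'a-f')
  if [ -n "$actual" ] && [ "$actual" = "$expected" ]; then
    echo "OK: $file"
  else
    echo "MISMATCH: $file" >&2
    return 1
  fi
}
```

Usage: `verify_sha1 apache-flume-1.8.0-bin.tar.gz <digest-from-the-.sha1-file>`. A checksum only detects a corrupted download; the .asc signature is what ties the file to the release managers.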
wget <mirror-URL>/apache-flume-1.8.0-bin.tar.gz   # take the mirror URL from the Flume download page
tar zxvf apache-flume-1.8.0-bin.tar.gz
mv apache-flume-1.8.0-bin /usr/local/flume1.8
Install Java, then set the Java and Flume environment variables by adding the following to `/etc/profile`:
export JAVA_HOME=/usr/java/jdk1.8.0_65
export FLUME_HOME=/usr/local/flume1.8
export PATH=$PATH:$JAVA_HOME/bin:$FLUME_HOME/bin
Run `source /etc/profile` to apply the variables.
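To confirm that the variables took effect in the current shell, a small check along these lines may help. It is only a sketch: `check_home_var` is a hypothetical helper, and it tests nothing beyond "the variable is set and points to a directory":

```shell
#!/bin/sh
# check_home_var NAME
# Succeeds if the environment variable NAME is set and names an existing directory.
check_home_var() {
  name=$1
  # Indirect lookup of the variable whose name is stored in $name.
  eval "value=\${$name:-}"
  if [ -z "$value" ]; then
    echo "$name is not set" >&2
    return 1
  fi
  if [ ! -d "$value" ]; then
    echo "$name=$value is not a directory" >&2
    return 1
  fi
  echo "$name=$value"
}
```

For example, `check_home_var JAVA_HOME && check_home_var FLUME_HOME && command -v flume-ng` should print both paths and the location of the flume-ng launcher.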
Log file that Flume will collect: /tmp/logs/kafka.log
Copy the configuration templates:
# cp conf/flume-conf.properties.template conf/flume-conf.properties
# cp conf/flume-env.sh.template conf/flume-env.sh
Edit the configuration as follows:
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
# Location of the log file to collect
agent.sources.s1.command=tail -F /tmp/logs/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
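agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address (kafka.bootstrap.servers replaces the deprecated brokerList)
agent.sinks.k1.kafka.bootstrap.servers=localhost:9092
# Kafka topic (kafka.topic replaces the deprecated topic)
agent.sinks.k1.kafka.topic=test
# serializer.class is omitted: the Flume 1.8 sink uses the new Kafka producer and ignores it
agent.sinks.k1.channel=c1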
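A common mistake in this kind of file is a sink that points at a channel that was never declared. The sketch below checks that one wiring rule with awk. `get_prop` and `check_sink_channel` are hypothetical helpers, and the parser only handles simple `key=value` lines, not the full Java properties syntax:

```shell
#!/usr/bin/env bash
# get_prop FILE KEY: print the value of KEY (last occurrence wins), skipping comments.
get_prop() {
  awk -v key="$2" '
    /^[ \t]*#/ { next }
    {
      eq = index($0, "=")
      if (eq == 0) next
      k = substr($0, 1, eq - 1); gsub(/^[ \t]+|[ \t]+$/, "", k)
      if (k == key) {
        v = substr($0, eq + 1); gsub(/^[ \t]+|[ \t]+$/, "", v)
        found = v
      }
    }
    END { if (found != "") print found; else exit 1 }
  ' "$1"
}

# check_sink_channel FILE AGENT SINK: the channel used by SINK must be listed in AGENT.channels.
check_sink_channel() {
  local file=$1 agent=$2 sink=$3 declared channel
  declared=$(get_prop "$file" "$agent.channels") || return 1
  channel=$(get_prop "$file" "$agent.sinks.$sink.channel") || return 1
  case " $declared " in
    *" $channel "*) echo "sink $sink -> channel $channel" ;;
    *) echo "sink $sink uses undeclared channel $channel" >&2; return 1 ;;
  esac
}
```

For the configuration above, `check_sink_channel conf/flume-conf.properties agent k1` should report that k1 is wired to c1 (adjust the file name to wherever you saved the configuration).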
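Functional verification
# bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent
Run logs are written to the logs directory. Alternatively, start the agent in the foreground with -Dflume.root.logger=INFO,console so that log output is printed to the console; check these logs to find the cause when the service misbehaves.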
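Generate test log lines with a small script, log_producer_test.sh:
#!/bin/bash
for ((i=0; i<=1000; i++)); do
  echo "kafka_flume_test-$i" >> /tmp/logs/kafka.log
done
Run it:
# ./log_producer_test.sh
Then watch the Kafka console consumer to confirm that the messages are consumed.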
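Instead of reading the consumer output by eye, the test messages can be counted. This is a sketch only: `count_test_messages` is a hypothetical helper that counts lines on standard input starting with the kafka_flume_test- prefix used by the test script:

```shell
#!/usr/bin/env bash
# count_test_messages [PREFIX]
# Read messages on stdin and print how many start with PREFIX.
count_test_messages() {
  local prefix=${1:-kafka_flume_test-}
  # grep -c exits non-zero when the count is 0; still report the count and succeed.
  grep -c "^$prefix" || true
}
```

A possible use is `bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --timeout-ms 10000 | count_test_messages`, where --timeout-ms makes the consumer exit after 10 seconds without new messages. If every line written by the loop reached the topic, the count would be 1001; a lower number can occur because the exec source with a memory channel does not guarantee delivery.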