flume与kafka整合高可靠教程

问题导读 1.安装kafka是否需要安装zookeeper? 2.kafka安装需要哪些步骤? 3.如何验证kafka是否安装成功?

4.flume source目录是哪个? 5.flume在kafka中扮演什么角色? 6.如何测试整合配置是否成功? kafka安装 flume与kafka整合很多人都用到,但是网上却没有一份详细可靠的教程。说的都是些只言片语。这里整理份flume与kafka整合的教程。 flume原先并不兼容kafka。后来兼容添加上去。对于flume及与kafka的相关知识,推荐参考 flume应该思考的问题 http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102 上面只是加深对flume的认识。下面我们开始整合flume与kafka。 思路: 1.安装kafka 2.安装flume,在配置中添加kafka相关配置 这里使用的版本: kafka:kafka_2.11-0.9.0.1.tgz flume:apache-flume-1.6.0-bin.tar.gz kafka的安装 一、安装zookeeper 在master机器进行以下操作。 1. 解压zookeeper 下载zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

[Bash shell] 纯文本查看 复制代码

?

tar -zxvf ~/jar/zookeeper-3.4.6.tar.gz -C /data

网盘下载 链接: https://pan.baidu.com/s/1i50vzb3 密码: p5s5 2. 配置zookeeper 涉及到的配置文件为

[Bash shell] 纯文本查看 复制代码

?

${ZOOKEEPER_HOME}/conf/zoo.cfg

zoo.cfg通过cp zoo_sample.cfg zoo.cfg得到。

[Bash shell] 纯文本查看 复制代码

?

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/data/zk_data
# the port at which the clients will connect
clientPort=2181
 
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

这儿解释下格式为server.X=host:port1:port2的意思,X表示当前host所运行的服务的zookeeper服务的id(在接下来填写myid时需要用到),port1表示zookeeper中的follower连接到leader的端口号,port2表示leadership时所用的端口号。注意:需要手动去创建dataDir所配置的/data/zk_data目录(mkdir -p /data/zk_data)更多配置可参考: zookeeper配置文件详解 http://www.aboutyun.com/thread-13909-1-1.html 3. 填写myid 在zoo.cfg配置文件中的dataDir目录(在这儿是/data/data_zk)下创建myid文件,文件内容为zoo.cfg中master所对应的server.X。

[Bash shell] 纯文本查看 复制代码

?

echo "1" > /data/zk_data/myid

4. 复制到其他节点

[Bash shell] 纯文本查看 复制代码

?

scp -r /data/zookeeper-3.4.6/ /data/zk_data  aboutyun@slave1:/data
scp -r /data/zookeeper-3.4.6/ /data/zk_data  aboutyun@slave2:/data

在slave1上,

[Bash shell] 纯文本查看 复制代码

?

echo “2” > /data/zk_data/myid

在slave2上,

[Bash shell] 纯文本查看 复制代码

?

echo "3" >/data/zk_data/myid

5. 添加到环境变量 在master、slave1、slave2上,分别将以下内容添加到~/.bashrc文件中

[Bash shell] 纯文本查看 复制代码

?

export ZOOKEEPER_HOME=/data/zookeeper-3.4.6
export PATH=$ZOOKEEPER_HOME/bin:$PATH

然后执行以下命令:source ~/.bashrc 6. 启动验证 在master、slave、slave2上,分别执行zookeeper启动命令。

[Bash shell] 纯文本查看 复制代码

?

zkServer.sh start

由于启动时,每个节点都会试图去连接其它节点,因此先启动的刚开始会连接不上其它的,导致日志中会包含错误信息,在未全启动之前,这个属正常现象。启动完成后,使用jps命令和zkServer.sh status命令经行验证 master机器情况:

slave1机器情况:

slave2机器情况:

说明每个节点都成功启动了QuorumPeerMain进程,并且slave1上的进程为leader,master和slave2上的进程为follower 很多人想知道zookeeper在kafka中的作用,可参考下面文章 kafka在zookeeper中存储结构 http://www.aboutyun.com/forum.php?mod=viewthread&tid=9941 二、安装kafka 在master上进行如下操作: 1. 解压kafka 下载kafka:http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

[Bash shell] 纯文本查看 复制代码

?

tar -zxvf ~/jar/kafka_2.11-0.9.0.1.tgz -C /data

链接: https://pan.baidu.com/s/1sk8CAeT 密码: 8h2r 2. 配置kakfa 涉及到的配置文件为${KAFA_HOME}/config/server.properties 必须要配置的是这三个参数:broker.id、log.dirs、zookeeper.connect broker.id表示当前broker的id,要求是唯一的非负数。log.dirs表示kafka日志的存放目录。zookeeper.connect表示连接的zookeeper的地址。

[Bash shell] 纯文本查看 复制代码

?

broker.id=0
log.dirs=/data/kafka-logs
zookeeper.connect=master:2181,slave1:2181,slave2:2181

注意:需要手动在本地创建/data/kafka-logs目录 3. 复制到其他节点

[Bash shell] 纯文本查看 复制代码

?

scp  /data/kafka_2.11-0.9.0.1/ /data/kafka-logs/ aboutyun@slave1:/data
scp  /data/kafka_2.11-0.9.0.1/ /data/kafka-logs/ aboutyun@slave2:/data

在slave1机器上将server.properties配置文件的broker.id值改为1在slave2机器上将server.properties配置文件的broker.id值改为2 4. 添加环境变量 在master、slave1、slave2机器上,分别将以下内容添加到~/.bashrc文件中

[Bash shell] 纯文本查看 复制代码

?

export KAFKA_HOME=/data/kafka_2.11-0.9.0.1
export PATH=$KAFKA_HOME/bin:$PATH

然后执行以下命令:source ~/.bashrc 5. 启动验证 在master、slave1、slave2机器上,分别执行kafka启动命令

[Bash shell] 纯文本查看 复制代码

?

cd $KAFKA_HOME
kafka-server-start.sh -daemon ./config/server.properties

之后在每台机器上执行jps命令:

如果每台机器上都成功启动了kafka这个进程,说明我们搭建成功。如果发现某台机器上没有kafka这个进程,可以将kafka的启动命令去掉参数-daemon(加上的话表示后台启动),这样可以直接在屏幕上看到错误信息。 三、kakfa使用示例 1. 创建topic

[Bash shell] 纯文本查看 复制代码

?

#创建一个有3个partition、1个副本的 test topic
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic test --replication-factor 1 --partitions 3

2. 创建producer

[Bash shell] 纯文本查看 复制代码

?

kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9094 --topic test

3. 创建consumer 重新打开一个窗口:

[Bash shell] 纯文本查看 复制代码

?

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181  --topic test --from-beginning

4. 产生消息并接受 在producer的窗口中输入几条测试信息

查看consumer窗口的情况

说明成功消费了的产生的3条信息

flume与kafka整合安装

flume安装,其实也并不复杂,可是整合的时候,很多人遇到这么个情况,消费者收不到信息。这个的原因很多。出现问题,无非两种。 1.对flume和kafka基本不理解,只是照抄。这里面就容易出现问题。比如配置错误agent名字错误,配置过期等 2.对整个过程不理解。比如有的不报错,只是看到几行信息。很可能是kafka还没有启动。而且很多人遇到过这种情况,网络都是通的,防火墙是关闭的,为何连接还是拒绝的,原因可能就是,服务根本没有启动。 上面两个方法,相信可以解决大部分问题,更多的其实还是需要自己去理解和查看出现错误的地方。 当然对于flume的配置的理解,还是推荐参考 flume应该思考的问题 http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102 上面只是加深对flume的认识。 下面开始安装flume及整合kafka 一、Flume安装 1. 压缩安装包

[Bash shell] 纯文本查看 复制代码

?

tar -zxvf ~/jar/apache-flume-1.6.0-bin.tar.gz -C /data
mv /data/apache-flume-1.6.0-bin/ /data/flume-1.6.0 # 重命名

网盘下载 链接:http://pan.baidu.com/s/1bBnF5O 密码:xoll 2. 配置环境变量 编辑文件 ~/.bashrc sudo vim ~/.bashrc

[Bash shell] 纯文本查看 复制代码

?

export FLUME_HOME=/data/flume-1.6.0
export PATH=$FLUME_HOME/bin:$PATH

[Bash shell] 纯文本查看 复制代码

?

source ~/.bashrc

3. 配置flume

[Bash shell] 纯文本查看 复制代码

?

cp flume-env.sh.template flume-env.sh修改JAVA_HOME
export JAVA_HOME= /data/jdk1.8.0_111

4. 验证安装

[Bash shell] 纯文本查看 复制代码

?

flume-ng version

二、Flume使用 1. 单节点的agent 1) 增加配置文件 cd $FLUME_HOME/conf vim single_agent.conf 将以下内容拷贝进去

[Bash shell] 纯文本查看 复制代码

?

# agent的名称为a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
 
# set source
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir=/data/aboutyunlog
a1sources.source1.fileHeader = flase
 
# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= aboutyunlog
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
 
# set channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /data/flume_data/data
 
# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

可以看到上面配置信息中 #a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 被注释掉,改换成 a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092 这里根据官网配置a1.sinks.sink1.brokerList这个属性已经被弃用,但是使用a1.sinks.sink1.kafka.bootstrap.servers 属性会报错。

[Bash shell] 纯文本查看 复制代码

?

ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
        at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
        at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)

所以不管官网如何写,还是使用a1.sinks.sink1.brokerList这个属性 2. 创建所需文件

[Bash shell] 纯文本查看 复制代码

?

mkdir -p /data/aboutyunlog
mkdir -p /data/flume_data/checkpoint
mkdir -p /data/flume_data/data

同时提醒,创建完毕,当前用户一定具有操作权限。最好授权为777. 3. 在kafka上创建名为aboutyunlog的topic

[Bash shell] 纯文本查看 复制代码

?

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3

4. 启动flume

[Bash shell] 纯文本查看 复制代码

?

flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console

5. 创建一个kafka的consumer flume启动其实是启动了产生着,所以这里创建一个消费者。那么这个消费者创建到什么地方。我这里创建在slave1上。

上面有以前创建的内容,这里做一个测试,我们在增加一条 6. 添加文件到flume source目录 这个是在master上执行 echo -e "this is a test file! \nhttp://www.aboutyun.com\n20170710">log.1 mv log.1 /data/aboutyunlog/

7.在slave1上收到

 

有的时候可能会慢,需要等待几秒。

这样就整合成功了。

#############################################

遇到问题:
1.Failed to find leader for Set
详细如下
[2017-07-05 11:26:04,077] WARN [console-consumer-27231_slave1-1499223048151-471ca15a-leader-finder-thread], Failed to find leader for Set([aboutyunlog,2], [aboutyunlog,1], [aboutyunlog,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(aboutyunlog)] from broker [ArrayBuffer()] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

原因:kafka未启动
解决办法:自然是启动kafka.
kafka未启动产生的其它问题:
其实kafka未启动,还会有其它错误,比如在创建消费者的时候,你看不到错误,只有一条警告 WARN [console-consumer-90733_master-1498548695990-7aaba945], no brokers found when trying to rebalance. (kafka.consumer.ZookeeperConsumerConnector)
记得还有另外的错误就是如果未启动,生产者和消费者链接端口是拒绝的,这让很多人认为是网络问题。

错误2

Agent configuration invalid for agent 'a11'. It will be removed.
17/06/27 15:46:02 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
17/06/27 15:46:02 INFO node.AbstractConfigurationProvider: Creating channels
17/06/27 15:46:02 INFO channel.DefaultChannelFactory: Creating instance of channel channel1 type file
17/06/27 15:46:02 INFO node.AbstractConfigurationProvider: Created channel channel1
17/06/27 15:46:02 INFO source.DefaultSourceFactory: Creating instance of source source1, type spooldir
17/06/27 15:46:02 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: org.apache.flume.sink.kafka.KafkaSink
17/06/27 15:46:02 INFO kafka.KafkaSink: Using the static topic: aboutyunlog this may be over-ridden by event headers
17/06/27 15:46:02 INFO kafka.KafkaSinkUtil: context={ parameters:{kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092, channel=channel1, topic=aboutyunlog, type=org.apache.flume.sink.kafka.KafkaSink} }
17/06/27 15:46:02 ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
        at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
        at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/06/27 15:46:02 INFO node.AbstractConfigurationProvider: Channel channel1 connected to [source1]
17/06/27 15:46:02 INFO node.Application: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:Spool Directory source source1: { spoolDir: /data/aboutyunlog } }} sinkRunners:{} channels:{channel1=FileChannel channel1 { dataDirs: [/data/flume_data/data] }} }
17/06/27 15:46:02 INFO node.Application: Starting Channel channel1

Sink  has been removed due to an error during configuration org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker


上面两个问题
第一:agent的名字不一致造成的。所以产生了这个问题
Agent configuration invalid for agent 'a11'. It will be removed.

解决办法:
自然修改为a1,而不是a11
第二:
ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker

上面由于官网说brokerList被弃用
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
所以使用下面属性
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
所以产生问题。修改为a1.sinks.sink1.brokerList即可

原文发布于微信公众号 - about云(wwwaboutyuncom)

原文发表时间:2017-07-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏冷冷

SpringMVC 文件下载时 浏览器不能正确显示另存的文件名

问题:通过打印输出流的方式把文件下载到本地,但是在firebox 中 下载的文件不显示文件的文件名,造成文件不能直接打开,其他浏览器可以直接打开. 原因: 主要...

2145
来自专栏我的博客

mac安装redis以及phpredis扩展

1. git clone git://github.com/nicolasff/phpredis.git 2.cd phpredis 3.phpize 4../...

3226
来自专栏世界第一语言是java

springboot2新版springcloud微服务全家桶实战

本篇文章是springboot2.x升级后的升级springcloud专贴,因为之前版本更新已经好久了,好多人评论可不可以出个新版本,大家一定要注意,这是spr...

4.4K2
来自专栏happyJared

IDEA快捷键拆解系列(九):Build篇

  以下是关于Build导航项及其每一子项的拆解介绍,其中,加粗部分的选项是博主认为比较重要的。

831
来自专栏salesforce零基础学习

salesforce 零基础学习(二十九)Record Types简单介绍

在项目中我们可能会遇见这种情况,不同的Profile拥有不同的页面,页面中的PickList标签可能显示不同的值。这个时候,使用Record Types可以很便...

2056
来自专栏coding

swoole创建tcp服务器tcp server使用telnet连接自定义 tcp client

3462
来自专栏yukong的小专栏

【SpringBoot系列02】SpringBoot之使用Thymeleaf视图模板前言一、目标二、实现三、总结

Thymeleaf 是Java服务端的模板引擎,与传统的JSP不同,前者可以使用浏览器直接打开,因为可以忽略掉拓展属性,相当于打开原生页面,给前端人员也带来一定...

1042
来自专栏Alice

iOS .pch文件的使用

什么是.pch文件预编译头文件(一般扩展名为.PCH),是把一个工程中较稳定的代码预先编译好放在一个文件(.PCH)里。这些预先编译好的代码可以是任何的C/C+...

2927
来自专栏程序猿DD

Spring Boot开发Web应用

《SpringBoot快速入门》 中我们完成了一个简单的RESTful Service,体验了快速开发的特性。在留言中也有朋友提到如何把处理结果渲染到页面上。那...

2126
来自专栏Jackson0714

Ubuntu 安装php+mysql+nginx

https://blog.csdn.net/Msmile_my/article/details/73647809

2275

扫码关注云+社区

领取腾讯云代金券