首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >重磅:Flume1-7结合kafka讲解

重磅:Flume1-7结合kafka讲解

作者头像
Spark学习技巧
发布2018-01-31 11:58:27
2K0
发布2018-01-31 11:58:27
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

本文主要是将flume监控目录,文件,kafka Source,kafka sink,hdfs sink这几种生产中我们常用的flume+kafka+hadoop场景,希望帮助大家快速入生产。

flume只有一个角色agent,agent里都有三部分构成:source、channel和sink。就相当于source接收数据,通过channel传输数据,sink把数据写到下一端。这就完了,就这么简单。其中source有很多种可以选择,channel有很多种可以选择,sink也同样有多种可以选择,并且都支持自定义。同时,agent还支持选择器,就是一个source支持多个channel和多个sink,这样就完成了数据的分发。

Event是flume数据传输的基本单元 flume以时间的形式将数据从源头传输到目的地 Event由可选的header和载有数据的一个byte array构成: 1,载有数据对flume是不透明的

2,header是容纳了key-value字符串对的无序集合,key在集合内是唯一的。

flume常见的组合方式:

一 Exec Source

Exec源在启动时运行一个给定的Unix命令,并期望该过程持续在标准输出上生成数据(除非将属性logStdErr设置为true,否则stderr将被简单地丢弃)。如果该过程因任何原因而退出,则该来源也退出并且不会产生进一步的数据。这意味着像cat [named pipe]或tail -F [file]这样的配置将产生所需的结果,而日期可能不会 - 前两个命令产生数据流,而后者产生单个事件并退出。

属性名称

默认值

描述

channels

type

-

必须是: exec

command

-

要执行的命令

shell

-

用于运行命令的shell调用。 例如 / bin / sh -c。 仅用于依赖诸如通配符,后退,管道等外壳功能的命令

restartThrottle

10000

尝试重新启动之前的等待时间(以毫秒为单位)

restart

false

停掉执行的cmd是否应该重新启动

logStdErr

false

是否应记录命令的stderr

batchSize

20

一次读取和发送到Channel的最大行数

batchTimeout

3000

在数据被推向下游之前,如果未达到缓冲区大小,则等待的时间(以毫秒为单位)

selector.type

replicating

replicating or multiplexing

selector.*

Depends on the selector.type value

interceptors

-

Space-separated list of interceptors

警告:

ExecSource和其它异步源的问题是,源不能保证,如果消息没有写入Channel,客户端知道。这种情况下数据就丢失了,例如,tail -F [file]。 虽然这是可能的,但存在明显的问题。如果channel填满,Flume无法发送event,会发生什么情况?flume无法向应用程序表名由于某种原因他需要保留日志或者事件没有被发送。 如果没有意义,只需要知道这一点:使用单向异步接口(如ExecSource)时,应用程序永远不能保证已收到数据!要获得更高的可靠性保证,请考虑Spooling Directory Source或通过SDK直接与Flume集成。

agent名称为a1的示例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = exec
  4. a1.sources.r1.command = tail -F /var/log/secure
  5. a1.sources.r1.channels = c1

“shell”配置用于通过命令shell(例如Bash或Powershell)调用“command”。'command'作为参数传递给'shell'来执行。这允许“command”使用shell中的功能,例如通配符,back tick,管道,循环,条件等。

在没有'shell'配置的情况下,'command'将被直接调用。 'shell'的常见值:'/ bin / sh -c','/ bin / ksh -c','cmd / c','powershell -Command'等

示例:

  1. a1.sources.tailsource-1.type = exec
  2. a1.sources.tailsource-1.shell = /bin/bash -c
  3. a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

二 Spooling Directory Source

这种source允许你通过往指定目录里防止文件的方式写入数据。这个Source会监控指定的目录是否有新文件产生,然后立即解析新文件里的事件。事件解析逻辑是可以插拔的。如果新文件的数据被读完,就被重命名为完成或者可删除。

不同于exec Source,该source是可靠的并且不会丢失数据,即使flume被重启或者杀死。为了交换这种可靠性,只有不可变的,唯一命名的文件可以放入监控目录。Flume试图检测这些问题条件,如果违反,将会失败:

1, 如果放入到监控目录的文件还在被写入,flume将在其日志文件中输出错误并停止。

2, 如果稍后重新使用了文件名,flume将在其日志里输出错误并停止处理。

为了避免上面的情况,给logs文件名加一个唯一的标识(如时间错)会很有用。

尽管数据源是有可靠性保证的,但是如果发生某些下游故障,仍然有事件出现重复。

例子:

  1. a1.channels = ch-1
  2. a1.sources = src-1
  3. a1.sources.src-1.type = spooldir
  4. a1.sources.src-1.channels = ch-1
  5. a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
  6. a1.sources.src-1.fileHeader = true

三 kafka sink

flume sink可以将数据发布到kafka一个topic。其中一个目标是将Flume与Kafka集成,以便进行基于拉式的处理系统可以处理来自各种Flume源的数据。Flume当前版本支持kafka0.9系列。Flume1.7已经不支持老版本(0.8.x)kafka。

属性名字

默认值

描述

channels

type

-

必须为: org.apache.flume.sink.kafka.KafkaSin

kafka.bootstrap.servers

-

Kafka的Broker,逗号隔开的hostname:port

kafka.topic

Defaultflume-topic

接受数据的kafka,topic

flumeBatchSize

100

一批中处理多少条消息 更大的批次可以提高吞吐量,同时增加延迟。

kafka.producer.acks

1

在考虑成功写入之前,有多少副本必须确认一条消息。 可用值为0(不等待确认),1(仅等待leader),-1(等待所有副本)将其设置为-1以避免在某些leader失败的情况下数据丢失。

useFlumeEventFormat

false

默认情况下,事件直接从事件body作为字节消息内容放到Kafka主题上。设置为true来存储events为Flume Avro二进制格式。 与相同属性的KafkaSource或者有parseAsFlumeEvent 属性的KafkaChannel一起使用,将保留任何Flume头。

defaultPartitionId

-

如果不被partitionIdHeader覆盖,配置该整形值会使得当前channel的所有消息发送到该值指定的kafka分区。默认情况,如果该值没有设置,事件将由kafka分配生成分区-包括如果指定key(或者由kafka.partitioner.class指定的分区器)

partitionIdHeader

-

设置后,sink将从事件header中获取使用此属性值命名的字段的值,并将消息发送到主题的指定分区。 如果该值表示一个无效分区,则会抛出EventDeliveryException异常。 如果标题值存在,则此设置将覆盖defaultPartitionId。

kafka.producer.security.protocol

PLAINTEXT

如果使用某种安全机制写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 见下文有关安全设置的更多信息。

more producer security props

如果使用SASL_PLAINTEXT,SASL_SSL或SSL,请参阅Kafka安全性以获取生产者所需的其他属性。

Other Kafka ProducerProperties

-

支持任何kafka支持的Producer属性,使用时需要加上kafka.producer.前缀, kafka.producer.linger.ms

注意:

Kafka Sink使用FlumeEventheader中的topic和key属性将事件发送到Kafka。 如果header中存在topic,则会将该事件发送到该特定topic,覆盖为sink配置的topic。 如果header中存在key,则Kafka将使用该key对topic分区之间的数据进行分区。 具有相同key的事件将被发送到相同的分区。 如果key为空,事件将被发送到随机分区。

Kafka汇也提供了key.serializer(org.apache.kafka.common.serialization.StringSerializer)和

value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer)。 不建议修改这些参数。

下面给出一个Kafka sink的配置示例。 以前缀kafka.producer开始的属性Kafka生产者。 创建Kafka生产者时传递的属性不限于本例中给出的属性。 也可以在这里包含您的自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。它们通过作为方法参数传入的Flume Context对象在预处理器内部。

实例一:监控文件,写入kafka

  1. kafkasink配置
  2. ## define agent
  3. a1.sources = r1
  4. a1.channels = c1
  5. a1.sinks = k1
  6. ## define sources
  7. a1.sources.r1.channels = c1
  8. a1.sources.r1.type = exec
  9. a1.sources.r1.command = tail -f /opt/logs.txt
  10. a1.sources.r1.shell = /bin/bash -c
  11. ## define channels
  12. a1.channels.c1.type = memory
  13. a1.channels.c1.capacity = 1000
  14. a1.channels.c1.transactionCapacity = 100
  15. ##sinks
  16. a1.sinks.k1.channel = c1
  17. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  18. a1.sinks.k1.kafka.topic = mytopic
  19. a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
  20. a1.sinks.k1.kafka.flumeBatchSize = 20
  21. a1.sinks.k1.kafka.producer.acks = 1
  22. a1.sinks.k1.kafka.producer.linger.ms = 1
  23. a1.sinks.ki.kafka.producer.compression.type = snappy
  24. 启动kafka
  25. zkServer.sh start
  26. nohup /opt/modules/kafka_2.11-0.11.0.1/bin/kafka-server-start.sh /opt/modules/kafka_2.11-0.11.0.1/config/server.properties >/dev/null 2>&1 &
  27. flume启动
  28. bin/flume-ng agent --conf conf --name a1 --conf-file conf/kafkasink.properties -Dflume.root.logger=INFO,console
  29. 消费者启动
  30. 1,从上次偏移启动
  31. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic
  32. 2,从头消费
  33. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning

四 kafka Source

Kafka Source是一个Apache Kafka 消费者,它从Kafka主题中读取消息。 如果您有多个Kafka source运行,您可以使用相同的消费者组配置它们,以便于每个kafka Source实例消费单独的一组partition数据。

属性名称

默认值

描述

channels

kafka.bootstrap.servers

-

Kafka Broker列表

kafka.consumer.group.id

flume

消费者组的唯一标识,多个Source设置相同的id,表示它们同属于相同的消费者组。

kafka.topics

-

逗号分隔的topic列表

kafka.topics.regex

-

正则的方式定义订阅的topic。优先级高于kafka.topics,会覆盖kafka.topics假如同时配置的话。

batchSize

1000

一个批次写入Channel的最大消息

batchDurationMillis

1000

一个批次消息发送给Channel的最大延迟。Time和size任意一个达到界限都会立即发送消息。

backoffSleepIncrement

1000

Kafka topic为空时触发的初始和增量等待时间。 等待周期将会减少对Kafka topic的pinging攻击。 一秒钟是在用例中的理想选择,但对于有拦截器的低延迟操作可能需要较低的值。

maxBackoffSleep

5000

Kafka topic为空的时候,最大等待时间,5s是理想的选择。但是带有拦截器的低延迟操作可能需要更小的值。

useFlumeEventFormat

false

默认从kafka Topic取的消息是event body。设置为true将为以Flume Avro binary格式读取event。与相同属性的KafkaSource或者有parseAsFlumeEvent 属性的KafkaChannel一起使用,将保留任何Flume头。

migrateZookeeperOffsets

true

当找不到Kafka存储的偏移量时,在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是ture,以支持从旧版本的Flume无缝的Kafka客户端迁移。 一旦迁移,这可以设置为false,但通常不需要。 如果找不到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。

kafka.consumer.security.protocol

PLAINTEXT

如果使用某种安全机制写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 见下文有关安全设置的更多信息。

more consumer security props

如果使用SASL_PLAINTEXT,SASL_SSL或SSL,请参阅Kafka安全性以获取消费者所需的其他属性。

Other Kafka ConsumerProperties

Kafka 消费者其它配置可以 以kafka.consumer.为前缀进行设置,例如:kafka.consumer.auto.offset.rese

注意:

kafka Source覆盖了两个kafka参数: auto.commit.enable 被source默认配置未false。Kafka source确保的是至少一次消费语义。当kafka Source启动的时候,消息会被重复消费。Kafka source也提供默认值

为key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和

value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)。 不建议修改这些参数。

五 hdfs sink讲解

该sink会将数据写入hdfs,它目前支持创建文本和序列文件,这两种文件格式都支持压缩。可以根据所用时间,数据大小或事件数量定期滚动文件(关闭当前文件并创建一个新文件)。它还通过诸如时间戳或发生事件的机器时间等属性对数据进行存储分桶/分区。HDFS目录路径可能包含格式化转义序列,它们将被HDFSsink替换以生成存储事件的目录/文件名。使用此sink需要安装hadoop,以便Flume可以使用Hadoop jars与HDFS集群进行通信。请注意,需要支持sync()调用的Hadoop版本。

以下是支持的转义序列:

别号

描述

%{host}

替换名为“host”的事件标题的值。 任意标题名称被支持。

%t

Unix时间以毫秒为单位

%a

本地的星期短名称(Mon, Tue, ...)

%A

本地的星期全名称(Monday, Tuesday, ...)

%b

本地月份短名称(Jan, Feb, ...)

%B

本地月份全名称(January, February, ...)

%c

本地日期和时间(Thu Mar 3 23:05:25 2005)

%d

月份中的日期(01,02,03..)

%e

月份中的日期,没有填充(1,2,3..)

%D

日期,类似: %m/%d/%y

%H

hour (00..23)

%I

hour (01..12)

%j

day of year (001..366)

%k

hour ( 0..23)

%m

month (01..12)

%n

month without padding (1..12)

%M

minute (00..59)

%p

locale’s equivalent of am or pm

%s

seconds since 1970-01-01 00:00:00 UTC

%S

second (00..59)

%y

last two digits of year (00..99)

%Y

year (2010)

%z

+hhmm numeric timezone (for example, -0400)

%[localhost]

替换agent正在运行的主机的主机名

%[IP]

替换运行agent的主机的IP地址

%[FQDN]

替换运行代理程序的主机的规范主机名

正在使用的文件的名称将在最后包含“.tmp”。 文件关闭后,该扩展名将被删除。 这允许排除目录中的部分完整文件。

注意:对于所有与时间相关的转义序列,在事件的header中必须存在一个带有“timestamp”key的header(除非hdfs.useLocalTimeStamp被设置为true)。 一种自动添加的方法是使用TimestampInterceptor。

实例二:Kafka Source 和 hdfs sink

  1. kafka source配置
  2. ## define agent
  3. a1.sources = s1
  4. a1.channels = c1
  5. a1.sinks = k1
  6. ## define sources
  7. a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
  8. a1.sources.s1.channels = c1
  9. a1.sources.s1.batchSize = 5000
  10. a1.sources.s1.batchDurationMillis = 2000
  11. a1.sources.s1.kafka.bootstrap.servers = localhost:9092
  12. a1.sources.s1.kafka.topics = mytopic
  13. a1.sources.s1.kafka.consumer.group.id = kafka2hdfs
  14. ## define channels
  15. a1.channels.c1.type = memory
  16. a1.channels.c1.capacity = 1000
  17. a1.channels.c1.transactionCapacity = 100
  18. ##sinks
  19. a1.channels = c1
  20. a1.sinks = k1
  21. a1.sinks.k1.type = hdfs
  22. a1.sinks.k1.channel = c1
  23. a1.sinks.k1.hdfs.path = hdfs://Luffy.OnePiece.com:8020/flume/events/%y-%m-%d/%H%M/
  24. a1.sinks.k1.hdfs.filePrefix = flumeData
  25. a1.sinks.k1.hdfs.fileSuffix = .log
  26. sinks.k1.hdfs.round = true
  27. a1.sinks.k1.hdfs.roundValue = 10
  28. a1.sinks.k1.hdfs.roundUnit = minute
  29. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  30. a1.sinks.k1.hdfs.rollInterval = 600
  31. a1.sinks.k1.hdfs.rollSize = 268435456
  32. a1.sinks.k1.hdfs.rollCount = 0
  33. a1.sinks.k1.hdfs.batchSize = 1000
  34. a1.sinks.k1.hdfs.fileType = DataStream
  35. a1.sinks.k1.hdfs.writeFormat = Text
  36. a1.sinks.k1.hdfs.idleTimeout = 60
  37. a1.sinks.k1.hdfs.threadsPoolSize= 1
  38. a1.sinks.k1.hdfs.callTimeout= 30000
  39. 启动kafka
  40. zkServer.sh start
  41. nohup /opt/modules/kafka_2.11-0.11.0.1/bin/kafka-server-start.sh /opt/modules/kafka_2.11-0.11.0.1/config/server.properties >/dev/null 2>&1 &
  42. flume启动
  43. bin/flume-ng agent --conf conf --name a1 --conf-file conf/kafka2hdfs.properties -Dflume.root.logger=INFO,console
  44. 生产者启动
  45. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic

注意事项:

1,写入hdfs需要进行如下操作,将core-site.xml和hdfs-site.xml复制到flume的conf目录下。

2,/etc/profile要配置好,hadoop信息,使得flume找到hadoop依赖信息。

  1. export JAVA_HOME=/opt/modules/jdk1.8.0_121
  2. export HADOOP_HOME=/opt/modules/hadoop-2.7.4/
  3. export HADOOP_PREFIX=$HADOOP_HOME
  4. export HADOOP_MAPRED_HOME=$HADOOP_HOME
  5. export HADOOP_YARN_HOME=$HADOOP_HOME
  6. export HADOOP_COMMON_HOME=$HADOOP_HOME
  7. export HADOOP_HDFS_HOME=$HADOOP_HOME
  8. export YARN_HOME=$HADOOP_HOME
  9. export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
  10. export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

推荐阅读:

1,Hadoop伪分布式集群安装部署

2,大数据基础系列之JAVA引用详解

3,Spark源码系列之Standalone模式下Spark应用的整个启动过程

4,Spark调优系列之硬件要求

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档