01
Flume起源
Flume最早是Cloudera开发的实时日志收集系统,最早的时候Flume的版本称为Flume OG(original generation),随着功能的扩展和代码的重构,随之出现了我们熟知的Flume NG(next generation),后来也捐给了Apache基金会成为了Apache的顶级项目。Apache Flume 是一个分布式、高可靠(事务)、高可用(failover)的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具。
02Flume架构
Event:Flume定义的一个数据流传输的最小单元,数据被封装到Event中往后传输。Event由Header和Byte Payload组成:
Agent:Flume最小的独立运行单位,一个Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。同时,一个Agent就对应一个配置文件。
Source:对接输入源,监控外部数据源的数据,传输给Channel。
Source类型:
Channel类型:
Sink类型:
03
Flume事务流程
Channel使用被动存储机制,依靠Source完成数据写入(推送)、依靠Sink完成数据读取(拉取)。
Flume 推送事务流程
Flume拉取事务流程
04
Flume参数调优
05
Flume的一个bug插曲
有一次我使用file作为channel重启时候碰见一个错误,长这样:
ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=fileChannel]. Due to java.io.EOFException: null
at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:340)
at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.RandomAccessFile.readInt(RandomAccessFile.java:827)
at java.io.RandomAccessFile.readLong(RandomAccessFile.java:860)
at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:80)
at org.apache.flume.channel.file.Log.replay(Log.java:426)
at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:290)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
后来查到这是Flume的一个bug:
https://issues.apache.org/jira/browse/FLUME-2282
当时这个业务不是必须保证不丢数据,我就清空了下面两个目录:
/channel/flume/collector/checkpoint
/channel/flume/collector/data
然后重启大法,成功了!
面试官,如果要保证不丢数据应该怎么处理呀?
06
Flume展望
面试官大人,对于Flume我还有一点自己的理解和疑惑不知道怎么解决。
1、Flume的监控感觉是个问题,怎么便捷的进行数据采集和传输的准确性呢?
2、多个Flume agent怎么便捷高效的管理呢?
3、有没有能够替代Flume的数据采集工具呢?
最后,这次面试还是没通过,现在还不知道为什么?
end
历史好文推荐