Troubleshooting Flume's "Error while trying to hflushOrSync" Exception
1 Problem Description
Business scenario
Flume consumes a Kafka topic and sinks it to an HDFS directory, which backs a Hive external table.
#Flume test configuration
tier1.sources = testSource
tier1.channels = testChannel
tier1.sinks = testSink
#source
tier1.sources.testSource.channels = testChannel
tier1.sources.testSource.type=org.apache.flume.source.kafka.KafkaSource
tier1.sources.testSource.kafka.bootstrap.servers=xx1:9092,xx2:9092,xx3:9092
tier1.sources.testSource.kafka.topics=test-data
tier1.sources.testSource.kafka.consumer.group.id=flume-test-data
tier1.sources.testSource.kafka.batchSize = 50000
tier1.sources.testSource.kafka.batchDurationMillis = 2000
#channel
tier1.channels.testChannel.type=memory
tier1.channels.testChannel.capacity=150000
tier1.channels.testChannel.transactionCapacity=50000
#sink
tier1.sinks.testSink.type=hdfs
tier1.sinks.testSink.channel = testChannel
tier1.sinks.testSink.hdfs.path=hdfs://nameservice/test-data/%Y/%m/%d
tier1.sinks.testSink.hdfs.filePrefix = uaes
tier1.sinks.testSink.hdfs.writeFormat=Text
tier1.sinks.testSink.hdfs.fileType=DataStream
tier1.sinks.testSink.hdfs.callTimeout=60000
tier1.sinks.testSink.hdfs.batchSize = 10000
tier1.sinks.testSink.hdfs.rollSize = 536750000
tier1.sinks.testSink.hdfs.rollCount = 0
tier1.sinks.testSink.hdfs.rollInterval = 7200
2 Troubleshooting
Comparing the logs of the different Flume agents shows that a file with the same name was opened by more than one agent. Once the file was opened a second time, the lease (token) held by the agent that opened it first was invalidated, so that agent could no longer close the file and kept logging the error: Error while trying to hflushOrSync!
The log entries from the different Flume agents:
flume-agent1:
WARN org.apache.flume.sink.hdfs.BucketWriter:
Closing file: hdfs://nameservice/test-data/2020/04/29/uaes.1588089601760.tmp failed.
Will retry again in 180 seconds.
flume-agent2:
WARN org.apache.flume.sink.hdfs.BucketWriter:
Closing file: hdfs://nameservice/test-data/2020/04/29/uaes.1588089601760.tmp failed.
Will retry again in 180 seconds.
Reviewing the earlier Flume configuration shows that every flume-agent uses an identical hdfsSink configuration and reads from the same source, so there is a high probability that multiple agents open a file with the same name at the same time, leaving some of them unable to proceed.
#This setting is what causes agents to write files with identical names
tier1.sinks.testSink.hdfs.filePrefix = uaes
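The collision can be sketched as follows (a minimal illustration, not Flume source code; `bucket_file_name` is a hypothetical helper mirroring how the HDFS sink's BucketWriter names in-progress files as `<prefix>.<counter>.tmp`, with the counter seeded from the creation time in milliseconds):

```python
def bucket_file_name(prefix: str, epoch_ms: int) -> str:
    # In-progress HDFS sink files are named "<prefix>.<counter>.tmp";
    # the counter starts from the bucket's creation timestamp in ms.
    return f"{prefix}.{epoch_ms}.tmp"

# Both agents roll on the same interval boundary (cf. the logged name
# uaes.1588089601760.tmp appearing in two different agents' logs).
ts = 1588089601760

# Identical static prefix on every agent -> identical file names -> lease conflict
print(bucket_file_name("uaes", ts) == bucket_file_name("uaes", ts))  # True

# Per-agent prefix (what %{host} provides) -> distinct names, no conflict
print(bucket_file_name("10.0.0.1", ts) != bucket_file_name("10.0.0.2", ts))  # True
```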
3 Solution
Use Flume's Host Interceptor to stamp the files handled by each agent with that agent's host information.
Official documentation for the host interceptor:
https://flume.apache.org/FlumeUserGuide.html#host-interceptor
Sample configuration:
a1.sources = r1
a1.channels = c1
#configure the interceptor on the source
a1.sources.r1.interceptors = i1
#interceptor type
a1.sources.r1.interceptors.i1.type = host
#false uses the hostname; true (the default) uses the IP address
a1.sources.r1.interceptors.i1.useIp = false
Modify the hdfs sink's filePrefix to include the host: add a Flume interceptor that stamps each event with the flume-agent machine's IP, then reference it in the prefix; all other settings stay the same.
# key settings
# added to the source
tier1.sources.testSource.interceptors = testincp
tier1.sources.testSource.interceptors.testincp.type = host
# added to the sink
tier1.sinks.testSink.hdfs.filePrefix = %{host}
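What the interceptor plus the %{host} escape accomplish can be sketched as follows (a simplified Python illustration, not Flume's implementation; `host_intercept` and `expand_prefix` are hypothetical helpers: Flume's host interceptor adds a "host" header to each event, and the HDFS sink substitutes %{name} tokens in paths and prefixes from event headers):

```python
import re
import socket

def host_intercept(event: dict, use_ip: bool = True) -> dict:
    """Mimic the host interceptor: add a 'host' header to the event."""
    # With useIp = true (Flume's default) the header holds the agent's IP;
    # with false it holds the hostname.
    event.setdefault("headers", {})
    event["headers"]["host"] = (
        socket.gethostbyname(socket.gethostname()) if use_ip
        else socket.gethostname()
    )
    return event

def expand_prefix(prefix: str, headers: dict) -> str:
    """Mimic the sink's %{name} escape: replace it with the event header."""
    return re.sub(r"%\{(\w+)\}", lambda m: headers.get(m.group(1), ""), prefix)

# Each agent stamps its own identity, so file prefixes never collide.
event = host_intercept({"body": b"...", "headers": {}}, use_ip=False)
print(expand_prefix("%{host}", event["headers"]))  # prints this machine's hostname
```

Because every agent resolves %{host} to its own address, no two agents can produce the same file name even when they roll at the same instant.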
After updating and reloading each flume-agent's configuration, the agents ran for a while and the error no longer appeared in the logs.
The files generated on HDFS each carry the IP of the flume-agent that wrote them, confirming the fix.