我创建了一个简单的NiFi管道,它从一个卡夫卡主题(使用ConsumeKafka)读取数据流,并将其写入HDFS (使用PutHDFS)。目前,我看到很多在HDFS上创建的小文件。新文件大约每秒钟创建一次,有些只有一两条记录。
我希望更少、更大的文件被写入HDFS。
我在ConsumeKafka中有以下设置
Message Demarcator = <new line>
Max Poll Records = 10000
Max Uncommitted Time = 20s
在过去,我使用的是Flume,而不是Nifi,它有batchSize和batchDurationMillis,
我有一个扩展MergeContent进程的自定义处理器,当NiFi启动时,我在日志中看到以下错误:
2016-09-09 18:17:00,607 ERROR [main] org.apache.nifi.NiFi Failure to launch NiFi due to java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider org.apache.nifi.processors.standard.DetectDuplicate could not be instantiated
j
我正在尝试使用Apache nifi中的Putmongo处理器将json数组插入到mongodb中。但是我得到了一个错误: error: current bson type is an array and not a document 下面是我的json: [{"Name":"computer","TotalRevenue":15000},
{"Name":"music","TotalRevenue":1500},
{"Name":"space","Tota
在这种情况下,我希望使用SQL查询将数据作为每月的CSV存储到SFTP服务器中。
例如,我的查询是:
select fooId, bar from FooBar
where query_date>=20180101 and query_date<20180201 --(for the month of January 2018)
我想把它作为20180101_FooBar.csv存储到我的SFTP服务器上。类似地,其他月份的其他文件遵循相同的进程,具有不同的query_date间隔。
要做的重要考虑:I have to store the *fooId* as MD5 H