注意:如果存在多个传入队列,则在一次呼叫中轮询所有队列还是仅轮询单个队列方面,行为是不确定的。话虽如此,这里描述了观察到的行为(对于NiFi 1.1.0+及之前版本)。...FlowFIle创建一个新的FlowFIle 示例说明:我们想新建一个流文件,这个流文件继承了其他流文件 方法:使用session对象中的create(parentFlowFile)方法。...此方法采用父FlowFile引用,并返回新的子FlowFile对象。新创建的FlowFile将继承父对象的除UUID以外的所有属性(attribute)。...属性是关于内容/流文件的元数据,我们在上一章看到了如何使用ExecuteScript来操作它们。流文件的内容只是字节的集合,而没有固有的结构、模式、格式等。...使用回调读取一个流文件的内容 方法:使用session对象中的read(flowFile,inputStreamCallback)方法。
Max Rows Per Flow File (每个FlowFile行数) 0 在一个FlowFile文件中的数据行数。通过这个参数可以将很大的结果集分到多个FlowFile中。...Max Rows Per Flow File (每个FlowFile行数) 0 在一个FlowFile文件中的数据行数。通过这个参数可以将很大的结果集分到多个FlowFile中。...输出的JSON编码为UTF-8编码,如果传入的FlowFile包含多个Avro记录,则转换后的FlowFile是一个含有所有Avro记录的JSON数组或一个JSON对象序列(每个Json对象单独成行)。...每个生成的FlowFile都由指定数组中的一个元素组成,并传输到关系"split",原始文件传输到关系"original"。...Conflict Resolution Strategy (冲突解决) fail replace ignore fail append 指示当输出目录中已经存在同名文件时如何处理
处理器、FlowFile、连接器和FlowFile控制器:NiFi中的四个基本概念 让我们看看它是如何工作的。 FlowFile流文件 在NiFi中,FlowFile 是在管道处理器中移动的信息包。...FlowFile的剖析-它包含数据的属性以及对关联数据的引用 FlowFile分为两个部分: • 属性:是键/值对。例如,文件名、文件路径和唯一标识符是标准属性。...下图总结了带有压缩FlowFiles内容的处理器的示例。 ? NiFi中写时复制-修改FlowFile后,原始内容仍存在于存储库中。 可靠性 NiFi声称是可靠的,实际上如何?...对于系统中当前存在的每个FlowFile,FlowFile存储库存储: • FlowFile属性 • 指向位于FlowFile存储库中的FlowFile内容的指针 • FlowFile的状态。...FlowFile优先级 NiFi中的连接器的优先级是高度可配置的。您可以选择如何 在队列中确定FlowFiles的优先级 ,以决定下一步要处理的文件。 在可用的可能性中,例如,先进先出顺序-FIFO。
还可以设置队列长度,大小,使系统具有恢复能力。...进程的StdOut被重定向,使得写入StdOut的内容成为出站FlowFile的内容。该处理器是源处理器 - 其输出预计将生成一个新的FlowFile,并且系统调用预期不会接收输入。...写入StdOut的内容成为hte出站FlowFile的内容。该处理器不能使用源处理器 - 它必须被馈送进入FlowFiles才能执行其工作。...ListHDFS / FetchHDFS:ListHDFS监视HDFS中用户指定的目录,并发出一个FlowFile,其中包含遇到的每个文件的文件名。...然后,该处理器允许将这些元素分割成单独的XML元素。 UnpackContent:解压缩不同类型的归档格式,如ZIP和TAR。存档中的每个文件随后作为单个FlowFile传输。
让我们看看它是如何工作的。 FlowFile 在NIFI中,FlowFile是在pipeline处理器中移动的信息包。 ? FlowFile分为两个部分: Attributes,即键/值对。...例如,文件名,文件路径和唯一标识符是标准属性。 Content,对字节流的引用构成了FlowFile内容。 FlowFile不包含数据本身,否则会严重限制pipeline的吞吐量。...在pipeline的每个步骤中,在对流文件进行修改之前,首先将其以预写日志的方式(write-ahead log)记录在FlowFile Repository中。...对于系统中当前存在的每个FlowFile,FlowFile Repository存储: FlowFile属性 指向FlowFile内容的指针 FlowFile的状态。...优先处理FlowFiles NIFI中的Connections是高度可配置的。你可以选择如何在队列中确定FlowFiles的优先级,以确定接下来要处理的文件。
”选项进行配置:关于“GenerateFileFile”的“PROPERTIES”配置选项解释如下:配置项默认值允许值描述File Size(文件大小)0 B生成每个FlowFile文件的大小。...Evaluation Mode(评估模式)Line-by-LineLine-by-LineEntire text对每一行单独进行"替换策略"(Line-by-Line);或将整个文件缓冲到内存中(Entire...PrependAppendRegex ReplaceLiteral ReplaceAlways Replace 指定如何替换FlowFile内容的策略。...Evaluation Mode(评估模式) Line-by-Line Line-by-LineEntire text 对每一行单独进行"替换策略"(Line-by-Line);或将整个文件缓冲到内存中...Literal Replace:当"Search Value"值为一个搜索值时,使用"Replacement Value"替换值替换匹配项。
当一个FlowFile被发送到某个Relationship时,它就被加到了对应的COnnect队列里。...这种设计模式带来了很多好处,帮助NiFi成为构建强大的可扩展数据流高效的平台,包括:适用于可视化的创建和管理Processor。本质上是异步的,即使在处理和流量波动时也允许非常高的吞吐和自然缓冲。...这些扩展也是运行在JVM中的。FlowFile Repository(FlowFile 存储库):FlowFile Repository 负责保存在目前活动流中FlowFile的状态。...Content Repository(内容存储库):Content Repository负责保存在目前活动流中FlowFile的实际字节内容。其功能实现是可插拔的。...指定主节点是为了运行单节点任务,这种任务不适合在集群中运行的组件,例如:读取单节点文件,如果每个节点都读取数据文件会造成重复读取,这时可以配置主节点来指定从某个节点上执行。
每次为FlowFile发生事件(创建,分叉,克隆,修改FlowFile等)时,都会创建一个新的Provenance事件。这个出处事件是流文件的快照,因为它看起来就是在那个时间点存在的流。...例如,即使数据本身无法访问,用户仍然能够看到数据的唯一标识符、文件名(如果适用)、何时接收、从何处接收、如何操作、发送到何处等等。...这种分批编制索引的方法意味着无法立即提供Provenance事件以进行查询,但是作为回报,这大大提高了性能,因为提交事务和建立索引是非常昂贵的任务。 一个单独的线程负责处理出处日志的删除。...然后,一个单独的线程将从队列中提取此信息,并在Lucene中对数据进行索引。...命名Provenance Event Log File的名称应使文件名反映文件中第一个事件的事件ID。
CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作发生时的顺序输出为单独的FlowFile文件。...如果处理器状态中存在binlog文件名和位置值,则忽略此属性的值。...4).如果处理器State中不存在binlog数据,并指定binlog文件名和位置,此值设置为false意味着从指定binlog尾部开始读取数据。...▪Route to 'matched' if any matches 至少有一个用户定义的表达式求值为'true',才能认为FlowFile是匹配的。...默认false指的是如果在处理FlowFile时发生错误,则FlowFile将根据错误类型路由到“failure”或“retry”关系,处理器继续处理下一个FlowFile。
处理器Connection连接一、查看队列中的FlowFile单独启动“GenerateFlowFile”处理器后,可以观察到对应的Connection连接队列中有数据,在Connection连接上右键...“List Queue”可以查看队列中的FlowFile信息:二、查看FlowFile自定义属性值队列中的FlowFile属性中还可以查看自定义的属性信息,例如:在“GenerateFlowFile”...处理器中设置自定义属性“mykey”,对应的value值设置为“myvalue”:单独启动“GenerateFlowFile”生产部分数据,查看队列中的FlowFile属性如下:三、Connection...时间可以删除队列中无法及时处理的数据,默认设置为0,数据永远不会过期,当设置了一个过期时间,在Connect连接上可以看到一个小时钟图标。...Select Prioritization"优先级:可以指定如何对队列中的数据进行优先级排序以便处理优先级高的数据。
现在我们要自定义一个Processor,假设它叫MyProcessor.java,那么这个Java文件写在哪里呢?...自定义一个独立的子Moudle,子Moudle里面有两个子项目:processors和processor-nar子项目。以NIFI源码的amqp为例 ?...一个Processor的调度方法对应的就是onTrigger,在这里实现对流文件数据的处理。...常见的两个参数ProcessContext可以拿到当前Processor的属性配置,ProcessSession用来读写流文件内容、流文件属性。...每一个Processor的Moudle,在resource下都定义了一个org.apache.nifi.processor.Processor的文件,把你自定义Processor的全类名写上去就可以的。
一、配置“TailFile”处理器 “TailFile”处理器作用是"Tails"一个文件或文件列表,在文件写入文件时从文件中摄取数据。监控的文件为文本格式,当写入新行时会接收数据。...▪Multiple files single file只会tail一个文件, multiple file将tail一个文件列表。...二、配置“EvaluateJsonPath”处理器 “EvaluateJsonPath”处理器根据FlowFile的内容计算一个或多个JsonPath表达式。...Path Not Found Behavior (未找到路径) ignore ▪warn ▪ignore 指示在将Destination设置为"flowfile-attribute"时如何处理丢失的.../root/test/jsonfile”文件中写入数据时,这时“EvaluateJsonPath”一个FlowFile中会有多条json数据,当获取json属性时,只会获取第一条json对应的属性。
Deeper View: FlowFiles in Memory and on Disk 术语“FlowFile”有点用词不当。这会使人相信每个流文件对应于磁盘上的一个文件,但事实并非如此。...FlowFile属性存在于两个主要位置:上面解释的预写日志和工作内存中的hash map。此hash map引用了流中正在使用的所有流文件。此映射引用的对象与处理器使用的对象相同,并保存在连接队列中。...当FlowFile被交换出去时,FlowFile repo会收到通知,并保存交换文件的列表。当系统被检查点时,快照包含一个用于交换文件的部分。当交换文件被交换回时,流文件被添加回哈希映射。...RepositoryRecord 表示FlowFile的抽象,可用于跟踪FlowFile的更改状态,以便存在与存储库的事务性 QueueProvider 提供一个 FlowFileQueue的集合,该集合表示当前流中的所有队列...至于写文件时操作系统刷新缓冲区我们暂时不用管,只看代码层级的日志数据是如何写到journal文件里的 @Override public void updateRepository(final Collection
NiFi FlowFiles由FlowFile内容和FlowFile属性/元数据组成。FlowFile内容永远不会保存在Connection中。...要了解这些排队的FlowFile如何影响性能和堆使用情况,让我们首先关注上图底部的关于"Connection Queue"的剖析。...现在,我们知道如何控制“connection queue”的整体大小,下面将其分解为几个部分: ACTIVE QUEUE:FlowFiles进入到一个Connection中将首先被放置在active队列中...swap队列也保存在堆中,并且硬编码为最大10000个FlowFiles。如果活动队列中的空间已释放并且不存在交换文件,则交换队列中的FlowFiles将直接移到活动队列中。...一些处理器一次处理一个FlowFile,另一些处理器处理批量的FlowFile,还有一些处理器可能处理传入连接队列中的每个FlowFile。
描述 该处理器根据配置将二进制编码的Avro数据文件分割成更小的文件。输出策略决定split后的文件是Avro数据文件,还是只保留Avro记录(在FlowFile属性中包含元数据信息 )。...如果输出策略是Bare Record,则元数据将存储为FlowFile属性,否则将存储在数据文件头中。 Record 分解传入数据文件的策略。...如果输出策略是Bare Record,则元数据将存储为FlowFile属性,否则将存储在数据文件头中。...连接关系 名称 描述 failure 如果一个流文件因为某种原因无法处理(例如,流文件不是有效的Avro),它将被路由到这个关系 original 被分割的原始流文件。...写属性 名称 描述 fragment.identifier 从同一个父流文件生成的所有分割流文件都将为该属性添加相同的UUID(随机生成) fragment.index 一个增长的数字,表示从单个父流文件创建的分割流文件的顺序
Content Repo的核心设计是将FlowFile的内容保存在磁盘上,并仅在需要时才将其读入JVM内存。这使NiFi可以处理大量小的对象,而无需生产者和消费者处理器将完整的对象保存在内存中。...与JVM Heap具有垃圾回收过程一样,当需要空间时可以回收无法访问的对象,在NiFi中存在一个专用线程来分析内容存储库中未使用的内容。将FlowFile的内容标识为不再使用后,它将被删除或存档。...如果在nifi.properties中启用了归档,则FlowFile的内容将一直存在于Content Repo中,直到过期(一定时间后删除)或由于Content Repo占用太多空间而将其删除。...为了跟踪FlowFile的内容,FlowFile具有一个Content Claim对象。该Content Claim声明引用了包含内容、文件中内容的偏移量和内容长度的Resource Claims。...完成这一抽象层(Resource Claims)是为了确保并非每个FlowFile的内容在磁盘上都一一对应一个文件。不变性的概念是实现这一点的关键。
此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。GetHDFS:监视HDFS中用户指定的目录。每当新文件进入HDFS时,它将被复制到NiFi并从HDFS中删除。...此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。如果在集群中运行,此处理器需仅在主节点上运行。GetKafka:从Apache Kafka获取消息,封装为一个或者多个FlowFile。...PutKafka:将FlowFile的内容作为消息发送到Apache Kafka,可以将FlowFile中整个内容作为一个消息也可以指定分隔符将其封装为多个消息发送。...PutHDFS : 将FlowFile数据写入Hadoop分布式文件系统HDFS。四、数据库访问ExecuteSQL:执行用户定义的SQL SELECT命令,将结果写入Avro格式的FlowFile。...ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile的文本内容对其进行评估,然后将结果值提取到用户自己命名的Attribute中。
监控日志文件生产到Kafka案例:监控某个目录下的文件内容,将消息生产到Kafka中。此案例使用到“TailFile”和“PublishKafka_1_0”处理器。...一、配置“TailFile”处理器创建“TailFile”处理器并配置:注意:以上需要在NiFi集群中的每个节点上创建“/root/test/logdata”文件,“logdata”是文件...发送的内容可以是单独的FlowFile,也可以通过用户指定分隔符分割的FlowFile内容。...对应Kafka的'acks'属性。可以配置的项如下:Best Effort (尽力交付,相当于ack=0):在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。...三、运行测试1、启动Kafka集群,启动NiFi处理流程2、向/root/test/logdata文件中写入数据并保存向NiFi集群中的其中一台节点的“logdata”中写入以下数据即可[root@node1
) 连接关系 名称 描述 sucess 所有成功的流文件都被路由到这个关系 set state fail 如果处理器正在有状态地运行,并且在向流文件添加属性后没有设置状态,那么流文件将被路由到这个关系...写属性 Name Description See additional details 该处理器可以编写或删除零个或多个属性 状态管理 Scope Description LOCAL 提供一个选项,不仅将值存储在流文件中...一种方法是“基本用法”; 默认更改通过处理器的每个FlowFile的匹配的属性。第二种方式是“高级用法”; 可以进行条件属性更改,只有在满足特定条件时才会影响FlowFile。...也就是说,“删除属性表达式”仅适用于输入FlowFile中存在的属性,如果属性是由此处理器添加的,则“删除属性表达式”将不会匹配到它。 示例说明 1:基本用法增加一个属性 ? 结果输出: ?...2:高级用法,添加规则条件,符合条件时update指定的属性值 点击ADVANCED ? 添加一个rule,如果id的值等于11,就修改id的值为22 ? 结果输出: ?
描述 PutDatabaseRecord处理器使用指定的RecordReader从传入的流文件中读取(可能是多个,说数组也成)记录。这些记录将转换为SQL语句,并作为一个批次执行。...如果发生任何错误,则将流文件路由到failure或retry,如果执行成功,则将传入的流文件路由到success。...默认情况下(false),如果在处理FlowFile时发生错误,则FlowFile将根据错误类型路由到“failure”或“retry”关系,处理器可以继续使用下一个FlowFile。...Field ContainingSQL指的是上游来的FlowFile中的一个字段,这个字段值是一个可执行的SQL。...如果存在,我们就放到一个集合里存起来。遍历结束后,我们再判断这个集合有没有值,如果是空的,就轮到Unmatched Column Behavior了。
领取专属 10元无门槛券
手把手带您无忧上云