首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Apache NIFI ExecuteScript组件脚本使用教程

注意:如果存在多个传入队列,则在一次呼叫中轮询所有队列还是仅轮询单个队列方面,行为是不确定。话虽如此,这里描述了观察到行为(对于NiFi 1.1.0+及之前版本)。...FlowFIle创建一个FlowFIle 示例说明:我们想新建一个文件,这个流文件继承了其他流文件 方法:使用session对象中create(parentFlowFile)方法。...此方法采用父FlowFile引用,并返回新FlowFile对象。新创建FlowFile将继承父对象除UUID以外所有属性(attribute)。...属性是关于内容/流文件元数据,我们在上一章看到了如何使用ExecuteScript来操作它们。流文件内容只是字节集合,而没有固有的结构、模式、格式等。...使用回调读取一个文件内容 方法:使用session对象中read(flowFile,inputStreamCallback)方法。

5.2K40

大数据NiFi(十八):离线同步MySQL数据到HDFS

Max Rows Per Flow File (每个FlowFile行数) 0 在一个FlowFile文件数据行数。通过这个参数可以将很大结果集分到多个FlowFile中。...Max Rows Per Flow File (每个FlowFile行数) 0 在一个FlowFile文件数据行数。通过这个参数可以将很大结果集分到多个FlowFile中。...输出JSON编码为UTF-8编码,如果传入FlowFile包含多个Avro记录,则转换后FlowFile一个含有所有Avro记录JSON数组或一个JSON对象序列(每个Json对象单独成行)。...每个生成FlowFile都由指定数组中一个元素组成,并传输到关系"split",原始文件传输到关系"original"。...Conflict Resolution Strategy (冲突解决) fail replace ignore fail append 指示当输出目录中已经存在同名文件如何处理

4.5K91
您找到你想要的搜索结果了吗?
是的
没有找到

Apache Nifi工作原理

处理器、FlowFile、连接器和FlowFile控制器:NiFi中四个基本概念 让我们看看它是如何工作FlowFile文件 在NiFi中,FlowFile 是在管道处理器中移动信息包。...FlowFile剖析-它包含数据属性以及对关联数据引用 FlowFile分为两个部分: • 属性:是键/值对。例如,文件名、文件路径和唯一标识符是标准属性。...下图总结了带有压缩FlowFiles内容处理器示例。 ? NiFi中写时复制-修改FlowFile后,原始内容仍存在于存储库中。 可靠性 NiFi声称是可靠,实际上如何?...对于系统中当前存在每个FlowFileFlowFile存储库存储: • FlowFile属性 • 指向位于FlowFile存储库中FlowFile内容指针 • FlowFile状态。...FlowFile优先级 NiFi中连接器优先级是高度可配置。您可以选择如何 在队列中确定FlowFiles优先级 ,以决定下一步要处理文件。 在可用可能性中,例如,先进先出顺序-FIFO。

2.9K10

Apache NiFi安装及简单使用

还可以设置队列长度,大小,使系统具有恢复能力。...进程StdOut被重定向,使得写入StdOut内容成为出站FlowFile内容。该处理器是源处理器 - 其输出预计将生成一个FlowFile,并且系统调用预期不会接收输入。...写入StdOut内容成为hte出站FlowFile内容。该处理器不能使用源处理器 - 它必须被馈送进入FlowFiles才能执行其工作。...ListHDFS / FetchHDFS:ListHDFS监视HDFS中用户指定目录,并发出一个FlowFile,其中包含遇到每个文件文件名。...然后,该处理器允许将这些元素分割成单独XML元素。 UnpackContent:解压缩不同类型归档格式,如ZIP和TAR。存档中每个文件随后作为单个FlowFile传输。

5.7K21

Apache NIFI 讲解(读完立即入门)

让我们看看它是如何工作FlowFile 在NIFI中,FlowFile是在pipeline处理器中移动信息包。 ? FlowFile分为两个部分: Attributes,即键/值对。...例如,文件名,文件路径和唯一标识符是标准属性。 Content,对字节流引用构成了FlowFile内容。 FlowFile不包含数据本身,否则会严重限制pipeline吞吐量。...在pipeline每个步骤中,在对流文件进行修改之前,首先将其以预写日志方式(write-ahead log)记录在FlowFile Repository中。...对于系统中当前存在每个FlowFileFlowFile Repository存储: FlowFile属性 指向FlowFile内容指针 FlowFile状态。...优先处理FlowFiles NIFI中Connections是高度可配置。你可以选择如何在队列中确定FlowFiles优先级,以确定接下来要处理文件

10.3K91

大数据NiFi(十五):NiFi入门案例二

”选项进行配置:关于“GenerateFileFile”“PROPERTIES”配置选项解释如下:配置项默认值允许值描述File Size(文件大小)0 B生成每个FlowFile文件大小。...Evaluation Mode(评估模式)Line-by-LineLine-by-LineEntire text对每一行单独进行"替换策略"(Line-by-Line);或将整个文件缓冲到内存中(Entire...PrependAppendRegex ReplaceLiteral ReplaceAlways Replace 指定如何替换FlowFile内容策略。...Evaluation Mode(评估模式) Line-by-Line Line-by-LineEntire text 对每一行单独进行"替换策略"(Line-by-Line);或将整个文件缓冲到内存中...Literal Replace:当"Search Value"值为一个搜索值时,使用"Replacement Value"替换值替换匹配项。

1.4K121

大数据NiFi(二):NiFi架构

一个FlowFile被发送到某个Relationship时,它就被加到了对应COnnect队列里。...这种设计模式带来了很多好处,帮助NiFi成为构建强大可扩展数据流高效平台,包括:适用于可视化创建和管理Processor。本质上是异步,即使在处理和流量波动时也允许非常高吞吐和自然缓冲。...这些扩展也是运行在JVM中FlowFile Repository(FlowFile 存储库):FlowFile Repository 负责保存在目前活动流中FlowFile状态。...Content Repository(内容存储库):Content Repository负责保存在目前活动流中FlowFile实际字节内容。其功能实现是可插拔。...指定主节点是为了运行单节点任务,这种任务不适合在集群中运行组件,例如:读取单节点文件,如果每个节点都读取数据文件会造成重复读取,这时可以配置主节点来指定从某个节点上执行。

2.1K71

Provenance存储库原理

每次为FlowFile发生事件(创建,分叉,克隆,修改FlowFile等)时,都会创建一个Provenance事件。这个出处事件是流文件快照,因为它看起来就是在那个时间点存在流。...例如,即使数据本身无法访问,用户仍然能够看到数据唯一标识符、文件名(如果适用)、何时接收、从何处接收、如何操作、发送到何处等等。...这种分批编制索引方法意味着无法立即提供Provenance事件以进行查询,但是作为回报,这大大提高了性能,因为提交事务和建立索引是非常昂贵任务。 一个单独线程负责处理出处日志删除。...然后,一个单独线程将从队列中提取此信息,并在Lucene中对数据进行索引。...命名Provenance Event Log File名称应使文件名反映文件中第一个事件事件ID。

94720

大数据NiFi(十六):处理器Connection连接

​处理器Connection连接一、查看队列中FlowFile单独启动“GenerateFlowFile”处理器后,可以观察到对应Connection连接队列中有数据,在Connection连接上右键...“List Queue”可以查看队列中FlowFile信息:​二、查看FlowFile自定义属性值队列中FlowFile属性中还可以查看自定义属性信息,例如:在“GenerateFlowFile”...处理器中设置自定义属性“mykey”,对应value值设置为“myvalue”:单独启动“GenerateFlowFile”生产部分数据,查看队列中FlowFile属性如下:三、​​​​​​​Connection...时间可以删除队列中无法及时处理数据,默认设置为0,数据永远不会过期,当设置了一个过期时间,在Connect连接上可以看到一个小时钟图标。...Select Prioritization"优先级:可以指定如何对队列中数据进行优先级排序以便处理优先级高数据。

1.4K61

大数据NiFi(十九):实时Json日志数据导入到Hive

一、配置“TailFile”处理器 “TailFile”处理器作用是"Tails"一个文件文件列表,在文件写入文件时从文件中摄取数据。监控文件为文本格式,当写入新行时会接收数据。...▪Multiple files single file只会tail一个文件, multiple file将tail一个文件列表。...二、配置“EvaluateJsonPath”处理器 “EvaluateJsonPath”处理器根据FlowFile内容计算一个或多个JsonPath表达式。...Path Not Found Behavior (未找到路径) ignore ▪warn ▪ignore 指示在将Destination设置为"flowfile-attribute"时如何处理丢失.../root/test/jsonfile”文件中写入数据时,这时“EvaluateJsonPath”一个FlowFile中会有多条json数据,当获取json属性时,只会获取第一条json对应属性。

2K91

FlowFile存储库原理

Deeper View: FlowFiles in Memory and on Disk 术语“FlowFile”有点用词不当。这会使人相信每个流文件对应于磁盘上一个文件,但事实并非如此。...FlowFile属性存在于两个主要位置:上面解释预写日志和工作内存中hash map。此hash map引用了流中正在使用所有流文件。此映射引用对象与处理器使用对象相同,并保存在连接队列中。...当FlowFile被交换出去时,FlowFile repo会收到通知,并保存交换文件列表。当系统被检查点时,快照包含一个用于交换文件部分。当交换文件被交换回时,流文件被添加回哈希映射。...RepositoryRecord 表示FlowFile抽象,可用于跟踪FlowFile更改状态,以便存在与存储库事务性 QueueProvider 提供一个 FlowFileQueue集合,该集合表示当前流中所有队列...至于写文件时操作系统刷新缓冲区我们暂时不用管,只看代码层级日志数据是如何写到journal文件 @Override public void updateRepository(final Collection

1.2K10

深入理解 Apache NIFI Connection

NiFi FlowFiles由FlowFile内容和FlowFile属性/元数据组成。FlowFile内容永远不会保存在Connection中。...要了解这些排队FlowFile如何影响性能和堆使用情况,让我们首先关注上图底部关于"Connection Queue"剖析。...现在,我们知道如何控制“connection queue”整体大小,下面将其分解为几个部分: ACTIVE QUEUE:FlowFiles进入到一个Connection中将首先被放置在active队列中...swap队列也保存在堆中,并且硬编码为最大10000个FlowFiles。如果活动队列中空间已释放并且不存在交换文件,则交换队列中FlowFiles将直接移到活动队列中。...一些处理器一次处理一个FlowFile,另一些处理器处理批量FlowFile,还有一些处理器可能处理传入连接队列中每个FlowFile

1.1K31

SplitAvro

描述 该处理器根据配置将二进制编码Avro数据文件分割成更小文件。输出策略决定split后文件是Avro数据文件,还是只保留Avro记录(在FlowFile属性中包含元数据信息 )。...如果输出策略是Bare Record,则元数据将存储为FlowFile属性,否则将存储在数据文件头中。 Record 分解传入数据文件策略。...如果输出策略是Bare Record,则元数据将存储为FlowFile属性,否则将存储在数据文件头中。...连接关系 名称 描述 failure 如果一个文件因为某种原因无法处理(例如,流文件不是有效Avro),它将被路由到这个关系 original 被分割原始流文件。...写属性 名称 描述 fragment.identifier 从同一个父流文件生成所有分割流文件都将为该属性添加相同UUID(随机生成) fragment.index 一个增长数字,表示从单个父流文件创建分割流文件顺序

56330

内容存储库原理

Content Repo核心设计是将FlowFile内容保存在磁盘上,并仅在需要时才将其读入JVM内存。这使NiFi可以处理大量小对象,而无需生产者和消费者处理器将完整对象保存在内存中。...与JVM Heap具有垃圾回收过程一样,当需要空间时可以回收无法访问对象,在NiFi中存在一个专用线程来分析内容存储库中未使用内容。将FlowFile内容标识为不再使用后,它将被删除或存档。...如果在nifi.properties中启用了归档,则FlowFile内容将一直存在于Content Repo中,直到过期(一定时间后删除)或由于Content Repo占用太多空间而将其删除。...为了跟踪FlowFile内容,FlowFile具有一个Content Claim对象。该Content Claim声明引用了包含内容、文件中内容偏移量和内容长度Resource Claims。...完成这一抽象层(Resource Claims)是为了确保并非每个FlowFile内容在磁盘上都一一对应一个文件。不变性概念是实现这一点关键。

80310

大数据NiFi(六):NiFi Processors(处理器)

此处理器应将文件一个位置移动到另一个位置,而不是用于复制数据。GetHDFS:监视HDFS中用户指定目录。每当新文件进入HDFS时,它将被复制到NiFi并从HDFS中删除。...此处理器应将文件一个位置移动到另一个位置,而不是用于复制数据。如果在集群中运行,此处理器需仅在主节点上运行。GetKafka:从Apache Kafka获取消息,封装为一个或者多个FlowFile。...PutKafka:将FlowFile内容作为消息发送到Apache Kafka,可以将FlowFile中整个内容作为一个消息也可以指定分隔符将其封装为多个消息发送。...PutHDFS : 将FlowFile数据写入Hadoop分布式文件系统HDFS。四、数据库访问ExecuteSQL:执行用户定义SQL SELECT命令,将结果写入Avro格式FlowFile。...ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile文本内容对其进行评估,然后将结果值提取到用户自己命名Attribute中。

1.9K122

大数据NiFi(二十一):监控日志文件生产到Kafka

​监控日志文件生产到Kafka案例:监控某个目录下文件内容,将消息生产到Kafka中。此案例使用到“TailFile”和“PublishKafka_1_0”处理器。...一、​​​​​​​配置“TailFile”处理器创建“TailFile”处理器并配置:注意:以上需要在NiFi集群中每个节点上创建“/root/test/logdata”文件,“logdata”是文件...发送内容可以是单独FlowFile,也可以通过用户指定分隔符分割FlowFile内容。...对应Kafka'acks'属性。可以配置项如下:Best Effort (尽力交付,相当于ack=0):在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。...三、运行测试1、启动Kafka集群,启动NiFi处理流程2、向/root/test/logdata文件中写入数据并保存向NiFi集群中其中一台节点“logdata”中写入以下数据即可[root@node1

99371

UpdateAttribute

) 连接关系 名称 描述 sucess 所有成功文件都被路由到这个关系 set state fail 如果处理器正在有状态地运行,并且在向流文件添加属性后没有设置状态,那么流文件将被路由到这个关系...写属性 Name Description See additional details 该处理器可以编写或删除零个或多个属性 状态管理 Scope Description LOCAL 提供一个选项,不仅将值存储在流文件中...一种方法是“基本用法”; 默认更改通过处理器每个FlowFile匹配属性。第二种方式是“高级用法”; 可以进行条件属性更改,只有在满足特定条件时才会影响FlowFile。...也就是说,“删除属性表达式”仅适用于输入FlowFile存在属性,如果属性是由此处理器添加,则“删除属性表达式”将不会匹配到它。 示例说明 1:基本用法增加一个属性 ? 结果输出: ?...2:高级用法,添加规则条件,符合条件时update指定属性值 点击ADVANCED ? 添加一个rule,如果id值等于11,就修改id值为22 ? 结果输出: ?

97110

NIFI里你用过PutDatabaseRecord嘛?

描述 PutDatabaseRecord处理器使用指定RecordReader从传入文件中读取(可能是多个,说数组也成)记录。这些记录将转换为SQL语句,并作为一个批次执行。...如果发生任何错误,则将流文件路由到failure或retry,如果执行成功,则将传入文件路由到success。...默认情况下(false),如果在处理FlowFile时发生错误,则FlowFile将根据错误类型路由到“failure”或“retry”关系,处理器可以继续使用下一个FlowFile。...Field ContainingSQL指的是上游来FlowFile一个字段,这个字段值是一个可执行SQL。...如果存在,我们就放到一个集合里存起来。遍历结束后,我们再判断这个集合有没有值,如果是空,就轮到Unmatched Column Behavior了。

3.3K20
领券