首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

使用Groovy (InvokedScriptedProcessor)将错误的json记录写入单独的flowFile

使用Groovy (InvokedScriptedProcessor)将错误的json记录写入单独的flowFile的过程如下:

  1. 首先,确保已经安装了NiFi,并且已经配置好了Groovy (InvokedScriptedProcessor)。
  2. 创建一个新的NiFi流程,并添加一个Groovy (InvokedScriptedProcessor)处理器。
  3. 在Groovy (InvokedScriptedProcessor)的配置中,选择使用Groovy脚本来处理数据。
  4. 编写Groovy脚本来处理错误的JSON记录。以下是一个示例脚本:
代码语言:txt
复制
import groovy.json.JsonSlurper

def flowFile = session.get()
if (!flowFile) return

def jsonContent = flowFile.getAttribute('jsonContent')
def jsonSlurper = new JsonSlurper()
try {
    def json = jsonSlurper.parseText(jsonContent)
    // 处理正确的JSON记录
    // ...
} catch (Exception e) {
    // 处理错误的JSON记录
    // 创建一个新的flowFile来存储错误的JSON记录
    def errorFlowFile = session.create()
    errorFlowFile.write(jsonContent.getBytes('UTF-8'))
    errorFlowFile = session.putAttribute(errorFlowFile, 'errorType', 'Invalid JSON')
    session.transfer(errorFlowFile, REL_SUCCESS)
}

session.remove(flowFile)

在上面的示例脚本中,我们首先获取flowFile中的JSON内容,并使用JsonSlurper解析JSON。如果解析成功,则处理正确的JSON记录。如果解析失败,我们创建一个新的flowFile来存储错误的JSON记录,并将错误类型设置为"Invalid JSON"。最后,我们将原始的flowFile从处理器中移除。

  1. 配置Groovy (InvokedScriptedProcessor)的输入和输出连接。
  2. 运行NiFi流程,将包含JSON记录的flowFile传入Groovy (InvokedScriptedProcessor)处理器。

通过以上步骤,你可以使用Groovy (InvokedScriptedProcessor)将错误的JSON记录写入单独的flowFile。请注意,这只是一个示例脚本,你可以根据实际需求进行修改和扩展。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

错误记录Groovy工程中文件查找策略 ( main 函数中需要使用 srcmaingroovyScript.groovy | Groovy 脚本直接使用代码相对路径 )

文章目录 一、报错信息 二、解决方案 一、报错信息 ---- 在 Java 类中 , 调用 Groovy 脚本 , 出现如下错误 ; java.io.FileNotFoundException: Y:\..., 但是涉及到 Java 与 Groovy 路径查找机制不同 ; Java 类 JavaClass 位于 Groovy_Demo\src\main\groovy 目录下 , 要在该 Java 类中调用同目录...Script.groovy 脚本 ; 此处必须使用完整路径 “src/main/groovy/Script.groovy” , 才能查找到 “Script.groovy” 脚本 ; Java 类中调用...Groovy 脚本 , 需要使用 “src/main/groovy/Script.groovy” 路径 ; import groovy.lang.Binding; import groovy.lang.GroovyShell...脚本中调用 另外一个 Groovy 脚本 , 如果两个 Groovy 脚本在同一个目录中 , 可以直接使用相对路径 " Script.groovy " 进行调用即可 ; 参考 【GroovyGroovy

2.4K30

大数据NiFi(二十):实时同步MySQL数据到Hive

,获取对应binlog操作类型,再将想要处理数据路由到“EvaluateJsonPath”处理器,该处理器可以json格式binlog数据解析,通过自定义json 表达式获取json数据中属性放入...”数据写入到Hive表。...CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作发生时顺序输出为单独FlowFile文件。...),但是经过测试,此NiFi版本出现以下错误(无效binlog位置,目测是一个版本bug错误): 所以在之后测试中,我们可以“CaptureChangeMysql”处理器读取binlog状态清空...默认false指的是如果在处理FlowFile时发生错误,则FlowFile根据错误类型路由到“failure”或“retry”关系,处理器继续处理下一个FlowFile

2.8K121

大数据NiFi(十九):实时Json日志数据导入到Hive

​实时Json日志数据导入到Hive 案例:使用NiFi某个目录下产生json类型日志文件导入到Hive。...如果目标是"flowfile-attribute",而表达式不匹配任何内容,那么将使用空字符串作为属性值,并且FlowFile始终被路由到"matched"。...▪flowfile-attribute 指示是否JsonPath计算结果写入FlowFile内容或FlowFile属性;如果使用flowfile-attribute,则必须指定属性名称。.../root/test/jsonfile”文件中写入数据时,这时“EvaluateJsonPath”一个FlowFile中会有多条json数据,当获取json属性时,只会获取第一条json对应属性。...六、配置“ConvertRecord”处理器 “ConvertRecord”根据配置记录读取器”和“记录写出控制器”来记录从一种数据格式转换为另一种数据格式。

2.1K91

Apache NiFi安装及简单使用

2、Processor上错误 ? 简单使用2 先来添加处理器 ? 这里选择getfile处理器,它会获取本地磁盘数据,然后删除源文件 ?...消息可以作为每个消息FlowFile发出,或者可以使用用户指定分隔符进行批处理。 GetMongo:对MongoDB执行用户指定查询,并将内容写入FlowFile。...SplitJson:允许用户将由数组或许多子对象组成JSON对象拆分为每个JSON元素FlowFile。...然后,该处理器允许这些元素分割成单独XML元素。 UnpackContent:解压缩不同类型归档格式,如ZIP和TAR。存档中每个文件随后作为单个FlowFile传输。...PutS3Object:使用配置凭据,密钥和存储桶名称 FlowFile内容写入到Amazon S3对象。

5.8K21

NIFI里你用过PutDatabaseRecord嘛?

描述 PutDatabaseRecord处理器使用指定RecordReader从传入流文件中读取(可能是多个,说数组也成)记录。这些记录转换为SQL语句,并作为一个批次执行。...默认情况下(false),如果在处理FlowFile时发生错误,则FlowFile根据错误类型路由到“failure”或“retry”关系,处理器可以继续使用下一个FlowFile。...应用场景 在PutDatabaseRecord之前,我们想要写入数据到数据库,往往需要使用ConvertJsonToSql+PutSQL组合,尤其是当数据格式不是json时候还需要先将数据转换为json...,而使用ConvertJsonToSql属于一遍连接了目标库,一边要在内存解析一次数据,转成了参数化SQL,并且参数也是放到FlowFile属性中,平白无故这个FlowFile也就更吃内存了。...PutDatabaseRecord好处就是我们可以任何NIFI支持Record写入指定目的,在内存解析一次数据就可以了。

3.4K20

错误记录Groovy 闭包使用报错 ( 闭包中不能直接使用外部对象方法 | 需要先设置 delegate 代理 )

文章目录 一、报错信息 二、解决方案 一、报错信息 ---- 在 Groovy Closure 闭包中 , 直接调用外部对象方法 , 会报错 ; class Test { def fun...) at Groovy.run(Groovy.groovy:14) Process finished with exit code 1 二、解决方案 ---- 在 Closure 闭包中 , 如果要调用外部对象方法..., 需要先设置 Closure 闭包对象 delegate 成员为指定外部对象 ; class Test { def fun() { println "fun" }...} // 闭包中不能直接调用 Test 对象中方法 // 此时可以通过改变闭包代理进行调用 def closure = { fun() } closure.delegate = new...Test() closure() 设置完 Closure 闭包对象 delegate 之后 , 执行效果 :

86620

PutHiveStreaming

描述 该处理器使用Hive流流文件数据发送到Apache Hive表。传入流文件需要是Avro格式,表必须存在于Hive中。有关Hive表需求(格式、分区等),请参阅Hive文档。...分区值是根据处理器中指定分区列名称,然后从Avro记录中提取。注意:如果为这个处理器配置了多个并发任务,那么一个线程在任何时候只能写入一个表。写入同一表其他任务等待当前任务完成对表写入。...注意:当一个错误发生在一个源自相同输入FlowFile已提交Hive流事务,(即FlowFile包含比记录每笔交易记录错误发生在第二个事务或其他版本),成功记录将被转移到“成功”关系,而原始输入...注意:当一个错误发生在一个源自相同输入FlowFile已提交Hive流事务,(即FlowFile包含比记录每笔交易记录错误发生在第二个事务或其他版本),成功记录将被转移到“成功”关系,而原始输入...写属性 Name Description hivestreaming.record.count 此属性写入路由到“成功”和“失败”关系流文件,并包含分别写入成功和未成功传入流文件中记录数。

95630

Provenance存储库原理

Provenance Repository 在Provenance存储库中存储每个FlowFile历史记录。此历史记录用于提供每个数据数据沿袭(也称为产销监管链)。...这意味着新创建Provenance事件开始写入由16个日志文件组成新组,并且原始文件将被处理以进行长期存储。首先,经过滚动日志合并到一个文件中。...我们不会在写入数据时编制索引,因为这样做会降低吞吐量。 在压缩数据时,我们会跟踪压缩块索引。我们1 MB数据写入GZIP流,然后增加压缩块索引。...我们最多只能读取1 MB(解压缩)数据。这使我们可以非常快速地访问这些记录写入每条记录后,然后将其与指向数据指针一起放在队列中。...然后,一个单独线程将从队列中提取此信息,并在Lucene中对数据进行索引。

95420

FlowFile存储库原理

系统通过序列化哈希映射中每个流文件并用文件名“.partial”将其写入磁盘来计算新基本检查点。随着检查点进行,新FlowFile基线写入“.partial”文件。...当FlowFile发生更改时,delta将被写入预写日志,并相应地修改内存中对象。这使系统能够快速处理流文件,同时还可以跟踪已发生事情以及提交会话时发生事情。...SequentialAccessWriteAheadLog WriteAheadRepository接口此实现提供了通过写入单个日志文件来所有更新顺序写入存储库功能。...这种实现方式假设只有一个线程可以在任何时候发布给定Record更新。即,该实现是线程安全,但如果两个线程同时使用同一记录更新来更新预写日志,则不能保证记录可以正确恢复(没有的事情)。...更新FlowFile存储库(即预写FlowFile变化日志) 最底层方法是WriteAheadRepositoryupdate /** * 使用指定记录更新存储库。

1.2K10

深入解析Apache NIFI调度策略

新拉取一个ExecuteGroovyScript组件,选择Timer driven并设置2秒运行一次,然后在Script Body配置中添加Groovy代码 //创建一个流文件 flowFile = session.create...() //添加一个属性,在FlowFIle记录一个时间,姑且把这个时间当做本次调度开始时间 flowFile = session.putAttribute(flowFile, 'Time', String.valueOf...('我被调度了') //创建一个流文件 flowFile = session.create() //添加一个属性,在FlowFIle记录一个时间,姑且把这个时间当做本次调度开始时间 flowFile...然后我们观察日志,如果日志输出多条日志之间是1000多秒,那么证明我们上面说结论是错误,如果日志之间是2秒多,那么说明检测组件是否有工作动作频率应该还是10ms。 ? ?...= session.create() //添加一个属性,在FlowFIle记录一个时间,姑且把这个时间当做本次调度开始时间 flowFile = session.putAttribute(flowFile

1.9K30

大数据NiFi(十五):NiFi入门案例二

需要将“Data Format”设置为Text并且“Unique FlowFiles”设置为false,这时生成文件大小不定,忽略设置“File Size”Character Set(字符编码)...在“Line-by-Line”模式中,建议使用8 KB或16 KB这样值。如果“替换策略”设置为以下其中之一:Append、Prepend、Always Replace,则忽略该值。...Evaluation Mode(评估模式)Line-by-LineLine-by-LineEntire text对每一行单独进行"替换策略"(Line-by-Line);或整个文件缓冲到内存中(Entire...Evaluation Mode(评估模式) Line-by-Line Line-by-LineEntire text 对每一行单独进行"替换策略"(Line-by-Line);或整个文件缓冲到内存中...三、配置“PutFile”处理器关于“PutFile”处理器创建及配置参数参照案例一,这里直接给出“PutFile”处理器配置,替换后FlowFile写入外部路径中“/root/test/matchFile

1.4K121

大数据NiFi(二十一):监控日志文件生产到Kafka

​监控日志文件生产到Kafka案例:监控某个目录下文件内容,消息生产到Kafka中。此案例使用到“TailFile”和“PublishKafka_1_0”处理器。...二、配置“PublishKafka_1_0”处理器“PublishKafka_1_0”处理器作用是使用Kafka 1.0生产者APIFlowFile内容作为消息发送给Apache Kafka。...发送内容可以是单独FlowFile,也可以通过用户指定分隔符分割FlowFile内容。...Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中默认配置):KafkaProducer把消息发送出去,至少要等待leader已经成功数据写入本地...Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中默认配置): KafkaProducer把消息发送出去,至少要等待leader已经成功数据写入本地

1K71

基于Apache NiFi 实现ETL过程中数据转换

0 前言 Apache NiFi 是广泛使用数据流管理工具,也可以实现ETL功能....本次讨论如何在NiFi实现ETL过程中实现转换功能,此处以列名转换为例. 1 应用场景 列名转换是ETL过程中常常遇到场景。...例如来源表user主键id,要求写入目标表useruid字段内,那么就需要列名转换. 2 方案选型 既然限定在 NiFi 框架内,那么只涉及实现方案选型. 2.1 基于执行自定义SELECT SQL...from FLOWFILE 2.3 基于ExecuteGroovyScript 等可以执行脚本语言处理器 场景 适用于要实现复杂转换,且性能要求不高场景 实现 实现方式因人而异,原理就是在...Groovy 脚本内解析数据,做列名转换再输出即可 优势 能实现复杂规则,且可以热加载,不需要部署和重启NiFi 劣势 需要学习 nifi groovy 代码编写方法 2.4 自定义处理器 场景 适用于要实现复杂转换

2.4K00

Apache Nifi工作原理

• 您希望您同僚对您创建错误处理流程提供 反馈吗?NiFi决定将错误路径视为有效结果,这是一项设计决策。期望流程审查比传统代码审查要短。 你应该使用NiFi吗? NiFi品牌本身就易于使用。...当前使用所有FlowFiles属性以及对其内容引用都存储在FlowFile 存储库中。 在流水线每个步骤中,在对流文件进行修改之前,首先将其记录在流文件存储库中预写日志中 。...来源存储库 每次修改FlowFile时,NiFi都会在此时为FlowFile及其上下文拍摄快照。NiFi中此快照名称是“ 来源事件”。该来 源库 记录出处活动。...借助来源库可以追溯数据历史记录 等等,FlowFile资料库和来源资料库有什么区别? • FlowFile资料库和来源资料库背后想法非常相似,但是它们不是解决相同问题。...但是,您甚至可以使用FlowFile中选择属性来优先处理传入数据包。 流控制器 流控制器是一切融合在一起粘合剂。它为处理器分配和管理线程。这就是执行数据流方式。 ?

3K10

大数据NiFi(十七):NiFi术语

filename:在数据存储到磁盘或外部服务时可以使用可读文件名 path:在数据存储到磁盘或外部服务时可以使用分层结构值,以便数据不存储在单个目录中。...DFM能够每一个关系连接到其他组件,以指定FlowFile应该在哪里进行下一步处理。 五、Connection Connection可以将不同Processor连接在一起创建自动数据处理流程。...除了每个组件"黄色三角形"警告以外,每个组件运行有错误时还会报告错误公告,这个错误会显示在处理器右上角,以红色图标显示。系统级公告显示在页面顶部附近状态栏上。...十四、flow.xml.gz 用户界面画布所有组件内容都实时写入一个名为flow.xml.gz文件,该文件默认位于$NIFI_HOME/conf目录中。...此外,NiFi在更新时会自动备份此文件,您可以使用这些备份来回滚配置,如果想要回滚,先停止NiFi,flow.xml.gz替换为所需备份,然后重新启动NiFi。

1.6K11

Apache NIFI 讲解(读完立即入门)

易于使用 Processors-boxes-通过连接器链接-箭头创建流程。NIFI提供了一个基于流编程体验。 NIFI让我们一眼就能理解一组数据流操作,而这或许需要数百行源代码来实现。...你是否需要同行反馈,以帮助你创建新错误处理流程?NIFI决定将错误路径视为有效结果,这是一项设计决策。期望流程审查比传统代码审查要短。 你应该使用它吗?或许吧 NIFI本身就易于使用。...但是,如果你必须使用NIFI,则可能需要更多地了解其工作原理。 在第二部分中,我说明Apache NIFI关键概念。 剖析Apache NIFI 启动NIFI时,你会进入其Web界面。...并非所有处理器都需要访问FlowFile内容来执行其操作-例如,聚合两个FlowFiles内容不需要将其内容加载到内存中。 当处理器修改FlowFile内容时,保留先前数据。...在pipeline每个步骤中,在对流文件进行修改之前,首先将其以预写日志方式(write-ahead log)记录FlowFile Repository中。

10.7K91

内容存储库原理

Content Repo核心设计是FlowFile内容保存在磁盘上,并仅在需要时才将其读入JVM内存。这使NiFi可以处理大量小对象,而无需生产者和消费者处理器完整对象保存在内存中。...与JVM Heap具有垃圾回收过程一样,当需要空间时可以回收无法访问对象,在NiFi中存在一个专用线程来分析内容存储库中未使用内容。FlowFile内容标识为不再使用后,它将被删除或存档。...如果在nifi.properties中启用了归档,则FlowFile内容一直存在于Content Repo中,直到过期(一定时间后删除)或由于Content Repo占用太多空间而将其删除。...由于一旦写入内容就永远不会更改(使用copy on write进行更改),因此,如果FlowFile内容发生更改,则不会出现内存碎片或移动数据。...ContentClaim接口 最后,我们经常在UI上就能看到,一个FlowFile保留是ContentClaim,如下接口,ContentClaim里保存着它所在ResourceClaim,并且还记录了偏移量

81210
领券