我在nifi集群中有3个节点。我使用"Partition“将流文件分发到nifi集群中的其他nifi节点。例如,我使用"type"属性来表示"Partition“;
FlowFile -> type: A
FlowFile -> type: B
FlowFile -> type: C
对于此场景,具有相同“类型”的FlowFiles将转到同一个节点。但是,假设一个节点(A类型的流文件被发送)被关闭。虽然带有“类型B和C”的FlowFiles将继续被成功发送,但是带有“类型A”的FlowFiles将保持队列状态(不会被发送)。
与此同时,一款“X
我编写了一个Python/Jython脚本,在NiFi的ExecuteScript处理器中运行,以解析无效的JSON文档。我根据问题中的脚本和Matt的奇妙的编写了下面的脚本,但是它没有返回多个流文件。相反,它返回输入流文件,并应用regex校正,但只作为一个文件。为了返回循环中每一行的一个流文件,我需要修改什么?
脚本
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCal
我想知道NiFi中的总队列大小何时超过某个特定值。我尝试使用NiFi工具包,但我无法在其中找到表示总队列大小的任何内容。 我一直在尝试this,它应该允许我通过API访问NiFi中一组进程中的连接。但是,我无法从NiFi成功连接到该接口。 当我尝试在没有凭据的情况下连接到API时,我在NiFi中得到以下错误。 Unknown user with identity 'anonymous'. Contact the system administrator. 我相信我能够通过REST API查看队列大小,但是为了做到这一点,我需要使用OAuth和Keycloak进行身份验证,我不
我正在为Apache NiFi开发一个自定义处理器。我已经创建了处理器的nar,并将它放在nifi的lib文件夹中,并启动了nifi。我已经在eclipse中设置了远程调试器,并在onTrigger()的第一行上启用了断点。在调试时,我正在我的nifi管道中一次运行一个处理器。我可以在自定义处理器的输入队列中找到单个流文件,但是我的自定义处理器没有接收到任何流文件。当我启动我的自定义处理器时,它会击中onTrigger()方法中的断点。在此方法中,当我这样做时:
public class MyCustomProc extends AbstractProcessor {
@Overri
我使用的是nifi 1.1.0,并且在上面运行了太多的处理器,所以它有太多的负载无法运行。由于负载过大,处理器运行非常慢,我收到一些错误:
The rate of the dataflow is exceeding the provenance recording rate. slowing down flow to accommodate.
我在"nifi.properties“文件中更改了起源存储库的存储大小,但没有任何改进。我更改了以下属性:
nifi.provenance.repository.max.storage.size = 2 GB
我用谷歌搜索了一下,但没有找到合适的