Building a Custom Apache NiFi Operations Dashboard (Part 1)

Work with NiFi and Spring Boot to create a custom dashboard for the data you work with in your Apache NiFi applications.

A Simple Apache NiFi Operations Dashboard

This is a work in progress; please get involved, everything is open source. Milind and I are working on a project to build something useful for teams: a single, rich dashboard to analyze their flows, see current cluster state, and start and stop flows.

Apache NiFi and related tools provide a wealth of data to aggregate, sort, categorize, search, and eventually analyze with machine learning.

Several out-of-the-box tools solve parts of the problem. Ambari Metrics, Grafana, and Log Search provide a lot of data and analytics. You can easily find all errors in Log Search and view nice charts of what is happening in Ambari Metrics and Grafana.

The beauty of Apache NiFi is that it has Site-to-Site reporting tasks that can send all the provenance, analytics, metrics, and operational data you need anywhere you want. That includes Apache NiFi itself! This is Monitoring Driven Development (MDD).

Monitoring Driven Development (MDD)

In this little proof of concept, we grab some of these streams, process them in Apache NiFi, and then store them in Apache Hive 3 tables for analysis. We should also push the data to HBase for aggregates and to Druid for time series. Expect this to expand.

There are other options for data access, including the NiFi REST API and the NiFi Python API.
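As a quick sketch of the REST option, the `/flow/status` endpoint returns headline numbers a dashboard can summarize. The base URL below is a hypothetical local instance, and this is my illustration, not the project's actual code:

```python
# A minimal sketch (assumed endpoint and field names, not this project's code)
# of summarizing a NiFi REST API /flow/status response.
from urllib.parse import urljoin

NIFI_API = "http://localhost:8080/nifi-api/"  # hypothetical NiFi instance

def status_url(base=NIFI_API):
    """Build the controller status endpoint URL."""
    return urljoin(base, "flow/status")

def summarize_status(payload):
    """Pull a few headline numbers out of a /flow/status response."""
    cs = payload["controllerStatus"]
    return {
        "active_threads": cs["activeThreadCount"],
        "queued": cs["queued"],  # human-readable, e.g. "59,952 / 281.04 MB"
        "flowfiles_queued": cs["flowFilesQueued"],
    }

# Offline demonstration with a trimmed sample response:
sample = {"controllerStatus": {"activeThreadCount": 2,
                               "queued": "59,952 / 281.04 MB",
                               "flowFilesQueued": 59952}}
print(summarize_status(sample))
```

In practice you would fetch the payload with an HTTP client and feed `summarize_status` the parsed JSON.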

Bootstrap Notifier

  • Sends a notification when NiFi starts, stops, or dies unexpectedly.
  • Two notifiers ship out of the box:
  • Email notification service
  • HTTP notification service
  • It is easy to write a custom notification service.
  • Documentation
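The notifier is wired up in `conf/bootstrap.conf`. A sketch of the relevant fragment, based on the NiFi Administration Guide; verify the property names against your NiFi version:

```properties
# conf/bootstrap.conf (fragment): point NiFi at the notification services file
notification.services.file=./conf/bootstrap-notification-services.xml
notification.max.attempts=5

# Fire the email notifier on start, stop, and unexpected death
nifi.start.notification.services=email-notification
nifi.stop.notification.services=email-notification
nifi.dead.notification.services=email-notification
```

Here `email-notification` is the id of a service defined in `bootstrap-notification-services.xml` using the class `org.apache.nifi.bootstrap.notification.email.EmailNotificationService`.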

Reporting Tasks

  • AmbariReportingTask (global, per process group)
  • MonitorDiskUsage (FlowFile, content, and provenance repositories)
  • MonitorMemory
  • MonitorActivity

These are especially useful for operations like purging connections.

Purge it!

  • nipyapi.canvas.purge_connection(con_id)
  • nipyapi.canvas.purge_process_group(process_group, stop=False)
  • nipyapi.canvas.delete_process_group(process_group, force=True, refresh=True)
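The calls above can be wrapped in a small sweep helper. This is a hypothetical convenience function, not part of nipyapi; the purge call is injected so the sketch runs offline:

```python
# A hypothetical helper (not part of nipyapi) that purges a batch of
# connections, with the nipyapi call injected so it can be exercised offline.
def purge_all(connection_ids, purge_fn):
    """Purge each connection; return the ids that were purged successfully."""
    purged = []
    for con_id in connection_ids:
        try:
            purge_fn(con_id)  # e.g. nipyapi.canvas.purge_connection
            purged.append(con_id)
        except Exception:
            pass  # keep going; one stuck queue should not stop the sweep
    return purged

# Offline demonstration with a stub in place of nipyapi:
calls = []
purge_all(["c1", "c2"], calls.append)
print(calls)  # ['c1', 'c2']
```

Against a live cluster you would pass `nipyapi.canvas.purge_connection` as `purge_fn`.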

Use Cases

An Example of Metrics Data

[ {
  "appid" : "nifi",
  "instanceid" : "7c84501d-d10c-407c-b9f3-1d80e38fe36a",
  "hostname" : "princeton1.field.hortonworks.com",
  "timestamp" : 1539411679652,
  "loadAverage1min" : 0.93,
  "availableCores" : 16,
  "FlowFilesReceivedLast5Minutes" : 14,
  "BytesReceivedLast5Minutes" : 343779,
  "FlowFilesSentLast5Minutes" : 0,
  "BytesSentLast5Minutes" : 0,
  "FlowFilesQueued" : 59952,
  "BytesQueued" : 294693938,
  "BytesReadLast5Minutes" : 241681,
  "BytesWrittenLast5Minutes" : 398753,
  "ActiveThreads" : 2,
  "TotalTaskDurationSeconds" : 273,
  "TotalTaskDurationNanoSeconds" : 273242860763,
  "jvmuptime" : 224997,
  "jvmheap_used" : 5.15272616E8,
  "jvmheap_usage" : 0.9597700387239456,
  "jvmnon_heap_usage" : -5.1572632E8,
  "jvmthread_statesrunnable" : 11,
  "jvmthread_statesblocked" : 2,
  "jvmthread_statestimed_waiting" : 26,
  "jvmthread_statesterminated" : 0,
  "jvmthread_count" : 242,
  "jvmdaemon_thread_count" : 125,
  "jvmfile_descriptor_usage" : 0.0709,
  "jvmgcruns" : null,
  "jvmgctime" : null
} ]
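A dashboard backend might turn a payload like this into alerts. A minimal sketch using the field names from the sample above; the thresholds are illustrative assumptions, not recommendations from the article:

```python
# Derive simple alerts from the Ambari reporting task payload shown above.
# Field names match the sample; thresholds are illustrative assumptions.
metrics = {
    "FlowFilesQueued": 59952,
    "BytesQueued": 294693938,
    "jvmheap_usage": 0.9597700387239456,
}

def mb(n_bytes):
    """Convert bytes to mebibytes, rounded for display."""
    return round(n_bytes / (1024 * 1024), 2)

alerts = []
if metrics["jvmheap_usage"] > 0.9:  # heap nearly full: worth an alert
    alerts.append(f"heap usage at {metrics['jvmheap_usage']:.0%}")
if metrics["FlowFilesQueued"] > 50000:  # deep queues often mean back pressure
    alerts.append(
        f"{metrics['FlowFilesQueued']} flowfiles "
        f"({mb(metrics['BytesQueued'])} MB) queued"
    )

print(alerts)
```

With the sample numbers, both the heap and the queue conditions fire.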

An Example of Status Data

[ {
  "appid" : "nifi",
  "instanceid" : "7c84501d-d10c-407c-b9f3-1d80e38fe36a",
  "hostname" : "princeton1.field.hortonworks.com",
  "timestamp" : 1539411679652,
  "loadAverage1min" : 0.93,
  "availableCores" : 16,
  "FlowFilesReceivedLast5Minutes" : 14,
  "BytesReceivedLast5Minutes" : 343779,
  "FlowFilesSentLast5Minutes" : 0,
  "BytesSentLast5Minutes" : 0,
  "FlowFilesQueued" : 59952,
  "BytesQueued" : 294693938,
  "BytesReadLast5Minutes" : 241681,
  "BytesWrittenLast5Minutes" : 398753,
  "ActiveThreads" : 2,
  "TotalTaskDurationSeconds" : 273,
  "TotalTaskDurationNanoSeconds" : 273242860763,
  "jvmuptime" : 224997,
  "jvmheap_used" : 5.15272616E8,
  "jvmheap_usage" : 0.9597700387239456,
  "jvmnon_heap_usage" : -5.1572632E8,
  "jvmthread_statesrunnable" : 11,
  "jvmthread_statesblocked" : 2,
  "jvmthread_statestimed_waiting" : 26,
  "jvmthread_statesterminated" : 0,
  "jvmthread_count" : 242,
  "jvmdaemon_thread_count" : 125,
  "jvmfile_descriptor_usage" : 0.0709,
  "jvmgcruns" : null,
  "jvmgctime" : null
} ]

An Example of Failure Data

[ {
  "objectId" : "34c3249c-4a42-41ce-b94e-3563409ad55b",
  "platform" : "nifi",
  "project" : null,
  "bulletinId" : 28321,
  "bulletinCategory" : "Log Message",
  "bulletinGroupId" : "0b69ea51-7afb-32dd-a7f4-d82b936b37f9",
  "bulletinGroupName" : "Monitoring",
  "bulletinLevel" : "ERROR",
  "bulletinMessage" : "QueryRecord[id=d0258284-69ae-34f6-97df-fa5c82402ef3] Unable to query StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to Failed to read next record in stream for StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to -40: org.apache.nifi.processor.exception.ProcessException: Failed to read next record in stream for StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to -40",
  "bulletinNodeAddress" : null,
  "bulletinNodeId" : "91ab706b-5d92-454e-bc7a-6911d155fdca",
  "bulletinSourceId" : "d0258284-69ae-34f6-97df-fa5c82402ef3",
  "bulletinSourceName" : "QueryRecord",
  "bulletinSourceType" : "PROCESSOR",
  "bulletinTimestamp" : "2018-10-18T20:54:39.179Z"
} ]
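For the dashboard's error views, bulletins like the one above can be grouped by source component. A sketch with made-up sample records; only the two fields used here are taken from the real schema:

```python
# Count ERROR-level bulletins per source component, as a dashboard might.
from collections import Counter

bulletins = [
    {"bulletinLevel": "ERROR", "bulletinSourceName": "QueryRecord"},
    {"bulletinLevel": "ERROR", "bulletinSourceName": "QueryRecord"},
    {"bulletinLevel": "WARN",  "bulletinSourceName": "PutHive3Streaming"},
]

def error_counts(items):
    """Tally bulletins by source name, keeping only ERROR-level entries."""
    return Counter(b["bulletinSourceName"]
                   for b in items if b["bulletinLevel"] == "ERROR")

print(error_counts(bulletins))  # Counter({'QueryRecord': 2})
```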

Apache Hive 3 Tables

CREATE EXTERNAL TABLE IF NOT EXISTS failure (statusId STRING, timestampMillis BIGINT, `timestamp` STRING, actorHostname STRING, componentType STRING, componentName STRING, parentId STRING, platform STRING, `application` STRING, componentId STRING, activeThreadCount BIGINT, flowFilesReceived BIGINT, flowFilesSent BIGINT, bytesReceived BIGINT, bytesSent BIGINT, queuedCount BIGINT, bytesRead BIGINT, bytesWritten BIGINT, bytesTransferred BIGINT, flowFilesTransferred BIGINT, inputContentSize BIGINT, outputContentSize BIGINT, queuedContentSize BIGINT, activeRemotePortCount BIGINT, inactiveRemotePortCount BIGINT, receivedContentSize BIGINT, receivedCount BIGINT, sentContentSize BIGINT, sentCount BIGINT, averageLineageDuration BIGINT, inputBytes BIGINT, inputCount BIGINT, outputBytes BIGINT, outputCount BIGINT, sourceId STRING, sourceName STRING, destinationId STRING, destinationName STRING, maxQueuedBytes BIGINT, maxQueuedCount BIGINT, queuedBytes BIGINT, backPressureBytesThreshold BIGINT, backPressureObjectThreshold BIGINT, isBackPressureEnabled STRING, processorType STRING, averageLineageDurationMS BIGINT, flowFilesRemoved BIGINT, invocations BIGINT, processingNanos BIGINT) STORED AS ORC
   LOCATION '/failure';
CREATE EXTERNAL TABLE IF NOT EXISTS bulletin (objectId STRING, platform STRING, project STRING, bulletinId BIGINT, bulletinCategory STRING, bulletinGroupId STRING, bulletinGroupName STRING, bulletinLevel STRING, bulletinMessage STRING, bulletinNodeAddress STRING, bulletinNodeId STRING, bulletinSourceId STRING, bulletinSourceName STRING, bulletinSourceType STRING, bulletinTimestamp STRING) STORED AS ORC
LOCATION '/error';
CREATE EXTERNAL TABLE IF NOT EXISTS memory (objectId STRING, platform STRING, project STRING, bulletinId BIGINT, bulletinCategory STRING, bulletinGroupId STRING, bulletinGroupName STRING, bulletinLevel STRING, bulletinMessage STRING, bulletinNodeAddress STRING, bulletinNodeId STRING, bulletinSourceId STRING, bulletinSourceName STRING, bulletinSourceType STRING, bulletinTimestamp STRING) STORED AS ORC
LOCATION '/memory'
;
-- backpressure
CREATE EXTERNAL TABLE IF NOT EXISTS status (statusId STRING, timestampMillis BIGINT, `timestamp` STRING, actorHostname STRING, componentType STRING, componentName STRING, parentId STRING, platform STRING, `application` STRING, componentId STRING, activeThreadCount BIGINT, flowFilesReceived BIGINT, flowFilesSent BIGINT, bytesReceived BIGINT, bytesSent BIGINT, queuedCount BIGINT, bytesRead BIGINT, bytesWritten BIGINT, bytesTransferred BIGINT, flowFilesTransferred BIGINT, inputContentSize BIGINT, outputContentSize BIGINT, queuedContentSize BIGINT, activeRemotePortCount BIGINT, inactiveRemotePortCount BIGINT, receivedContentSize BIGINT, receivedCount BIGINT, sentContentSize BIGINT, sentCount BIGINT, averageLineageDuration BIGINT, inputBytes BIGINT, inputCount BIGINT, outputBytes BIGINT, outputCount BIGINT, sourceId STRING, sourceName STRING, destinationId STRING, destinationName STRING, maxQueuedBytes BIGINT, maxQueuedCount BIGINT, queuedBytes BIGINT, backPressureBytesThreshold BIGINT, backPressureObjectThreshold BIGINT, isBackPressureEnabled STRING, processorType STRING, averageLineageDurationMS BIGINT, flowFilesRemoved BIGINT, invocations BIGINT, processingNanos BIGINT) STORED AS ORC
   LOCATION '/status';
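As a quick illustration of how these tables get used, the bulletin table can surface the noisiest components. The query itself is my sketch, not from the article:

```sql
-- Top components by ERROR bulletin count, with the most recent occurrence.
SELECT bulletinSourceName,
       COUNT(*) AS errors,
       MAX(bulletinTimestamp) AS last_seen
FROM bulletin
WHERE bulletinLevel = 'ERROR'
GROUP BY bulletinSourceName
ORDER BY errors DESC
LIMIT 10;
```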

Original title: Building a Custom Apache NiFi Operations Dashboard (Part 1)

Author: Tim Spann

Translator: Sonia

