Building a Custom Apache NiFi Operations Dashboard (Part 1)

Posted by 用户3382876, last modified 2018-11-07 16:49:15

Get operational with NiFi and Spring Boot: build a custom dashboard for the data you use in your Apache NiFi applications.

Simple Apache NiFi Operations Dashboard

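This is a work in progress; please get involved, everything is open source. Milind and I are working on a project to build something useful for teams: a single, rich dashboard where they can analyze their flows, see the current cluster state, and start and stop flows.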

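Apache NiFi and its related tools provide a large amount of data to aggregate, sort, categorize, search, and eventually analyze with machine learning.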

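Many out-of-the-box tools already solve part of the problem. Ambari Metrics, Grafana, and Log Search provide a lot of data and analysis capability. You can easily find all your errors in Log Search and see nice graphs of what is going on in Ambari Metrics and Grafana.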

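What is nice about Apache NiFi is that it has Site-to-Site tasks that can send all the provenance, analytics, metrics, and operational data you need to wherever you want it. That includes Apache NiFi itself! This is Monitoring Driven Development (MDD).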

Monitoring Driven Development (MDD)

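In this small proof of concept, we grab some of these flows, process them in Apache NiFi, and then store them in Apache Hive 3 tables for analysis. We should probably push the data to HBase for aggregates and to Druid for time series. We will see as this expands.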

There are other data access options as well, including the NiFi REST API and the NiFi Python APIs.
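As a minimal sketch of the REST option, the snippet below reads the controller-level status from the `/nifi-api/flow/status` endpoint and keeps a few fields a dashboard might display. The base URL, the unsecured connection, and the helper names are assumptions for illustration; a secured cluster would need authentication that is not shown here.

```python
import json
from urllib.request import urlopen

# Assumption: an unsecured NiFi instance on localhost; adjust for your cluster.
NIFI_BASE_URL = "http://localhost:8080"


def flow_status_url(base_url: str) -> str:
    """Build the URL of the controller-level status endpoint."""
    return base_url.rstrip("/") + "/nifi-api/flow/status"


def summarize_controller_status(payload: dict) -> dict:
    """Pick a few dashboard-friendly fields out of a /flow/status response."""
    status = payload.get("controllerStatus", {})
    return {
        "activeThreads": status.get("activeThreadCount", 0),
        "flowFilesQueued": status.get("flowFilesQueued", 0),
        "bytesQueued": status.get("bytesQueued", 0),
    }


def fetch_flow_status(base_url: str = NIFI_BASE_URL, timeout: float = 5.0) -> dict:
    """GET the status document from a running NiFi and summarize it."""
    with urlopen(flow_status_url(base_url), timeout=timeout) as response:
        return summarize_controller_status(json.load(response))


# Offline demonstration with a hand-written payload shaped like the response.
sample = {
    "controllerStatus": {
        "activeThreadCount": 2,
        "flowFilesQueued": 59952,
        "bytesQueued": 294693938,
    }
}
print(summarize_controller_status(sample))
```

Calling `fetch_flow_status()` against a live instance returns the same three fields; the parsing is kept separate from the HTTP call so it can be exercised without a cluster.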

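Bootstrap Notifier

  • Sends a notification when NiFi starts, stops, or dies unexpectedly.
  • Two out-of-the-box (OOTB) notification services:
  • Email notification service
  • HTTP notification service
  • Writing a custom notification service is easy.
  • See the NiFi documentation for details.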

Reporting Tasks

  • AmbariReportingTask (global, per process group)
  • MonitorDiskUsage (FlowFile, content, and provenance repositories)
  • MonitorMemory

Monitor Disk Usage

MonitorActivity

The Python API calls are especially useful for operations such as purging connections.

Purge it!

  • nipyapi.canvas.purge_connection(con_id)
  • nipyapi.canvas.purge_process_group(process_group, stop=False)
  • nipyapi.canvas.delete_process_group(process_group, force=True, refresh=True)
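The purge calls above could be wired into a dashboard action roughly as follows. This is a sketch under stated assumptions: the `queue_sizes` mapping, the threshold, and both helper names are hypothetical, and the NiFi API URL is a placeholder. Only `nipyapi.config.nifi_config.host` and `nipyapi.canvas.purge_connection` come from nipyapi itself. Purging drops the queued FlowFiles, so a real dashboard would probably ask for confirmation first.

```python
def connections_over_threshold(queue_sizes: dict, max_queued: int) -> list:
    """Return the IDs of connections whose queued FlowFile count exceeds max_queued.

    queue_sizes maps connection ID -> queued FlowFile count. It is a
    hypothetical input that a dashboard would collect from status data.
    """
    return sorted(
        con_id for con_id, queued in queue_sizes.items() if queued > max_queued
    )


def purge_connections(con_ids, nifi_api_url="http://localhost:8080/nifi-api"):
    """Purge each listed connection through nipyapi (requires `pip install nipyapi`)."""
    import nipyapi  # imported lazily so the selection helper works without it

    # Assumption: an unsecured NiFi API at nifi_api_url.
    nipyapi.config.nifi_config.host = nifi_api_url
    for con_id in con_ids:
        nipyapi.canvas.purge_connection(con_id)


# Selection step only; the connection IDs here are made up for illustration.
candidates = connections_over_threshold(
    {"conn-a": 10, "conn-b": 60000, "conn-c": 59952}, max_queued=50000
)
print(candidates)
```

Passing `candidates` to `purge_connections` would then empty those queues on the target instance.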

Use Cases

Example Metrics Data

[{
  "appid": "nifi",
  "instanceid": "7c84501d-d10c-407c-b9f3-1d80e38fe36a",
  "hostname": "princeton1.field.hortonworks.com",
  "timestamp": 1539411679652,
  "loadAverage1min": 0.93,
  "availableCores": 16,
  "FlowFilesReceivedLast5Minutes": 14,
  "BytesReceivedLast5Minutes": 343779,
  "FlowFilesSentLast5Minutes": 0,
  "BytesSentLast5Minutes": 0,
  "FlowFilesQueued": 59952,
  "BytesQueued": 294693938,
  "BytesReadLast5Minutes": 241681,
  "BytesWrittenLast5Minutes": 398753,
  "ActiveThreads": 2,
  "TotalTaskDurationSeconds": 273,
  "TotalTaskDurationNanoSeconds": 273242860763,
  "jvmuptime": 224997,
  "jvmheap_used": 5.15272616E8,
  "jvmheap_usage": 0.9597700387239456,
  "jvmnon_heap_usage": -5.1572632E8,
  "jvmthread_statesrunnable": 11,
  "jvmthread_statesblocked": 2,
  "jvmthread_statestimed_waiting": 26,
  "jvmthread_statesterminated": 0,
  "jvmthread_count": 242,
  "jvmdaemon_thread_count": 125,
  "jvmfile_descriptor_usage": 0.0709,
  "jvmgcruns": null,
  "jvmgctime": null
}]
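A dashboard would typically turn a record like this into health indicators. The sketch below parses a trimmed copy of the metrics record and flags it when heap usage or load per core crosses a limit. The thresholds and the `find_alerts` helper are hypothetical, chosen only to illustrate the idea.

```python
import json

# Trimmed copy of the metrics record shown above.
METRICS_JSON = """
[{
  "hostname": "princeton1.field.hortonworks.com",
  "timestamp": 1539411679652,
  "loadAverage1min": 0.93,
  "availableCores": 16,
  "FlowFilesQueued": 59952,
  "jvmheap_usage": 0.9597700387239456,
  "jvmfile_descriptor_usage": 0.0709
}]
"""

# Hypothetical alert thresholds; tune them for your own cluster.
HEAP_USAGE_LIMIT = 0.90
LOAD_PER_CORE_LIMIT = 1.0


def find_alerts(record: dict) -> list:
    """Return the names of the checks that this metrics record fails."""
    alerts = []
    if record.get("jvmheap_usage", 0.0) > HEAP_USAGE_LIMIT:
        alerts.append("heap")
    cores = max(record.get("availableCores", 1), 1)
    if record.get("loadAverage1min", 0.0) / cores > LOAD_PER_CORE_LIMIT:
        alerts.append("load")
    return alerts


records = json.loads(METRICS_JSON)
for record in records:
    print(record["hostname"], find_alerts(record))
```

With the sample values, heap usage of roughly 0.96 exceeds the 0.90 limit, while a load of 0.93 spread over 16 cores does not trip the load check.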

Example Status Data

[ {
  "appid" : "nifi",
  "instanceid" : "7c84501d-d10c-407c-b9f3-1d80e38fe36a",
  "hostname" : "princeton1.field.hortonworks.com",
  "timestamp" : 1539411679652,
  "loadAverage1min" : 0.93,
  "availableCores" : 16,
  "FlowFilesReceivedLast5Minutes" : 14,
  "BytesReceivedLast5Minutes" : 343779,
  "FlowFilesSentLast5Minutes" : 0,
  "BytesSentLast5Minutes" : 0,
  "FlowFilesQueued" : 59952,
  "BytesQueued" : 294693938,
  "BytesReadLast5Minutes" : 241681,
  "BytesWrittenLast5Minutes" : 398753,
  "ActiveThreads" : 2,
  "TotalTaskDurationSeconds" : 273,
  "TotalTaskDurationNanoSeconds" : 273242860763,
  "jvmuptime" : 224997,
  "jvmheap_used" : 5.15272616E8,
  "jvmheap_usage" : 0.9597700387239456,
  "jvmnon_heap_usage" : -5.1572632E8,
  "jvmthread_statesrunnable" : 11,
  "jvmthread_statesblocked" : 2,
  "jvmthread_statestimed_waiting" : 26,
  "jvmthread_statesterminated" : 0,
  "jvmthread_count" : 242,
  "jvmdaemon_thread_count" : 125,
  "jvmfile_descriptor_usage" : 0.0709,
  "jvmgcruns" : null,
  "jvmgctime" : null
} ]
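Two small transformations make a record like this easier to chart: converting the epoch-millisecond `timestamp` to a readable UTC time, and turning the `...Last5Minutes` counters into average per-second rates. The sketch below assumes those counters cover a 300-second window, as their names suggest; the helper names are hypothetical.

```python
from datetime import datetime, timezone

# Assumption: the "...Last5Minutes" counters cover a 300-second window.
WINDOW_SECONDS = 300

# A few fields copied from the status record shown above.
status_record = {
    "timestamp": 1539411679652,
    "FlowFilesReceivedLast5Minutes": 14,
    "BytesReceivedLast5Minutes": 343779,
    "BytesReadLast5Minutes": 241681,
    "BytesWrittenLast5Minutes": 398753,
}


def to_utc_iso(epoch_millis: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 UTC string (second precision)."""
    seconds = epoch_millis // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


def per_second_rates(record: dict) -> dict:
    """Average each 5-minute counter over the window, rounded to 2 decimals."""
    suffix = "Last5Minutes"
    return {
        key[: -len(suffix)] + "PerSecond": round(value / WINDOW_SECONDS, 2)
        for key, value in record.items()
        if key.endswith(suffix)
    }


print(to_utc_iso(status_record["timestamp"]))
print(per_second_rates(status_record))
```

These are averages over the window, not instantaneous rates, so short bursts will be smoothed out.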

Example Failure Data

[ {
  "objectId" : "34c3249c-4a42-41ce-b94e-3563409ad55b",
  "platform" : "nifi",
  "project" : null,
  "bulletinId" : 28321,
  "bulletinCategory" : "Log Message",
  "bulletinGroupId" : "0b69ea51-7afb-32dd-a7f4-d82b936b37f9",
  "bulletinGroupName" : "Monitoring",
  "bulletinLevel" : "ERROR",
  "bulletinMessage" : "QueryRecord[id=d0258284-69ae-34f6-97df-fa5c82402ef3] Unable to query StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to Failed to read next record in stream for StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to -40: org.apache.nifi.processor.exception.ProcessException: Failed to read next record in stream for StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to -40",
  "bulletinNodeAddress" : null,
  "bulletinNodeId" : "91ab706b-5d92-454e-bc7a-6911d155fdca",
  "bulletinSourceId" : "d0258284-69ae-34f6-97df-fa5c82402ef3",
  "bulletinSourceName" : "QueryRecord",
  "bulletinSourceType" : "PROCESSOR",
  "bulletinTimestamp" : "2018-10-18T20:54:39.179Z"
} ]
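Bulletins like the one above are easier to read on a dashboard once they are grouped by source and their long messages are shortened. The following sketch does both. The first sample record is an abbreviated copy of the bulletin above; the second record is made up to show that non-error bulletins are skipped, and the helper names are hypothetical.

```python
from collections import Counter

bulletins = [
    {   # abbreviated copy of the bulletin shown above
        "bulletinId": 28321,
        "bulletinLevel": "ERROR",
        "bulletinSourceId": "d0258284-69ae-34f6-97df-fa5c82402ef3",
        "bulletinSourceName": "QueryRecord",
        "bulletinMessage": "QueryRecord[id=d0258284-69ae-34f6-97df-fa5c82402ef3] "
                           "Unable to query StandardFlowFileRecord[...] due to -40",
    },
    {   # made-up record, included only to show filtering by level
        "bulletinId": 1,
        "bulletinLevel": "INFO",
        "bulletinSourceId": "hypothetical-id",
        "bulletinSourceName": "LogAttribute",
        "bulletinMessage": "hypothetical informational message",
    },
]


def count_errors_by_source(items) -> Counter:
    """Count ERROR bulletins per (source name, source id) pair."""
    counts = Counter()
    for item in items:
        if item.get("bulletinLevel") == "ERROR":
            counts[(item.get("bulletinSourceName"), item.get("bulletinSourceId"))] += 1
    return counts


def short_message(item: dict, limit: int = 60) -> str:
    """Truncate a bulletin message so it fits in one dashboard row."""
    message = item.get("bulletinMessage") or ""
    return message if len(message) <= limit else message[: limit - 3] + "..."


for (name, source_id), total in count_errors_by_source(bulletins).items():
    print(name, source_id, total)
print(short_message(bulletins[0]))
```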

Apache Hive 3 Tables

CREATE EXTERNAL TABLE IF NOT EXISTS failure (statusId STRING, timestampMillis BIGINT, `timestamp` STRING, actorHostname STRING, componentType STRING, componentName STRING, parentId STRING, platform STRING, `application` STRING, componentId STRING, activeThreadCount BIGINT, flowFilesReceived BIGINT, flowFilesSent BIGINT, bytesReceived BIGINT, bytesSent BIGINT, queuedCount BIGINT, bytesRead BIGINT, bytesWritten BIGINT, bytesTransferred BIGINT, flowFilesTransferred BIGINT, inputContentSize BIGINT, outputContentSize BIGINT, queuedContentSize BIGINT, activeRemotePortCount BIGINT, inactiveRemotePortCount BIGINT, receivedContentSize BIGINT, receivedCount BIGINT, sentContentSize BIGINT, sentCount BIGINT, averageLineageDuration BIGINT, inputBytes BIGINT, inputCount BIGINT, outputBytes BIGINT, outputCount BIGINT, sourceId STRING, sourceName STRING, destinationId STRING, destinationName STRING, maxQueuedBytes BIGINT, maxQueuedCount BIGINT, queuedBytes BIGINT, backPressureBytesThreshold BIGINT, backPressureObjectThreshold BIGINT, isBackPressureEnabled STRING, processorType STRING, averageLineageDurationMS BIGINT, flowFilesRemoved BIGINT, invocations BIGINT, processingNanos BIGINT) STORED AS ORC
   LOCATION '/failure';
CREATE EXTERNAL TABLE IF NOT EXISTS bulletin (objectId STRING, platform STRING, project STRING, bulletinId BIGINT, bulletinCategory STRING, bulletinGroupId STRING, bulletinGroupName STRING, bulletinLevel STRING, bulletinMessage STRING, bulletinNodeAddress STRING, bulletinNodeId STRING, bulletinSourceId STRING, bulletinSourceName STRING, bulletinSourceType STRING, bulletinTimestamp STRING) STORED AS ORC
LOCATION '/error';
CREATE EXTERNAL TABLE IF NOT EXISTS memory (objectId STRING, platform STRING, project STRING, bulletinId BIGINT, bulletinCategory STRING, bulletinGroupId STRING, bulletinGroupName STRING, bulletinLevel STRING, bulletinMessage STRING, bulletinNodeAddress STRING, bulletinNodeId STRING, bulletinSourceId STRING, bulletinSourceName STRING, bulletinSourceType STRING, bulletinTimestamp STRING) STORED AS ORC
LOCATION '/memory'
;
-- backpressure
CREATE EXTERNAL TABLE IF NOT EXISTS status (statusId STRING, timestampMillis BIGINT, `timestamp` STRING, actorHostname STRING, componentType STRING, componentName STRING, parentId STRING, platform STRING, `application` STRING, componentId STRING, activeThreadCount BIGINT, flowFilesReceived BIGINT, flowFilesSent BIGINT, bytesReceived BIGINT, bytesSent BIGINT, queuedCount BIGINT, bytesRead BIGINT, bytesWritten BIGINT, bytesTransferred BIGINT, flowFilesTransferred BIGINT, inputContentSize BIGINT, outputContentSize BIGINT, queuedContentSize BIGINT, activeRemotePortCount BIGINT, inactiveRemotePortCount BIGINT, receivedContentSize BIGINT, receivedCount BIGINT, sentContentSize BIGINT, sentCount BIGINT, averageLineageDuration BIGINT, inputBytes BIGINT, inputCount BIGINT, outputBytes BIGINT, outputCount BIGINT, sourceId STRING, sourceName STRING, destinationId STRING, destinationName STRING, maxQueuedBytes BIGINT, maxQueuedCount BIGINT, queuedBytes BIGINT, backPressureBytesThreshold BIGINT, backPressureObjectThreshold BIGINT, isBackPressureEnabled STRING, processorType STRING, averageLineageDurationMS BIGINT, flowFilesRemoved BIGINT, invocations BIGINT, processingNanos BIGINT) STORED AS ORC
   LOCATION '/status';
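The bulletin and memory tables share one column layout, as do the failure and status tables, so the DDL could be generated from a single column list per layout rather than maintained by hand. The sketch below shows this for the bulletin layout. The helper names are hypothetical, and the output is a string to run through whatever Hive client you already use.

```python
# Column layout shared by the bulletin and memory tables above.
BULLETIN_COLUMNS = [
    ("objectId", "STRING"),
    ("platform", "STRING"),
    ("project", "STRING"),
    ("bulletinId", "BIGINT"),
    ("bulletinCategory", "STRING"),
    ("bulletinGroupId", "STRING"),
    ("bulletinGroupName", "STRING"),
    ("bulletinLevel", "STRING"),
    ("bulletinMessage", "STRING"),
    ("bulletinNodeAddress", "STRING"),
    ("bulletinNodeId", "STRING"),
    ("bulletinSourceId", "STRING"),
    ("bulletinSourceName", "STRING"),
    ("bulletinSourceType", "STRING"),
    ("bulletinTimestamp", "STRING"),
]

# Column names that the DDL above wraps in backticks.
QUOTED_NAMES = {"timestamp", "application"}


def column_sql(name: str, hive_type: str) -> str:
    """Render one column definition, quoting the names the DDL above quotes."""
    quoted = f"`{name}`" if name in QUOTED_NAMES else name
    return f"{quoted} {hive_type}"


def create_external_orc_table(table: str, columns, location: str) -> str:
    """Build a CREATE EXTERNAL TABLE statement in the style used above."""
    column_list = ", ".join(column_sql(name, hive_type) for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({column_list}) "
        f"STORED AS ORC LOCATION '{location}'"
    )


for table, location in (("bulletin", "/error"), ("memory", "/memory")):
    print(create_external_orc_table(table, BULLETIN_COLUMNS, location) + ";")
```

The same helper would cover the failure and status tables given their longer column list, which keeps the two pairs of definitions from drifting apart.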

Original title: "Building a Custom Apache NiFi Operations Dashboard (Part 1)"

Author: Tim Spann

Translator: Sonia
