Flink History Server

History Server概述

Flink有一个History Server,可以用来在相应的Flink集群关闭后查询已完成作业的统计信息。例如有个批处理作业是凌晨才运行的,并且我们都知道只有当作业处于运行中的状态,才能够查看到相关的日志信息和统计信息。所以如果作业由于异常退出或者处理结果有问题,我们又无法及时查看(凌晨运行的)作业的相关日志信息。那么History Server就显得十分重要了,因为通过History Server我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

此外,它对外提供了REST API,它接受HTTP请求并使用JSON数据进行响应。Flink任务停止后,JobManager会将已经完成任务的统计信息进行存档,History Server进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的checkpoint、任务运行时的相关配置。

官方文档:


History Server的使用

History Server允许查询由JobManager归档的已完成作业的状态和统计信息。已完成作业的归档在JobManager上进行,JobManager会将归档的作业信息upload到文件系统目录,这个文件系统可以是本地文件系统、HDFS、H3等,这个目录是可以在配置文件中指定的。然后还需要配置History Server去扫描这个目录,并且可以配置扫描的间隔时间。

因此,我们在使用History Server之前需要配置一下这几个配置项:

[root@hadoop01 /usr/local/flink]# vim conf/flink-conf.yaml
# 指定由JobManager归档的作业信息所存放的目录,这里使用的是HDFS
jobmanager.archive.fs.dir: hdfs://hadoop01:8020/completed-jobs/
# 指定History Server扫描哪些归档目录,多个目录使用逗号分隔
historyserver.archive.fs.dir: hdfs://hadoop01:8020/completed-jobs/
# 指定History Server间隔多少毫秒扫描一次归档目录
historyserver.archive.fs.refresh-interval: 10000
# History Server所绑定的ip,0.0.0.0代表允许所有ip访问
historyserver.web.address: 0.0.0.0
# 指定History Server所监听的端口号
historyserver.web.port: 8082

配置完成后,可以使用如下命令启动History Server:

[root@hadoop01 /usr/local/flink]# ./bin/historyserver.sh start
Starting historyserver daemon on host hadoop01.
[root@hadoop01 /usr/local/flink]# 

检查一下是否启动成功:

[root@hadoop01 /usr/local/flink]# netstat -lntp |grep 8082
tcp6       0      0 :::8082           :::*         LISTEN      3200/java           
[root@hadoop01 /usr/local/flink]# jps |grep HistoryServer
3200 HistoryServer
[root@hadoop01 /usr/local/flink]# 

提交一个作业跑一下,看看完成后是否会生成归档信息:

[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

作业跑完后,可以在HDFS中看到生成的归档目录:

[root@hadoop01 /usr/local/flink]# hadoop fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2020-09-30 11:00 /completed-jobs
[root@hadoop01 /usr/local/flink]# 

然后使用浏览器访问8082端口可以在web界面上查看已运行完的作业信息:

点进去可以看到详细的统计信息:

这些信息都是以JSON的格式存放在归档目录下的文件中,文件以作业的id命名:

[root@hadoop01 /usr/local/flink]# hadoop fs -ls /completed-jobs
Found 1 items
-rw-r--r--   1 root supergroup      31606 2020-09-30 11:00 /completed-jobs/3f9f7ec2a7a765660bdc09922d0b7d0f
[root@hadoop01 /usr/local/flink]# 

History Server REST API使用

根据官方文档的描述,History Server提供了如下REST API,所有API的响应数据都是JSON格式:

  • /config
  • /jobs/overview
  • /jobs/<jobid>
  • /jobs/<jobid>/vertices
  • /jobs/<jobid>/config
  • /jobs/<jobid>/exceptions
  • /jobs/<jobid>/accumulators
  • /jobs/<jobid>/vertices/<vertexid>
  • /jobs/<jobid>/vertices/<vertexid>/subtasktimes
  • /jobs/<jobid>/vertices/<vertexid>/taskmanagers
  • /jobs/<jobid>/vertices/<vertexid>/accumulators
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
  • /jobs/<jobid>/plan

/config接口可以获取基础配置信息,请求示例:

[root@hadoop01 ~]# curl http://localhost:8082/config
{"refresh-interval":10000,"timezone-name":"中国时间","timezone-offset":28800000,"flink-version":"1.11.2","flink-revision":"DeadD0d0 @ 1970-01-01T01:00:00+01:00","features":{"web-submit":false}}
[root@hadoop01 ~]# 

/config接口可以获取已完成的job信息列表,请求示例:

[root@hadoop01 ~]# curl http://localhost:8082/jobs/overview
{"jobs":[{"jid":"3f9f7ec2a7a765660bdc09922d0b7d0f","name":"Flink Java Job at Wed Sep 30 11:00:11 CST 2020","state":"FINISHED","start-time":1601434820548,"end-time":1601434826749,"duration":6201,"last-modification":1601434826749,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":3,"canceling":0,"canceled":0,"failed":0,"reconciling":0}}]}
[root@hadoop01 ~]#

/jobs/<jobid>接口可以获取指定Job的详细信息,我们可以基于上一个接口返回的Job ID获取指定Job的详细信息,由于内容太多就不贴出来了:

[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f

/jobs/<jobid>/config接口可以获取指定Job的配置信息:

[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/config
{"jid":"3f9f7ec2a7a765660bdc09922d0b7d0f","name":"Flink Java Job at Wed Sep 30 11:00:11 CST 2020","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{}}}
[root@hadoop01 ~]# 

/jobs/<jobid>/exceptions接口可以获取指定Job的异常信息:

[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/exceptions
{"root-exception":null,"timestamp":null,"all-exceptions":[],"truncated":false}
[root@hadoop01 ~]# 

/jobs/<jobid>/accumulators可以获取指定Job的计数器信息:

[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/accumulators

其余API也是类似的,这里就不逐一演示了。


Monitoring REST API

除了History Server REST API,Flink还提供了Monitoring REST API,该API也是RESTFul风格,接受HTTP请求,响应JSON数据。监控API可以用来查询正在运行的作业以及最近完成的作业的状态和统计信息。Flink自己的dashboard就是使用的这个监控API,并且该监控API也可以被自定义的监控工具使用,例如我们可以自己基于这些API开发属于自己的监控工具。官方文档:

监控API由web服务器支持作为 Dispatcher 的一部分运行。默认情况下,此服务监听在8081端口,可以在flink-conf.yaml通过rest.port进行配置。需要注意的是,目前监控API的web服务和仪表板的web服务是相同的,因此在同一端口上一起运行。不过,它们响应不同的HTTP Url。

在有多个 Dispatcher 的情况下(为了高可用性),每个 Dispatcher 将运行其自己的监控API实例,该实例提供有关已完成和正在运行的作业的信息,而该 Dispatcher 会被选为集群的leader。

官方文档中有详细列出所有的监控API,如果需要开发自己的监控平台,就可以深入了解下:


Flink对外提供了一个度量(Metrics)系统,它允许收集和向外部系统提供度量信息。官方文档:

可以在任何继承了RichFunction的用户函数内部调用 getRuntimeContext().getMetricGroup() 方法来访问度量系统。此方法返回一个MetricGroup对象,你可以在该对象上创建和注册新的度量。如下示例:

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      // 注册一个计数器度量
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

默认情况下,Flink收集了几个可以深入了解当前状态的指标。官方文档对所有指标都有相应的描述:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Mycat 快速入门

    至于什么是Mycat,可能在不同的角色下有不同的理解。对MySQL架构有过了解的话,都知道MySQL实际上是由Server层和存储引擎层组成的。所以对于DBA来...

    端碗吹水
  • Kafka集群监控、安全机制与最佳实践

    所以本小节先介绍该监控工具的安装及配置,到如下地址可以下载各个版本的Kafka Manager:

    端碗吹水
  • Collections工具类与map集合

    Collections是针对List系列集合操作的一个工具类。使用TreeSet集合添加自己写的类,必须要实现Comparable接口才能够进行添加,不然就会报...

    端碗吹水
  • 一位精益敏捷布道者的传奇故事

    有这样一位台湾 DevOps 老专家,拥有 30 多年软件研发经验,从早期大型银行软件系统到各项新技术研究,到后来接受各大公司的邀请,坐镇于百位年轻工程师中间,...

    DevOps时代
  • 【程序源代码】基于企业微信的开源SCRM系统

    正文:LinkWeChat,是一款基于企业微信的开源SCRM系统,为企业构建私域流量系统的综合解决方案,显著提升企业社交运营效率!

    程序源代码
  • Docker更名Moby,也是无奈之举

    在上周二于德克萨斯州奥斯汀市召开的DockerCon 2017大会上,这家容器厂商宣布推出“Moby项目”,将其旗下的Docker产品(包括DockerCE与D...

    Debian中国
  • tensorflow学习笔记(十八):Multiple GPUs

    ke1th
  • QQ个人业务社区网站源码带后台

    Youngxj
  • Harbor:开源企业级容器Registry架构简介

    VMware公司最近开源了企业级Registry项目Harbor,由VMware中国研发的团队负责开发。Harbor项目是帮助用户迅速搭建一个企业级的regis...

    Henry Zhang
  • 类别变量-卡方分箱

    建模中遇到类别变量时,经常将其转为哑变量进行处理,但若类别变量的属性过多,会生成过多的哑变量,从而导致维度增加,并且很多情况下,只有部分哑变量进入模型,可能损...

    小石头记

扫码关注云+社区

领取腾讯云代金券