前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.9.0源码调试介绍&增加调试超时时间

Flink1.9.0源码调试介绍&增加调试超时时间

作者头像
大数据真好玩
发布2019-12-05 11:55:49
2.7K0
发布2019-12-05 11:55:49
举报
文章被收录于专栏:暴走大数据暴走大数据
一、Flink源码调试概述

在Flink1.9.0源码研究过程中,调试源码是一个非常重要的手段,通过查看真实的运行数据和变量,来了解源码内部运行逻辑

  • 如果是本地Jvm调试Flink,我们可以运行源码各种xxxITCase测试用例,加断点来调试,这个相对容易
  • 如果我们想调试线上集群,获取JobMaster、TaskManager运行数据、运行逻辑,来解决一些难以在本地Jvm复现的问题,则情况变得复杂了

二、动态增加JobMaster、TaskManager调试参数

1、配置调试参数

  • 如果yarn上只有一个全局的Flink应用(即Cluster模式),我们可以在$FLINK_HOME/conf/flink-conf.yaml中增加如下参数,来增加远程调试端口,这种方式很多文章都有介绍:
代码语言:javascript
复制
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
  • 如果yarn上有多个Flink应用(即Session模式),通过conf配单的方式就行不通了,因为多个应用尝试占用相同的调试端口,会直接报错,需要通过-yD参数来设置
代码语言:javascript
复制
-yD
env.java.opts.jobmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8888;
env.java.opts.taskmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999;
- yqu root
- ys xxx

2、动态调试参数的解析位置

  • 如果是Yarn Session(per job)模式,

整个 Application Master(AM主进程或者JobMaster)的起点在YarnSessionClusterEntrypoint#main()方法。

  • YarnSessionClusterEntrypoint在执行main函数时,首先获取参数配置configuration:
代码语言:javascript
复制
Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);

#loadConfiguration()代码和调试数据:

代码语言:javascript
复制
class YarnEntrypointUtils{
  public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env, Logger log) {
    Configuration configuration = GlobalConfiguration.loadConfiguration(workingDirectory);

    final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);

    final String zooKeeperNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);

    final Map<String, String> dynamicProperties = FlinkYarnSessionCli.getDynamicProperties(
      env.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));

    final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());

这里的dynamicProperties变量,即我们在外部通过-yD配置的参数内容:

三、增加调试超时时长

如果我们不对Flink默认的JobMaster与TaskManager心跳超时做修改,当你在TaskManager上加一个断点并转入这里后,默认的时间是比较短的,超时整个应用会直接退出,也就没法继续看调试数据了,会收到以下报错信息

代码语言:javascript
复制
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1562557531896_0048_01_000002 timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1109)
        at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

为了增加调试的超时时长,通过上述报错信息能快速定位到

代码语言:javascript
复制
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1109)

这个函数被调用的地方在HeartbeatManagerImpl#HeartbeatMonitor#run(),这个HeartbeatManagerImpl成员变量heartbeatTimeoutIntervalMs很容易猜到就是心跳的超时时间

HeartbeatManagerImpl构造函数只有一个地方使用,即HeartbeatManager#createHeartbeatManager(),我们看看HeartbeatManager自身是如何被创建的:

代码语言:javascript
复制
public class HeartbeatServices {
  public static HeartbeatServices fromConfiguration(Configuration configuration) {
    long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

    long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

    return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
  }
}

HEARTBEAT_INTERVAL就是超时检测间隔(默认为10秒),HEARTBEAT_TIMEOUT就是超时时长(默认为50秒)。

因此,我们可以使用如下参数来增加调试超时时间(@@可以换成’;'号):

代码语言:javascript
复制
-yD
env.java.opts.jobmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8888;
env.java.opts.taskmanager=-agentlib:
jdwp=transport=dt_socket,server=y,suspend=n,address=9999;
heartbeat.interval=20000@@heartbeat.timeout=180000;

  • 最后,private HeartbeatServices heartbeatServices定义在ClusterEntrypoint,它是YarnSessionClusterEntrypoint的父类,到这里,我们对Flink心跳机制和功能所处位置,也有一个简单的了解
  • HeartbeatManagerImpl#reportHeartbeat()会持续接收TaskManager通过Akka发送过来的心跳消息,截图如下,感兴趣的读者可以继续研究

来源:

blog.csdn.net/LS_ice/article/details/97259650

作者: ls_ice

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、配置调试参数
  • 2、动态调试参数的解析位置
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档