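When studying the Flink 1.9.0 source code, debugging is an essential technique: inspecting real runtime data and variables shows how the code actually behaves internally.
2. Dynamically adding debug parameters for the JobMaster and TaskManager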
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
-yD
env.java.opts.jobmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8888;
env.java.opts.taskmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999;
-yqu root
-ys xxx
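To confirm that a JVM really picked up the JDWP options above, one option is to inspect its startup arguments from inside the process. The following is a minimal sketch rather than anything taken from Flink: the class name `JdwpCheck` and its helper methods are hypothetical, and it assumes the agent was enabled with the `-agentlib:jdwp=...` form used in this post.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Flink: reports whether the current JVM
// was started with the JDWP agent and which address it listens on.
public class JdwpCheck {

    /** Returns the first JVM argument that enables the JDWP agent, if any. */
    public static Optional<String> findJdwpArgument(List<String> jvmArgs) {
        return jvmArgs.stream()
                .filter(arg -> arg.startsWith("-agentlib:jdwp"))
                .findFirst();
    }

    /** Extracts the value of the "address" sub-option, e.g. "8888". */
    public static Optional<String> debugAddress(String jdwpArg) {
        int eq = jdwpArg.indexOf('=');
        if (eq < 0) {
            return Optional.empty();
        }
        // Sub-options are comma separated: transport=...,server=y,address=...
        for (String option : jdwpArg.substring(eq + 1).split(",")) {
            if (option.startsWith("address=")) {
                return Optional.of(option.substring("address=".length()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Arguments the JVM was launched with (excluding the main class and its args).
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        Optional<String> jdwp = findJdwpArgument(jvmArgs);
        System.out.println(jdwp
                .map(a -> "JDWP enabled, address=" + debugAddress(a).orElse("unknown"))
                .orElse("JDWP agent not enabled"));
    }
}
```

Logging the result of `findJdwpArgument` early in a job's entry point would show up in the container logs, which is a quick way to tell a misconfigured option apart from a port that is simply unreachable.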
The entry point of the whole Application Master (the AM main process, i.e. the JobMaster) is the YarnSessionClusterEntrypoint#main() method.
Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
Code and debug data for #loadConfiguration():
class YarnEntrypointUtils {
    public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env, Logger log) {
        Configuration configuration = GlobalConfiguration.loadConfiguration(workingDirectory);
        final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
        final String zooKeeperNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
        final Map<String, String> dynamicProperties = FlinkYarnSessionCli.getDynamicProperties(
            env.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
        final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
        // ... (rest of the method omitted)
    }
}
The dynamicProperties variable here holds exactly the parameters we passed in from outside via -yD.
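To make the shape of that data concrete, here is a simplified sketch of how an encoded string of dynamic properties could be turned into a map. It is not Flink's actual implementation of getDynamicProperties: the class `DynamicPropertiesSketch` is hypothetical, and the `@@` separator is an assumption based on the `@@` that appears in the parameters later in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: decodes "k1=v1@@k2=v2" into a map.
public class DynamicPropertiesSketch {

    // Assumed separator between encoded key=value pairs.
    static final String SEPARATOR = "@@";

    public static Map<String, String> parse(String encoded) {
        Map<String, String> result = new LinkedHashMap<>();
        if (encoded == null || encoded.isEmpty()) {
            return result;
        }
        for (String pair : encoded.split(SEPARATOR)) {
            // Split on the FIRST '=' only: values such as
            // "-agentlib:jdwp=transport=dt_socket,..." contain '=' themselves.
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip entries without a key
            }
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = parse(
                "env.java.opts.jobmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8888"
                        + "@@heartbeat.timeout=180000");
        props.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

The detail worth noticing is the split on the first `=` only; splitting on every `=` would truncate the JDWP option string.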
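3. Increasing the debugging timeout
If we leave Flink's default heartbeat timeout between the JobMaster and TaskManager unchanged, the default is fairly short: once execution stops at a breakpoint in the TaskManager, the heartbeat times out, the whole application exits, and the debug data can no longer be inspected. The following error is reported: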
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1562557531896_0048_01_000002 timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1109)
at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
To increase the debugging timeout, the error above quickly leads us to
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1109)
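This function is called from HeartbeatManagerImpl#HeartbeatMonitor#run(), and it is easy to guess that the HeartbeatManagerImpl member variable heartbeatTimeoutIntervalMs is the heartbeat timeout.
The HeartbeatManagerImpl constructor is used in only one place, HeartbeatServices#createHeartbeatManager(), so let's look at how HeartbeatServices itself is created: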
public class HeartbeatServices {
public static HeartbeatServices fromConfiguration(Configuration configuration) {
long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
}
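HEARTBEAT_INTERVAL is the interval between heartbeat checks (default 10 seconds), and HEARTBEAT_TIMEOUT is the time after which a missing heartbeat counts as a timeout (default 50 seconds). They correspond to the configuration keys heartbeat.interval and heartbeat.timeout, both given in milliseconds.
We can therefore use the following parameters to increase the debugging timeout (the @@ separator can be replaced with ';'):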
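The effect of the timeout on a debugging session can be illustrated with a small model. This is a deliberately simplified sketch, not Flink's HeartbeatManagerImpl: the class `HeartbeatTimeoutSketch` is hypothetical and takes the current time as a parameter so that the behaviour is deterministic. It only captures the idea that each received heartbeat resets a deadline and that a pause longer than the timeout is treated as a failure.

```java
// Hypothetical model, not Flink code: a target is considered timed out when
// no heartbeat has been seen for longer than timeoutMs.
public class HeartbeatTimeoutSketch {

    private final long timeoutMs;
    private long lastHeartbeatMs;

    public HeartbeatTimeoutSketch(long timeoutMs, long nowMs) {
        this.timeoutMs = timeoutMs;
        this.lastHeartbeatMs = nowMs;
    }

    /** Called whenever a heartbeat arrives; resets the deadline. */
    public void reportHeartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    /** True if the time since the last heartbeat exceeds the timeout. */
    public boolean isTimedOut(long nowMs) {
        return nowMs - lastHeartbeatMs > timeoutMs;
    }

    public static void main(String[] args) {
        long pausedAtBreakpointMs = 60_000L; // 60 s spent inspecting variables

        // Default timeout mentioned in the text: 50 s.
        HeartbeatTimeoutSketch defaults = new HeartbeatTimeoutSketch(50_000L, 0L);
        System.out.println("default timeout exceeded: "
                + defaults.isTimedOut(pausedAtBreakpointMs)); // true

        // Raised timeout of 180 s, as in the parameters used in this post.
        HeartbeatTimeoutSketch raised = new HeartbeatTimeoutSketch(180_000L, 0L);
        System.out.println("raised timeout exceeded: "
                + raised.isTimedOut(pausedAtBreakpointMs)); // false
    }
}
```

With the default of 50 seconds, a one-minute stop at a breakpoint is already enough to trip the timeout, which matches the TimeoutException shown earlier; raising the timeout to 180 seconds leaves room for a pause of that length.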
-yD
env.java.opts.jobmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8888;
env.java.opts.taskmanager=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999;
heartbeat.interval=20000@@heartbeat.timeout=180000;
Source:
blog.csdn.net/LS_ice/article/details/97259650
Author: ls_ice