
A look at the jvm-exit-on-oom configuration of the Flink TaskManager

Original · code4it · Published 2019-02-23 11:31:31
Included in the column: 码匠的流水账

This article looks at the jvm-exit-on-oom configuration of the Flink TaskManager.

taskmanager.jvm-exit-on-oom

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

Code language: Java
@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
     */
    public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
            key("taskmanager.jvm-exit-on-oom")
            .defaultValue(false)
            .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

    //......
}
  • taskmanager.jvm-exit-on-oom defaults to false. It specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
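In a deployment the option is normally switched on by adding the line taskmanager.jvm-exit-on-oom: true to flink-conf.yaml. The sketch below models, with the JDK only, how a boolean option with a default behaves when the key is absent or set. BooleanOption, SimpleConfig and KillOnOomOptionDemo are hypothetical stand-ins for Flink's ConfigOption<Boolean> and Configuration, not Flink's API, and parsing the raw string with Boolean.parseBoolean is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class KillOnOomOptionDemo {

    // Hypothetical stand-in for Flink's ConfigOption<Boolean>: a key plus a default value.
    static final class BooleanOption {
        final String key;
        final boolean defaultValue;

        BooleanOption(String key, boolean defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Hypothetical stand-in for Flink's Configuration: raw string values keyed by option name.
    static final class SimpleConfig {
        private final Map<String, String> values = new HashMap<>();

        void setString(String key, String value) {
            values.put(key, value);
        }

        // Returns the parsed value, or the option's default when the key is absent.
        boolean getBoolean(BooleanOption option) {
            String raw = values.get(option.key);
            return raw == null ? option.defaultValue : Boolean.parseBoolean(raw.trim());
        }
    }

    // Mirrors key("taskmanager.jvm-exit-on-oom").defaultValue(false) in TaskManagerOptions.
    static final BooleanOption KILL_ON_OUT_OF_MEMORY =
            new BooleanOption("taskmanager.jvm-exit-on-oom", false);

    public static void main(String[] args) {
        SimpleConfig conf = new SimpleConfig();
        System.out.println(conf.getBoolean(KILL_ON_OUT_OF_MEMORY)); // false: key unset, default applies

        // Roughly what "taskmanager.jvm-exit-on-oom: true" in flink-conf.yaml amounts to.
        conf.setString("taskmanager.jvm-exit-on-oom", "true");
        System.out.println(conf.getBoolean(KILL_ON_OUT_OF_MEMORY)); // true
    }
}
```

Because the default is false, a TaskManager started without the key keeps running after a task-level OutOfMemoryError unless the setting is added explicitly.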

TaskManagerConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

Code language: Java
public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

    private final int numberSlots;

    private final String[] tmpDirectories;

    private final Time timeout;

    // null indicates an infinite duration
    @Nullable
    private final Time maxRegistrationDuration;

    private final Time initialRegistrationPause;
    private final Time maxRegistrationPause;
    private final Time refusedRegistrationPause;

    private final UnmodifiableConfiguration configuration;

    private final boolean exitJvmOnOutOfMemory;

    private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

    private final String[] alwaysParentFirstLoaderPatterns;

    @Nullable
    private final String taskManagerLogPath;

    @Nullable
    private final String taskManagerStdoutPath;

    public TaskManagerConfiguration(
        int numberSlots,
        String[] tmpDirectories,
        Time timeout,
        @Nullable Time maxRegistrationDuration,
        Time initialRegistrationPause,
        Time maxRegistrationPause,
        Time refusedRegistrationPause,
        Configuration configuration,
        boolean exitJvmOnOutOfMemory,
        FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
        String[] alwaysParentFirstLoaderPatterns,
        @Nullable String taskManagerLogPath,
        @Nullable String taskManagerStdoutPath) {

        this.numberSlots = numberSlots;
        this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
        this.timeout = Preconditions.checkNotNull(timeout);
        this.maxRegistrationDuration = maxRegistrationDuration;
        this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
        this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
        this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
        this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
        this.classLoaderResolveOrder = classLoaderResolveOrder;
        this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
        this.taskManagerLogPath = taskManagerLogPath;
        this.taskManagerStdoutPath = taskManagerStdoutPath;
    }

    public int getNumberSlots() {
        return numberSlots;
    }

    public Time getTimeout() {
        return timeout;
    }

    @Nullable
    public Time getMaxRegistrationDuration() {
        return maxRegistrationDuration;
    }

    public Time getInitialRegistrationPause() {
        return initialRegistrationPause;
    }

    @Nullable
    public Time getMaxRegistrationPause() {
        return maxRegistrationPause;
    }

    public Time getRefusedRegistrationPause() {
        return refusedRegistrationPause;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public String[] getTmpDirectories() {
        return tmpDirectories;
    }

    @Override
    public boolean shouldExitJvmOnOutOfMemoryError() {
        return exitJvmOnOutOfMemory;
    }

    public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
        return classLoaderResolveOrder;
    }

    public String[] getAlwaysParentFirstLoaderPatterns() {
        return alwaysParentFirstLoaderPatterns;
    }

    @Nullable
    public String getTaskManagerLogPath() {
        return taskManagerLogPath;
    }

    @Nullable
    public String getTaskManagerStdoutPath() {
        return taskManagerStdoutPath;
    }

    // --------------------------------------------------------------------------------------------
    //  Static factory methods
    // --------------------------------------------------------------------------------------------

    public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
        int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

        if (numberSlots == -1) {
            numberSlots = 1;
        }

        final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

        final Time timeout;

        try {
            timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
        } catch (Exception e) {
            throw new IllegalArgumentException(
                "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
                    "'.Use formats like '50 s' or '1 min' to specify the timeout.");
        }

        LOG.info("Messages have a max timeout of " + timeout);

        final Time finiteRegistrationDuration;

        try {
            Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
            if (maxRegistrationDuration.isFinite()) {
                finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
            } else {
                finiteRegistrationDuration = null;
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
        }

        final Time initialRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
            if (pause.isFinite()) {
                initialRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final Time maxRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(
                TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
            if (pause.isFinite()) {
                maxRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final Time refusedRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
            if (pause.isFinite()) {
                refusedRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

        final String classLoaderResolveOrder =
            configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

        final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

        final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
        final String taskManagerStdoutPath;

        if (taskManagerLogPath != null) {
            final int extension = taskManagerLogPath.lastIndexOf('.');

            if (extension > 0) {
                taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
            } else {
                taskManagerStdoutPath = null;
            }
        } else {
            taskManagerStdoutPath = null;
        }

        return new TaskManagerConfiguration(
            numberSlots,
            tmpDirPaths,
            timeout,
            finiteRegistrationDuration,
            initialRegistrationPause,
            maxRegistrationPause,
            refusedRegistrationPause,
            configuration,
            exitOnOom,
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
            alwaysParentFirstLoaderPatterns,
            taskManagerLogPath,
            taskManagerStdoutPath);
    }
}
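  • The static factory method TaskManagerConfiguration.fromConfiguration reads the flag via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) into the local variable exitOnOom and passes it to the constructor, which stores it in the final field exitJvmOnOutOfMemory. The shouldExitJvmOnOutOfMemoryError method (an override from TaskManagerRuntimeInfo) returns that field.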
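The pattern here is "parse once, freeze, expose through a read-only interface": the flag is resolved when the TaskManager configuration is built, and tasks only ever see the resulting boolean. A minimal JDK-only sketch of that pattern follows; RuntimeInfo, MiniTaskManagerConfiguration and TaskManagerConfigSketch are hypothetical names (loosely modeled on TaskManagerRuntimeInfo and TaskManagerConfiguration), and the raw configuration is assumed to be a plain Map<String, String> rather than Flink's Configuration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class TaskManagerConfigSketch {

    static final String KILL_ON_OOM_KEY = "taskmanager.jvm-exit-on-oom";

    // Hypothetical counterpart of TaskManagerRuntimeInfo: the read-only view a task sees.
    interface RuntimeInfo {
        Map<String, String> getConfiguration();

        boolean shouldExitJvmOnOutOfMemoryError();
    }

    // Hypothetical, trimmed-down analogue of TaskManagerConfiguration.
    static final class MiniTaskManagerConfiguration implements RuntimeInfo {
        private final Map<String, String> configuration;
        private final boolean exitJvmOnOutOfMemory;

        MiniTaskManagerConfiguration(Map<String, String> configuration, boolean exitJvmOnOutOfMemory) {
            // Defensive copy wrapped read-only, in the spirit of UnmodifiableConfiguration.
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
            this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
        }

        // Static factory: resolve the flag once (default false) and freeze it in a final field.
        static MiniTaskManagerConfiguration fromConfiguration(Map<String, String> raw) {
            boolean exitOnOom = Boolean.parseBoolean(raw.getOrDefault(KILL_ON_OOM_KEY, "false"));
            return new MiniTaskManagerConfiguration(raw, exitOnOom);
        }

        @Override
        public Map<String, String> getConfiguration() {
            return configuration;
        }

        @Override
        public boolean shouldExitJvmOnOutOfMemoryError() {
            return exitJvmOnOutOfMemory;
        }
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put(KILL_ON_OOM_KEY, "true");
        RuntimeInfo info = MiniTaskManagerConfiguration.fromConfiguration(raw);

        raw.put(KILL_ON_OOM_KEY, "false"); // later edits to the source map are not seen
        System.out.println(info.shouldExitJvmOnOutOfMemoryError()); // true
    }
}
```

Freezing the value means a running task cannot observe a change to the flag halfway through; picking up a new setting requires building a new configuration object, which in Flink's case means restarting the TaskManager.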

Task

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

Code language: Java
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        //......

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            //......

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {

            // unwrap wrapped exceptions to make stack traces more compact
            if (t instanceof WrappingRuntimeException) {
                t = ((WrappingRuntimeException) t).unwrap();
            }

            // ----------------------------------------------------------------
            // the execution failed. either the invokable code properly failed, or
            // an exception was thrown as a side effect of cancelling
            // ----------------------------------------------------------------

            try {
                // check if the exception is unrecoverable
                if (ExceptionUtils.isJvmFatalError(t) ||
                        (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

                    // terminate the JVM immediately
                    // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                    try {
                        LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                    } finally {
                        Runtime.getRuntime().halt(-1);
                    }
                }

                // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
                // loop for multiple retries during concurrent state changes via calls to cancel() or
                // to failExternally()
                while (true) {
                    ExecutionState current = this.executionState;

                    if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                        if (t instanceof CancelTaskException) {
                            if (transitionState(current, ExecutionState.CANCELED)) {
                                cancelInvokable(invokable);
                                break;
                            }
                        }
                        else {
                            if (transitionState(current, ExecutionState.FAILED, t)) {
                                // proper failure of the task. record the exception as the root cause
                                failureCause = t;
                                cancelInvokable(invokable);

                                break;
                            }
                        }
                    }
                    else if (current == ExecutionState.CANCELING) {
                        if (transitionState(current, ExecutionState.CANCELED)) {
                            break;
                        }
                    }
                    else if (current == ExecutionState.FAILED) {
                        // in state failed already, no transition necessary any more
                        break;
                    }
                    // unexpected state, go to failed
                    else if (transitionState(current, ExecutionState.FAILED, t)) {
                        LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                        break;
                    }
                    // else fall through the loop and
                }
            }
            catch (Throwable tt) {
                String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
                LOG.error(message, tt);
                notifyFatalError(message, tt);
            }
        }
        finally {
            //......
        }
    }

    //......
}
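  • Task implements Runnable. Its run method wraps invokable.invoke() in a try/catch. In the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or if (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) is true, it logs the error and then, in a finally block, calls Runtime.getRuntime().halt(-1) to stop the JVM immediately without attempting a clean shutdown. Otherwise the task is moved to FAILED (or CANCELED) and the TaskManager keeps running.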
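The decision in that catch block can be isolated and exercised without actually killing the process. The sketch below is an illustration, not Flink code: TaskFailureHandlingSketch, shouldTerminateJvm and handleFailure are hypothetical names, the IntConsumer "halter" stands in for Runtime.getRuntime()::halt so the logic is testable, and System.err replaces Flink's logger. The condition and the log-then-halt-in-finally structure follow the Task.run excerpt above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class TaskFailureHandlingSketch {

    // Same three types that ExceptionUtils.isJvmFatalError checks in Flink 1.7.2.
    // (ThreadDeath is deprecated in recent JDKs, so the compiler may warn.)
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    // Mirrors the condition in Task.run(): always-fatal errors, or OOM when the flag is on.
    static boolean shouldTerminateJvm(Throwable t, boolean exitJvmOnOutOfMemoryError) {
        return isJvmFatalError(t) || (t instanceof OutOfMemoryError && exitJvmOnOutOfMemoryError);
    }

    // Hypothetical wrapper around the catch block. In production the halter would be
    // Runtime.getRuntime()::halt, which never returns.
    static void handleFailure(Throwable t, boolean exitOnOom, IntConsumer halter) {
        if (shouldTerminateJvm(t, exitOnOom)) {
            try {
                System.err.println("Encountered fatal error " + t.getClass().getName() + " - terminating the JVM");
            } finally {
                halter.accept(-1); // halt even if logging itself throws
            }
            return;
        }
        // Otherwise the real Task transitions to FAILED/CANCELED and the TaskManager stays up.
        System.err.println("Task failed, TaskManager keeps running: " + t);
    }

    public static void main(String[] args) {
        AtomicInteger haltStatus = new AtomicInteger(Integer.MIN_VALUE);

        handleFailure(new OutOfMemoryError("Java heap space"), false, haltStatus::set);
        System.out.println(haltStatus.get() == Integer.MIN_VALUE); // true: flag off, no halt

        handleFailure(new OutOfMemoryError("Java heap space"), true, haltStatus::set);
        System.out.println(haltStatus.get()); // -1: flag on, halt requested
    }
}
```

Putting halt in the finally block is the important design choice: once the JVM may be in a bad state, even the logging call could fail, and the process must still go down.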

ExceptionUtils.isJvmFatalError

flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java

Code language: Java
@Internal
public final class ExceptionUtils {
    //......

    /**
     * Checks whether the given exception indicates a situation that may leave the
     * JVM in a corrupted state, meaning a state where continued normal operation can only be
     * guaranteed via clean process restart.
     *
     * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
     * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
     * and {@link java.util.zip.ZipError} (a special case of InternalError).
     * The {@link ThreadDeath} exception is also treated as a fatal error, because when
     * a thread is forcefully stopped, there is a high chance that parts of the system
     * are in an inconsistent state.
     *
     * @param t The exception to check.
     * @return True, if the exception is considered fatal to the JVM, false otherwise.
     */
    public static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    //......
}
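  • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath. OutOfMemoryError is not among them, so in Task.run an OutOfMemoryError only halts the JVM when taskmanager.jvm-exit-on-oom is enabled.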
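Although the Javadoc speaks of "Virtual Machine errors", the check does not match every VirtualMachineError: OutOfMemoryError and StackOverflowError are VirtualMachineError subclasses too, yet they are not considered fatal, while ZipError is matched because it extends InternalError. The sketch below copies the one-line check into a standalone class and prints each sample's superclass chain; FatalErrorClassification and hierarchy are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipError;

final class FatalErrorClassification {

    // Same check as ExceptionUtils.isJvmFatalError in Flink 1.7.2.
    // (ThreadDeath is deprecated in recent JDKs, so the compiler may warn.)
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    // Superclass chain up to Throwable, e.g. "ZipError -> InternalError -> VirtualMachineError -> Error -> Throwable".
    static String hierarchy(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            names.add(c.getSimpleName());
        }
        return String.join(" -> ", names);
    }

    public static void main(String[] args) {
        Throwable[] samples = {
            new InternalError("internal"),
            new ZipError("zip"),
            new UnknownError("unknown"),
            new OutOfMemoryError("Java heap space"),
            new StackOverflowError(),
        };
        for (Throwable t : samples) {
            System.out.printf("%-20s fatal=%-5s %s%n",
                    t.getClass().getSimpleName(), isJvmFatalError(t), hierarchy(t.getClass()));
        }
    }
}
```

The output makes the gap visible: the two most common VirtualMachineErrors in practice, OutOfMemoryError and StackOverflowError, both print fatal=false, which is exactly the case the separate taskmanager.jvm-exit-on-oom switch exists for (for OOM).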

Runtime.getRuntime().halt

java.base/java/lang/Runtime.java

Code language: Java
public class Runtime {
    //......

    private static final Runtime currentRuntime = new Runtime();

    /**
     * Returns the runtime object associated with the current Java application.
     * Most of the methods of class {@code Runtime} are instance
     * methods and must be invoked with respect to the current runtime object.
     *
     * @return  the {@code Runtime} object associated with the current
     *          Java application.
     */
    public static Runtime getRuntime() {
        return currentRuntime;
    }

    /**
     * Forcibly terminates the currently running Java virtual machine.  This
     * method never returns normally.
     *
     * <p> This method should be used with extreme caution.  Unlike the
     * {@link #exit exit} method, this method does not cause shutdown
     * hooks to be started.  If the shutdown sequence has already been
     * initiated then this method does not wait for any running
     * shutdown hooks to finish their work.
     *
     * @param  status
     *         Termination status. By convention, a nonzero status code
     *         indicates abnormal termination. If the {@link Runtime#exit exit}
     *         (equivalently, {@link System#exit(int) System.exit}) method
     *         has already been invoked then this status code
     *         will override the status code passed to that method.
     *
     * @throws SecurityException
     *         If a security manager is present and its
     *         {@link SecurityManager#checkExit checkExit} method
     *         does not permit an exit with the specified status
     *
     * @see #exit
     * @see #addShutdownHook
     * @see #removeShutdownHook
     * @since 1.3
     */
    public void halt(int status) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkExit(status);
        }
        Shutdown.beforeHalt();
        Shutdown.halt(status);
    }

    //......
}
  • If a SecurityManager is installed, halt first calls its checkExit method; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to terminate the JVM. Unlike exit, halt does not start shutdown hooks.
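The practical difference between halt and exit is easiest to see with a shutdown hook registered. The small program below is a sketch for experimentation (HaltVsExitDemo and posixExitCode are hypothetical names): run with the argument halt it terminates via Runtime.getRuntime().halt(-1), run with exit it uses System.exit(-1), and with no argument it just prints usage and ends normally. The exit-code helper assumes POSIX semantics, where the parent process only sees the low 8 bits of the status.

```java
final class HaltVsExitDemo {

    // POSIX shells see only the low 8 bits of the status, so halt(-1) shows up as 255 there.
    // Other platforms may report the value differently.
    static int posixExitCode(int status) {
        return status & 0xFF;
    }

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("shutdown hook ran")));

        String mode = args.length > 0 ? args[0] : "";
        if (mode.equals("halt")) {
            System.out.println("halt(-1): the shutdown hook will NOT run");
            System.out.flush();
            Runtime.getRuntime().halt(-1); // skips shutdown hooks entirely
        } else if (mode.equals("exit")) {
            System.out.println("System.exit(-1): the shutdown hook runs first");
            System.exit(-1); // runs registered shutdown hooks before terminating
        } else {
            System.out.println("usage: java HaltVsExitDemo.java halt|exit");
        }
    }
}
```

With halt, "shutdown hook ran" never appears, and on Linux or macOS echo $? reports 255. For a TaskManager this means any cleanup registered as a shutdown hook is skipped after a fatal error or an OOM with the flag enabled, which is the intended trade-off: the process dies quickly instead of hanging in a shutdown that may never complete.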

Summary

  • taskmanager.jvm-exit-on-oom defaults to false. It specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
  • The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; shouldExitJvmOnOutOfMemoryError returns that field.
  • Task implements Runnable. Its run method wraps invokable.invoke() in a try/catch; in the catch block, if ExceptionUtils.isJvmFatalError(t) or (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) holds, it calls Runtime.getRuntime().halt(-1) to stop the JVM.
  • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath.
  • If a SecurityManager is installed, halt first calls SecurityManager.checkExit; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to terminate the JVM.

doc

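Original statement: This article is published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, please contact cloudcommunity@tencent.com for removal.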
