A look at Flink LocalEnvironment's execute method

This article takes a closer look at the execute method of Flink's LocalEnvironment.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

        DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput
                .flatMap(new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception {

                        out.collect(new Tuple2<String, Integer>(record.getCountry(), 1));
                    }
                }).groupBy(0).sum(1);
        System.out.println("===groupedByCountry===");
        groupedByCountry.print();
  • Here a DataSet is read from a CSV file, flatMap, groupBy, and sum are applied to it, and finally print is called to output the result; a sketch of the RecordDto POJO the snippet assumes follows below
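
The snippet references a RecordDto class that is not shown. A minimal sketch of what it could look like, inferred from the field names passed to pojoType and the getCountry() call (the real class may differ):

    public class RecordDto {
        // field names must match the ones passed to pojoType(...)
        private String playerName;
        private String country;
        private Integer year;
        private String game;
        private Integer gold;
        private Integer silver;
        private Integer bronze;
        private Integer total;

        // Flink POJO rules: public no-arg constructor, and each field either
        // public or exposed through a getter/setter pair
        public RecordDto() {}

        public String getCountry() { return country; }
        public void setCountry(String country) { this.country = country; }
        // getters/setters for the remaining fields follow the same pattern
    }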

DataSet.print

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java

    /**
     * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
     * the print() method. For programs that are executed in a cluster, this method needs
     * to gather the contents of the DataSet back to the client, to print it there.
     *
     * <p>The string written for each element is defined by the {@link Object#toString()} method.
     *
     * <p>This method immediately triggers the program execution, similar to the
     * {@link #collect()} and {@link #count()} methods.
     *
     * @see #printToErr()
     * @see #printOnTaskManager(String)
     */
    public void print() throws Exception {
        List<T> elements = collect();
        for (T e: elements) {
            System.out.println(e);
        }
    }
  • print mainly delegates to collect to fetch the results, then prints them one by one; per the javadoc it immediately triggers program execution, just like collect() and count()
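
A minimal sketch of the difference between the eager print() and the lazy printOnTaskManager(String) referenced in the javadoc above, reusing groupedByCountry and env from the example:

    // eager: triggers execution itself and prints on the client
    groupedByCountry.print();

    // lazy: only adds a sink; nothing runs until env.execute() is called
    groupedByCountry.printOnTaskManager("out");
    env.execute("print on task managers");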

DataSet.collect

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java

    /**
     * Convenience method to get the elements of a DataSet as a List.
     * As DataSet can contain a lot of data, this method should be used with caution.
     *
     * @return A List containing the elements of the DataSet
     */
    public List<T> collect() throws Exception {
        final String id = new AbstractID().toString();
        final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

        this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
        JobExecutionResult res = getExecutionEnvironment().execute();

        ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
        if (accResult != null) {
            try {
                return SerializedListAccumulator.deserializeList(accResult, serializer);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Cannot find type class of collected data type.", e);
            } catch (IOException e) {
                throw new RuntimeException("Serialization error while deserializing collected data", e);
            }
        } else {
            throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
        }
    }
  • collect calls getExecutionEnvironment().execute() to obtain the JobExecutionResult; the execution environment here is a LocalEnvironment. The collected elements travel back to the client through the accumulator registered by Utils.CollectHelper
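
A short usage sketch: because collect() ships the whole DataSet back to the client through that accumulator, it should only be used for small result sets:

    // materialize the result locally; fine here since there is one row per country
    List<Tuple2<String, Integer>> local = groupedByCountry.collect();
    for (Tuple2<String, Integer> t : local) {
        System.out.println(t.f0 + " -> " + t.f1);
    }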

ExecutionEnvironment.execute

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Triggers the program execution. The environment will execute all parts of the program that have
     * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
     * writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with a generated default name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program executions fails.
     */
    public JobExecutionResult execute() throws Exception {
        return execute(getDefaultName());
    }

    /**
     * Gets a default job name, based on the timestamp when this method is invoked.
     *
     * @return A default job name.
     */
    private static String getDefaultName() {
        return "Flink Java Job at " + Calendar.getInstance().getTime();
    }

    /**
     * Triggers the program execution. The environment will execute all parts of the program that have
     * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
     * writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with the given job name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program executions fails.
     */
    public abstract JobExecutionResult execute(String jobName) throws Exception;
  • The parameterless execute() delegates to the abstract execute(String jobName), which is implemented by subclasses; here we focus on LocalEnvironment's implementation
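
Since execute() just fills in the timestamp-based default name from getDefaultName(), a job name can also be supplied explicitly. A minimal sketch (the sink path and job name are illustrative):

    // a sink is required, otherwise there is nothing for the environment to execute
    csvInput.writeAsText("/tmp/records-out");
    JobExecutionResult result = env.execute("olympics-csv-job");
    System.out.println("net runtime (ms): " + result.getNetRuntime());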

LocalEnvironment.execute

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    @Override
    @PublicEvolving
    public void startNewSession() throws Exception {
        if (executor != null) {
            // we need to end the previous session
            executor.stop();
            // create also a new JobID
            jobID = JobID.generate();
        }

        // create a new local executor
        executor = PlanExecutor.createLocalExecutor(configuration);
        executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());

        // if we have a session, start the mini cluster eagerly to have it available across sessions
        if (getSessionTimeout() > 0) {
            executor.start();

            // also install the reaper that will shut it down eventually
            executorReaper = new ExecutorReaper(executor);
        }
    }
  • If executor is null, execute first calls startNewSession, which creates the executor via PlanExecutor.createLocalExecutor(configuration); only if sessionTimeout is greater than 0 (it defaults to 0) is executor.start() called eagerly at this point
  • It then builds the Plan via createProgramPlan(jobName)
  • Finally it obtains the JobExecutionResult via executor.executePlan(p); a sketch of creating such a LocalEnvironment by hand follows this list
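
In the example, getExecutionEnvironment() hands back a LocalEnvironment because the program is started outside a cluster; the same environment can also be requested explicitly. A minimal sketch (the parallelism value is illustrative):

    // create the LocalEnvironment directly instead of via getExecutionEnvironment()
    Configuration conf = new Configuration();
    LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(conf);
    localEnv.setParallelism(2);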

PlanExecutor.createLocalExecutor

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/PlanExecutor.java

    private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";

    /**
     * Creates an executor that runs the plan locally in a multi-threaded environment.
     * 
     * @return A local executor.
     */
    public static PlanExecutor createLocalExecutor(Configuration configuration) {
        Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
        
        try {
            return leClass.getConstructor(Configuration.class).newInstance(configuration);
        }
        catch (Throwable t) {
            throw new RuntimeException("An error occurred while loading the local executor ("
                    + LOCAL_EXECUTOR_CLASS + ").", t);
        }
    }

    private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
        try {
            Class<?> leClass = Class.forName(className);
            return leClass.asSubclass(PlanExecutor.class);
        }
        catch (ClassNotFoundException cnfe) {
            throw new RuntimeException("Could not load the executor class (" + className
                    + "). Do you have the 'flink-clients' project in your dependencies?");
        }
        catch (Throwable t) {
            throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
        }
    }
  • PlanExecutor.createLocalExecutor instantiates org.apache.flink.client.LocalExecutor via reflection; as the error message hints, this indirection exists so that flink-core needs no compile-time dependency on flink-clients, which must instead be on the classpath at runtime. The same pattern is reproduced in the sketch below
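
An illustrative, self-contained reproduction of that reflective factory (the demo class name is made up; the constructor signature matches the getConstructor call above):

    import org.apache.flink.api.common.PlanExecutor;
    import org.apache.flink.configuration.Configuration;

    public class ReflectiveFactoryDemo {
        public static void main(String[] args) throws Exception {
            // resolve the class by name, narrow it to the expected supertype,
            // then invoke its (Configuration) constructor -- the same three steps
            // as loadExecutorClass + createLocalExecutor
            Class<? extends PlanExecutor> clazz = Class
                    .forName("org.apache.flink.client.LocalExecutor")
                    .asSubclass(PlanExecutor.class);
            PlanExecutor executor = clazz.getConstructor(Configuration.class)
                    .newInstance(new Configuration());
            System.out.println(executor.getClass().getName());
        }
    }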

LocalExecutor.executePlan

flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java

    /**
     * Executes the given program on a local runtime and waits for the job to finish.
     *
     * <p>If the executor has not been started before, this starts the executor and shuts it down
     * after the job finished. If the job runs in session mode, the executor is kept alive until
     * no more references to the executor exist.</p>
     *
     * @param plan The plan of the program to execute.
     * @return The net runtime of the program, in milliseconds.
     *
     * @throws Exception Thrown, if either the startup of the local execution context, or the execution
     *                   caused an exception.
     */
    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        synchronized (this.lock) {

            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (jobExecutorService == null) {
                shutDownAtEnd = true;

                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }

                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);

                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }
  • When jobExecutorService is null, executePlan calls start to bring up a local cluster and create the jobExecutorService; shutDownAtEnd is then set so the cluster is stopped again once the job finishes
  • It then compiles the Plan into an OptimizedPlan with the Optimizer, and turns that into a JobGraph via JobGraphGenerator.compileJobGraph
  • Finally it calls jobExecutorService.executeJobBlocking(jobGraph) to run the JobGraph and returns the resulting JobExecutionResult; the config keys read above are sketched after this list
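
The default parallelism computed above is slotsPerTaskManager * numTaskManagers, both read from the configuration. A hedged sketch of steering it through the same keys (the values are illustrative):

    Configuration conf = new Configuration();
    conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);          // slots per task manager
    conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);  // number of task managers
    // this configuration flows via LocalEnvironment into LocalExecutor's baseConfiguration,
    // so the local plan's default parallelism would become 4 * 2 = 8
    LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);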

LocalExecutor.start

flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java

    @Override
    public void start() throws Exception {
        synchronized (lock) {
            if (jobExecutorService == null) {
                // create the embedded runtime
                jobExecutorServiceConfiguration = createConfiguration();

                // start it up
                jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
            } else {
                throw new IllegalStateException("The local executor was already started.");
            }
        }
    }

    private Configuration createConfiguration() {
        Configuration newConfiguration = new Configuration();
        newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
        newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

        newConfiguration.addAll(baseConfiguration);

        return newConfiguration;
    }

    private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
        final JobExecutorService newJobExecutorService;
        if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {

            if (!configuration.contains(RestOptions.PORT)) {
                configuration.setInteger(RestOptions.PORT, 0);
            }

            final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
                .setConfiguration(configuration)
                .setNumTaskManagers(
                    configuration.getInteger(
                        ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
                        ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
                .setRpcServiceSharing(RpcServiceSharing.SHARED)
                .setNumSlotsPerTaskManager(
                    configuration.getInteger(
                        TaskManagerOptions.NUM_TASK_SLOTS, 1))
                .build();

            final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
            miniCluster.start();

            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

            newJobExecutorService = miniCluster;
        } else {
            final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
            localFlinkMiniCluster.start();

            newJobExecutorService = localFlinkMiniCluster;
        }

        return newJobExecutorService;
    }
  • start first builds a Configuration via createConfiguration, then creates the JobExecutorService via createJobExecutorService
  • createConfiguration mainly sets TaskManagerOptions.NUM_TASK_SLOTS and CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, then overlays the baseConfiguration on top
  • createJobExecutorService decides which kind of JobExecutorService to create based on configuration.getString(CoreOptions.MODE)
  • The default is CoreOptions.NEW_MODE: it builds a MiniClusterConfiguration, creates a MiniCluster (a JobExecutorService), starts it via MiniCluster.start, and returns it
  • In any other mode it creates a LocalFlinkMiniCluster (also a JobExecutorService), starts it via LocalFlinkMiniCluster.start(), and returns it; forcing that legacy branch is sketched below
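
A hedged sketch of forcing the legacy branch from user code (this assumes the CoreOptions.LEGACY_MODE constant; any string other than "new" would take the same branch):

    Configuration conf = new Configuration();
    // anything other than CoreOptions.NEW_MODE selects the LocalFlinkMiniCluster branch
    conf.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);  // assumed constant, value "legacy"
    LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);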

MiniCluster.executeJobBlocking

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

    /**
     * This method runs a job in blocking mode. The method returns only after the job
     * completed successfully, or after it failed terminally.
     *
     * @param job  The Flink job to execute
     * @return The result of the job execution
     *
     * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
     *         or if the job terminally failed.
     */
    @Override
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        checkNotNull(job, "job is null");

        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

        final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

        final JobResult jobResult;

        try {
            jobResult = jobResultFuture.get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
        }

        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(job.getJobID(), e);
        }
    }
  • MiniCluster.executeJobBlocking first calls submitJob(job) to submit the JobGraph, which returns a CompletableFuture (submissionFuture)
  • submissionFuture is chained via thenCompose to requestJobResult, which asks for the JobResult of that job id (jobResultFuture)
  • Finally jobResultFuture.get() blocks until the JobResult is available, which is then converted into a JobExecutionResult via toJobExecutionResult; the same async shape is sketched below
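
The submit-then-request-then-block shape can be reproduced with plain CompletableFutures; an illustrative sketch with made-up stand-ins for submitJob and requestJobResult:

    import java.util.concurrent.CompletableFuture;

    public class ComposeDemo {
        public static void main(String[] args) throws Exception {
            // stands in for submitJob(job): completes once submission is acknowledged
            CompletableFuture<String> submission = CompletableFuture.supplyAsync(() -> "job-42");

            // stands in for thenCompose(ignored -> requestJobResult(jobId))
            CompletableFuture<String> resultFuture = submission
                    .thenCompose(jobId -> CompletableFuture.supplyAsync(() -> "result of " + jobId));

            // stands in for jobResultFuture.get(): blocks until the whole chain completes
            System.out.println(resultFuture.get());
        }
    }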

Summary

  • DataSet's print method calls collect, and collect in turn calls getExecutionEnvironment().execute() to obtain the JobExecutionResult; the execution environment here is a LocalEnvironment
  • ExecutionEnvironment.execute internally delegates to the abstract execute(String jobName), implemented by subclasses; here it is LocalEnvironment.execute, which first creates a LocalExecutor through startNewSession (using PlanExecutor.createLocalExecutor), then builds the Plan via createProgramPlan, and finally calls LocalExecutor.executePlan to obtain the JobExecutionResult
  • LocalExecutor.executePlan first checks jobExecutorService; if it is null, it calls start to create one (depending on CoreOptions.MODE: a MiniCluster for CoreOptions.NEW_MODE, otherwise a LocalFlinkMiniCluster; here it is a MiniCluster). It then converts the Plan into a JobGraph via JobGraphGenerator, and finally calls jobExecutorService.executeJobBlocking(jobGraph) to run the JobGraph and return the JobExecutionResult

doc

  • LocalEnvironment
  • LocalExecutor
  • MiniCluster

Originally published on the WeChat public account 码匠的流水账 (geek_luandun).

Original publication date: 2018-11-21
