Specifying the job ID in Flink

路过君
Published 2022-06-01 10:01:50
Included in the column: 路过君BLOG from CSDN

Version

Flink 1.14.4

Method

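// "xxx" is a placeholder: the value is parsed with JobID.fromHexString,
// so it must be a 32-character hexadecimal string
Configuration configuration = new Configuration();
configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "xxx");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);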
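
Because the configured value ends up in JobID.fromHexString (see the source code analysis), it has to be 32 hexadecimal characters. The following is a minimal sketch, using only the JDK, of one possible way to derive a stable value of that shape from a job name. The class FixedJobIdSketch and its methods are hypothetical helpers, and hashing the name with MD5 is an assumption made here for illustration, not something Flink prescribes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of Flink: derives a stable 32-character hex
// string from a job name so it can be used as the fixed job ID value.
class FixedJobIdSketch {

    // MD5 yields 16 bytes, which is exactly 32 hex characters.
    static String fromJobName(String jobName) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(jobName.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(32);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    // Checks the shape a job ID hex string is expected to have.
    static boolean isValidJobIdHex(String value) {
        return value != null && value.matches("[0-9a-fA-F]{32}");
    }

    public static void main(String[] args) {
        String id = fromJobName("my-streaming-job");
        System.out.println(id + " valid=" + isValidJobIdHex(id));
        System.out.println("xxx valid=" + isValidJobIdHex("xxx"));
    }
}
```

The result of fromJobName could then be passed as the second argument of configuration.setString in place of the "xxx" placeholder. The same job name always maps to the same ID, which may or may not be what you want when the job is submitted more than once.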

Source code analysis

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    // execute() delegates to executeAsync()
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        JobClient jobClient = this.executeAsync(streamGraph);
        ...
    }

    @Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        // Look up the executor for the configured deployment target and hand it
        // the StreamGraph together with the environment's configuration
        PipelineExecutorFactory executorFactory = this.executorServiceLoader.getExecutorFactory(this.configuration);
        CompletableFuture jobClientFuture = executorFactory.getExecutor(this.configuration).execute(streamGraph, this.configuration, this.userClassloader);
        try {
            JobClient jobClient = (JobClient)jobClientFuture.get();
        ...
    }

org.apache.flink.client.program.PerJobMiniClusterFactory

    public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
        MiniClusterConfiguration miniClusterConfig = this.getMiniClusterConfig(jobGraph.getMaximumParallelism());
        MiniCluster miniCluster = (MiniCluster)this.miniClusterFactory.apply(miniClusterConfig);
        miniCluster.start();
        return miniCluster.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction((submissionResult) -> {
            ClientUtils.waitUntilJobInitializationFinished(() -> {
                return (JobStatus)miniCluster.getJobStatus(submissionResult.getJobID()).get();
            }, () -> {
                return (JobResult)miniCluster.requestJobResult(submissionResult.getJobID()).get();
            }, userCodeClassloader);
            return submissionResult;
        })).thenApply((result) -> {
            return new MiniClusterJobClient(result.getJobID(), miniCluster, userCodeClassloader, JobFinalizationBehavior.SHUTDOWN_CLUSTER);
        }).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                shutDownCluster(miniCluster);
            }

        }).thenApply(Function.identity());
    }
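
The future chain in submitJob can be hard to read in decompiled form. The sketch below reproduces only its overall shape with JDK types: map the submission result to a client, and shut the cluster down if any earlier stage failed. SubmitChainSketch and ClusterModel are hypothetical stand-ins, the "client" is just a string, and the waiting for job initialization as well as the shutdown after a finished job are deliberately left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model (JDK types only) of the future chain in submitJob.
class SubmitChainSketch {

    // Hypothetical stand-in for a mini cluster; it only records a shutdown.
    static final class ClusterModel {
        final AtomicBoolean shutDown = new AtomicBoolean(false);

        // Completes with the job ID, or exceptionally when fail is true.
        CompletableFuture<String> submit(String jobId, boolean fail) {
            if (fail) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("submission failed"));
                return failed;
            }
            return CompletableFuture.completedFuture(jobId);
        }
    }

    static CompletableFuture<String> submitJob(ClusterModel cluster, String jobId, boolean fail) {
        return cluster.submit(jobId, fail)
                // Build the "client" from the job ID reported by the submission.
                .thenApply(id -> "client-for-" + id)
                // On any failure in the chain, release the cluster.
                .whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        cluster.shutDown.set(true);
                    }
                });
    }

    public static void main(String[] args) {
        ClusterModel cluster = new ClusterModel();
        System.out.println(submitJob(cluster, "0123456789abcdef0123456789abcdef", false).join());
    }
}
```

The point relevant to this article is that the client is built from the job ID reported back by the submission, so a fixed ID set on the JobGraph is also the ID the client sees.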

org.apache.flink.client.deployment.executors.LocalExecutor

    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        Configuration effectiveConfig = new Configuration();
        effectiveConfig.addAll(this.configuration);
        effectiveConfig.addAll(configuration);
        JobGraph jobGraph = this.getJobGraph(pipeline, effectiveConfig);
        return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, this.miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
    }
    private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
        if (pipeline instanceof Plan) {
            Plan plan = (Plan)pipeline;
            int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
            int numTaskManagers = configuration.getInteger("local.number-taskmanager", 1);
            plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
        }

        return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    }
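
The two addAll calls in execute decide which value wins when a key is set in both places: the configuration passed in from the environment is applied last, so the fixed job ID option set there reaches getJobGraph. The sketch below models this with plain java.util.Map objects instead of Flink's Configuration, under the assumption that addAll overwrites existing keys the way Map.putAll does. EffectiveConfigSketch and merge are hypothetical names, and the job ID value is an arbitrary example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (plain maps, not Flink's Configuration) of how
// LocalExecutor.execute builds effectiveConfig: the later addAll wins.
class EffectiveConfigSketch {

    static Map<String, String> merge(Map<String, String> executorConfig,
                                     Map<String, String> callConfig) {
        Map<String, String> effective = new HashMap<>();
        effective.putAll(executorConfig); // mirrors effectiveConfig.addAll(this.configuration)
        effective.putAll(callConfig);     // mirrors effectiveConfig.addAll(configuration)
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> executorConfig = new HashMap<>();
        executorConfig.put("parallelism.default", "1");

        Map<String, String> callConfig = new HashMap<>();
        callConfig.put("$internal.pipeline.job-id", "0123456789abcdef0123456789abcdef");

        System.out.println(merge(executorConfig, callConfig));
    }
}
```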

  • When the JobGraph is created, the configuration is checked for the option $internal.pipeline.job-id; if it is present, its value is set as the job ID.

org.apache.flink.client.deployment.executors.PipelineExecutorUtils

    public static JobGraph getJobGraph(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration) throws MalformedURLException {
        Preconditions.checkNotNull(pipeline);
        Preconditions.checkNotNull(configuration);
        ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
        JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
        configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID).ifPresent((strJobID) -> {
            jobGraph.setJobID(JobID.fromHexString(strJobID));
        });
        jobGraph.addJars(executionConfigAccessor.getJars());
        jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
        jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
        return jobGraph;
    }
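
The override step in getJobGraph can be tried out in isolation with a small model. This is only a sketch using JDK types: JobIdOverrideSketch, JobGraphModel and parseHexJobId are hypothetical stand-ins for JobGraph and JobID.fromHexString, and the random default ID is produced from a UUID purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Simplified stand-in (plain JDK types, not Flink classes) for the job ID
// override step in PipelineExecutorUtils.getJobGraph.
class JobIdOverrideSketch {

    static final String FIXED_JOB_ID_KEY = "$internal.pipeline.job-id";

    // Minimal model of a job graph that starts out with a random ID.
    static final class JobGraphModel {
        private String jobId = UUID.randomUUID().toString().replace("-", "");

        void setJobId(String jobId) {
            this.jobId = jobId;
        }

        String getJobId() {
            return jobId;
        }
    }

    // Rejects anything that is not 32 hex characters.
    static String parseHexJobId(String value) {
        if (!value.matches("[0-9a-fA-F]{32}")) {
            throw new IllegalArgumentException("Cannot parse job ID from \"" + value + "\"");
        }
        return value;
    }

    // Keeps the random ID unless the fixed job ID option is present.
    static JobGraphModel buildJobGraph(Map<String, String> configuration) {
        JobGraphModel jobGraph = new JobGraphModel();
        Optional.ofNullable(configuration.get(FIXED_JOB_ID_KEY))
                .ifPresent(strJobId -> jobGraph.setJobId(parseHexJobId(strJobId)));
        return jobGraph;
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        System.out.println("random: " + buildJobGraph(configuration).getJobId());
        configuration.put(FIXED_JOB_ID_KEY, "0123456789abcdef0123456789abcdef");
        System.out.println("fixed:  " + buildJobGraph(configuration).getJobId());
    }
}
```

In this model a value such as "xxx" fails at the parsing step, which is the behavior to expect from any placeholder that is not 32 hexadecimal characters.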

Shared from the author's personal site/blog. Originally published: 2022-05-31.
