
A Look at Flink's Parallel Execution

code4it
Published 2019-03-04 10:54:17
Included in the column: 码匠的流水账

This article takes a look at how Flink handles Parallel Execution.

Examples

Operator Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
  • Operators, data sources, and data sinks can each call setParallelism() to set their own parallelism.

Execution Environment Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
  • Calling setParallelism on the ExecutionEnvironment sets the default parallelism for operators, data sources, and data sinks. If an operator, data source, or data sink sets its own parallelism, that value overrides the one set on the ExecutionEnvironment.

Client Level

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

or

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
  • With the CLI client, the parallelism can be specified with -p on the command line; when invoking from Java/Scala, it can be passed as an argument to Client.run.

System Level

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
  • The parallelism.default option in flink-conf.yaml specifies a system-level default parallelism for all execution environments.

ExecutionEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

@Public
public abstract class ExecutionEnvironment {
    //......

    private final ExecutionConfig config = new ExecutionConfig();

    /**
     * Sets the parallelism for operations executed through this environment.
     * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
     * x parallel instances.
     *
     * <p>This method overrides the default parallelism for this environment.
     * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
     * contexts (CPU cores / threads). When executing the program via the command line client
     * from a JAR file, the default parallelism is the one configured for that setup.
     *
     * @param parallelism The parallelism
     */
    public void setParallelism(int parallelism) {
        config.setParallelism(parallelism);
    }

    @Internal
    public Plan createProgramPlan(String jobName, boolean clearSinks) {
        if (this.sinks.isEmpty()) {
            if (wasExecuted) {
                throw new RuntimeException("No new data sinks have been defined since the " +
                        "last execution. The last execution refers to the latest call to " +
                        "'execute()', 'count()', 'collect()', or 'print()'.");
            } else {
                throw new RuntimeException("No data sinks have been created yet. " +
                        "A program needs at least one sink that consumes data. " +
                        "Examples are writing the data set or printing it.");
            }
        }

        if (jobName == null) {
            jobName = getDefaultName();
        }

        OperatorTranslation translator = new OperatorTranslation();
        Plan plan = translator.translateToPlan(this.sinks, jobName);

        if (getParallelism() > 0) {
            plan.setDefaultParallelism(getParallelism());
        }
        plan.setExecutionConfig(getConfig());

        // Check plan for GenericTypeInfo's and register the types at the serializers.
        if (!config.isAutoTypeRegistrationDisabled()) {
            plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

                private final Set<Class<?>> registeredTypes = new HashSet<>();
                private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

                @Override
                public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
                    if (!visitedOperators.add(visitable)) {
                        return false;
                    }
                    OperatorInformation<?> opInfo = visitable.getOperatorInfo();
                    Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
                    return true;
                }

                @Override
                public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
            });
        }

        try {
            registerCachedFilesWithPlan(plan);
        } catch (Exception e) {
            throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
        }

        // clear all the sinks such that the next execution does not redo everything
        if (clearSinks) {
            this.sinks.clear();
            wasExecuted = true;
        }

        // All types are registered now. Print information.
        int registeredTypes = config.getRegisteredKryoTypes().size() +
                config.getRegisteredPojoTypes().size() +
                config.getRegisteredTypesWithKryoSerializerClasses().size() +
                config.getRegisteredTypesWithKryoSerializers().size();
        int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
                config.getDefaultKryoSerializerClasses().size();
        LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

        if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
            LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
        }
        if (config.isForceKryoEnabled()) {
            LOG.info("Using KryoSerializer for serializing POJOs");
        }
        if (config.isForceAvroEnabled()) {
            LOG.info("Using AvroSerializer for serializing POJOs");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
            LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

            // print information about static code analysis
            LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
        }

        return plan;
    }

    //......
}
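  • ExecutionEnvironment provides setParallelism, which stores the parallelism in its ExecutionConfig. After createProgramPlan has created the Plan, it reads the parallelism back from the ExecutionConfig and, if the value is greater than 0, sets it as the Plan's defaultParallelism.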

LocalEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

@Public
public class LocalEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
  • LocalEnvironment's execute calls LocalExecutor's executePlan.

LocalExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java

public class LocalExecutor extends PlanExecutor {
    
    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        synchronized (this.lock) {

            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (jobExecutorService == null) {
                shutDownAtEnd = true;

                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }

                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);

                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
  • LocalExecutor's executePlan additionally sets the Plan's defaultParallelism, computed as slotsPerTaskManager * numTaskManagers.
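
The slot arithmetic in executePlan can be sketched in plain Java. This is a simplified model, not Flink code: the class LocalParallelismSketch, its method names, and the UNSET sentinel are hypothetical, and the lookup through jobExecutorServiceConfiguration is reduced to plain int parameters.

```java
/** Hypothetical stand-in that mirrors the slot arithmetic quoted above; not a Flink class. */
final class LocalParallelismSketch {

    /** Sentinel meaning "slot count not configured" (an assumption of this sketch). */
    static final int UNSET = -1;

    /** Step 1: with no slot count configured, size the local cluster to the plan's maximum parallelism. */
    static int resolveSlots(int configuredSlots, int planMaxParallelism) {
        if (configuredSlots == UNSET && planMaxParallelism > 0) {
            return planMaxParallelism;
        }
        return configuredSlots;
    }

    /** Step 2: the plan's default parallelism is slots per TaskManager times the number of TaskManagers. */
    static int defaultParallelism(int slotsPerTaskManager, int numTaskManagers) {
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        // A plan whose widest operator runs with parallelism 4, on one local TaskManager.
        int slots = resolveSlots(UNSET, 4);
        System.out.println(defaultParallelism(slots, 1)); // prints 4
    }
}
```

Under these assumptions, a local run with no explicit slot setting gets just enough slots for its widest operator.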

RemoteEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java

@Public
public class RemoteEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        PlanExecutor executor = getExecutor();

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
  • RemoteEnvironment's execute calls RemoteExecutor's executePlan.

RemoteExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java

public class RemoteExecutor extends PlanExecutor {

    private final Object lock = new Object();

    private final List<URL> jarFiles;

    private final List<URL> globalClasspaths;

    private final Configuration clientConfiguration;

    private ClusterClient<?> client;

    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
        return executePlanWithJars(p);
    }

    public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
        if (program == null) {
            throw new IllegalArgumentException("The job may not be null.");
        }

        synchronized (this.lock) {
            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (client == null) {
                shutDownAtEnd = true;
                // start the executor for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                return client.run(program, defaultParallelism).getJobExecutionResult();
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
  • RemoteExecutor's executePlan calls executePlanWithJars, which in turn calls ClusterClient's run and passes defaultParallelism as an argument.

ClusterClient

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java

public abstract class ClusterClient<T> {
    //......

    public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return run(program, parallelism, SavepointRestoreSettings.none());
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
            throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }

        OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }

    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
            throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        Logger log = LoggerFactory.getLogger(ClusterClient.class);

        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
            p.setDefaultParallelism(parallelism);
        }
        log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

        return compiler.compile(p);
    }

    //......
}
  • The parallelism passed to ClusterClient's run is applied to the Plan only when parallelism > 0 and p.getDefaultParallelism() <= 0.
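
To make the effect of that guard concrete, here is a minimal sketch using hypothetical stand-ins (ClientParallelismSketch and PlanStub are invented for illustration and are not Flink classes). It reproduces only the condition from getOptimizedPlan and assumes -1 means "not set".

```java
/** Hypothetical stand-ins for illustration only; these are not Flink classes. */
final class ClientParallelismSketch {

    /** Minimal plan that holds only a default parallelism; -1 means "not set" in this sketch. */
    static final class PlanStub {
        private int defaultParallelism = -1;

        int getDefaultParallelism() {
            return defaultParallelism;
        }

        void setDefaultParallelism(int parallelism) {
            this.defaultParallelism = parallelism;
        }
    }

    /** Applies the client-level value only when it is positive and the plan has no default yet. */
    static void applyClientParallelism(PlanStub plan, int clientParallelism) {
        if (clientParallelism > 0 && plan.getDefaultParallelism() <= 0) {
            plan.setDefaultParallelism(clientParallelism);
        }
    }

    public static void main(String[] args) {
        PlanStub setByEnvironment = new PlanStub();
        setByEnvironment.setDefaultParallelism(3);     // as if the environment had set 3
        applyClientParallelism(setByEnvironment, 10);  // as if the job were submitted with 10
        System.out.println(setByEnvironment.getDefaultParallelism()); // prints 3

        PlanStub notSet = new PlanStub();
        applyClientParallelism(notSet, 10);
        System.out.println(notSet.getDefaultParallelism()); // prints 10
    }
}
```

In this model, a default already present on the plan wins over the client-level value, which only fills the gap when nothing was set.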

DataStreamSource

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java

@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }

    public DataStreamSource(SingleOutputStreamOperator<T> operator) {
        super(operator.environment, operator.getTransformation());
        this.isParallel = true;
    }

    @Override
    public DataStreamSource<T> setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
        } else {
            super.setParallelism(parallelism);
            return this;
        }
    }
}
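  • DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism: it rejects any value other than 1 for a non-parallel source, and otherwise delegates to the parent SingleOutputStreamOperator's setParallelism.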
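
The non-parallel source rule can be modelled without Flink on the classpath. The class SourceSketch below is a hypothetical simplification of the quoted constructor and setParallelism logic, not the real DataStreamSource.

```java
/** Hypothetical model of the check quoted above; not the Flink DataStreamSource class. */
final class SourceSketch {

    private final boolean parallel;
    private int parallelism;

    SourceSketch(boolean parallel, int environmentParallelism) {
        this.parallel = parallel;
        // A non-parallel source is pinned to 1 regardless of the environment default.
        this.parallelism = parallel ? environmentParallelism : 1;
    }

    /** Rejects any value other than 1 for a non-parallel source, otherwise stores the value. */
    SourceSketch setParallelism(int parallelism) {
        if (parallelism != 1 && !parallel) {
            throw new IllegalArgumentException("Source is not a parallel source");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(new SourceSketch(false, 4).getParallelism());                  // prints 1
        System.out.println(new SourceSketch(true, 4).setParallelism(8).getParallelism()); // prints 8
    }
}
```

Calling setParallelism(2) on the non-parallel instance would throw IllegalArgumentException in this model, matching the branch in the quoted source.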

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    //......

    /**
     * Sets the parallelism for this operator.
     *
     * @param parallelism
     *            The parallelism for this operator.
     * @return The operator with set parallelism.
     */
    public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
        Preconditions.checkArgument(canBeParallel() || parallelism == 1,
                "The parallelism of non parallel operator must be 1.");

        transformation.setParallelism(parallelism);

        return this;
    }

    //......
}
  • SingleOutputStreamOperator's setParallelism ultimately applies the value to its StreamTransformation.

DataStreamSink

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java

@Public
public class DataStreamSink<T> {

    private final SinkTransformation<T> transformation;

    //......

    /**
     * Sets the parallelism for this sink. The degree must be higher than zero.
     *
     * @param parallelism The parallelism for this sink.
     * @return The sink with set parallelism.
     */
    public DataStreamSink<T> setParallelism(int parallelism) {
        transformation.setParallelism(parallelism);
        return this;
    }

    //......
}
  • DataStreamSink provides setParallelism, which ultimately applies the value to its SinkTransformation.

Summary

  • Flink lets you set parallelism at several levels: Operator Level, Execution Environment Level, Client Level, and System Level.
  • The parallelism.default option in flink-conf.yaml specifies a system-level default parallelism for all execution environments. Calling setParallelism on the ExecutionEnvironment sets the default parallelism for operators, data sources, and data sinks; if any of them sets its own parallelism, that value overrides the one set on the ExecutionEnvironment.
  • ExecutionEnvironment's setParallelism stores the parallelism in its ExecutionConfig. With the CLI client, the parallelism can be specified with -p on the command line, or passed as an argument to Client.run when invoking from Java/Scala. The parallelism set through LocalEnvironment and RemoteEnvironment ends up on the Plan. DataStreamSource extends SingleOutputStreamOperator and its setParallelism ultimately calls the parent's setParallelism, which applies the value to the StreamTransformation. DataStreamSink's setParallelism applies the value to its SinkTransformation.
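
Taken together, the four levels behave like a fallback chain. The sketch below is one reading of the source quoted in this article, not an official algorithm: it assumes the order operator, environment, client, system, and uses -1 for "not set". ParallelismLevelsSketch and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical fallback-chain model of the four levels; not part of Flink. */
final class ParallelismLevelsSketch {

    /** Sentinel meaning "not set at this level" (an assumption of this sketch). */
    static final int UNSET = -1;

    /** Returns the first positive value in the order operator, environment, client, system. */
    static int resolve(int operatorLevel, int environmentLevel, int clientLevel, int systemLevel) {
        for (int candidate : new int[] {operatorLevel, environmentLevel, clientLevel, systemLevel}) {
            if (candidate > 0) {
                return candidate;
            }
        }
        throw new IllegalStateException("No parallelism configured at any level");
    }

    /** Resolves every operator of a job against the same environment, client, and system values. */
    static Map<String, Integer> resolveAll(Map<String, Integer> operatorSettings,
                                           int environmentLevel, int clientLevel, int systemLevel) {
        Map<String, Integer> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : operatorSettings.entrySet()) {
            resolved.put(entry.getKey(),
                    resolve(entry.getValue(), environmentLevel, clientLevel, systemLevel));
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Integer> operators = new LinkedHashMap<>();
        operators.put("flatMap", UNSET); // no operator-level setting
        operators.put("sum", 5);         // operator-level setting of 5
        // Environment level 3, client level 10, system level 1.
        System.out.println(resolveAll(operators, 3, 10, 1)); // prints {flatMap=3, sum=5}
    }
}
```

With the values from the examples at the top of the article, the operator that sets 5 keeps it, while the operator without its own setting falls back to the environment's 3.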

doc

  • Parallel Execution
Originally published on 2019-02-12; shared from the 码匠的流水账 WeChat official account.
