前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从头分析flink源码第二篇之streamGraph的生成

从头分析flink源码第二篇之streamGraph的生成

作者头像
山行AI
发布2021-07-01 15:33:51
9320
发布2021-07-01 15:33:51
举报
文章被收录于专栏:山行AI山行AI

上一篇我们分析了DataStream wordcount程序的几步操作中整个DataStream的转变,包括不同类型的Transformation的生成、整个Transformation链的串联、执行环境的初始化等。本篇我们来从源码角度分析下flink怎么由第一篇的transformations列表来生成StreamGraph的。

StreamExecutionEnvironment#getStreamGraph(java.lang.String)方法

代码语言:javascript
复制
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraph(jobName, true);
}

@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}

整个StreamGraph的生成是在DataStream的StreamExecutionEnvironment中进行的。可以看到这里是先获取StreamGraphGenerator,然后通过它的generate方法生成StreamGraph。

我们先来分析下StreamExecutionEnvironment#getStreamGraphGenerator方法:

代码语言:javascript
复制
private StreamGraphGenerator getStreamGraphGenerator() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        final RuntimeExecutionMode executionMode =
                configuration.get(ExecutionOptions.RUNTIME_MODE);
        return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
    }

创建StreamGraphGenerator并设置如下一些属性:

•transformations列表:第一篇中DataStream运行时向StreamExecutionEnvironment添加的transformation,主要有OneInputTransformation、ReduceTransformation以及LegacySinkTransformation;•ExecutionConfig:并行度、重启策略等配置信息;•CheckpointConfig:checkpoint的超时、checkpoint间隔等配置信息;•Configuration:其他配置信息,如类加载器、后面的ExecutionVertex的输入输出信息等;•RuntimeExecutionMode:运行时执行模式,是Streaming模式或者Batch模式;•StateBackend:状态存储方式,这里设置默认值,没有传则为null;•isChainingEnabled:是否可以chain在一起,默认为true,如果设置为false则所有节点都不会chain在一起;•UserArtifacts:用户的一些类加载器、第三方jar等;•TimeCharacteristic:是使用enentTime还是processTime或者ingestTime;•defaultBufferTimeout:默认的buffer超时时间。

StreamGraphGenerator#generate

代码语言:javascript
复制
public StreamGraph generate() {
        // 创建StreamGraph,参数为执行配置、checkpoint配置、savepoint恢复设置
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        // 是批处理模式还是流处理模式
        shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
        // 配置streamGraph
        configureStreamGraph(streamGraph);
        alreadyTransformed = new HashMap<>();
        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }
        final StreamGraph builtStreamGraph = streamGraph;
        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;
        return builtStreamGraph;
    }

在之方法中构建StreamGraph的核心方法是StreamGraphGenerator#transform方法,在上面的方法中会遍历transformations列表来生成整个StreamGraph。

直接来看StreamGraphGenerator#transform方法:

代码语言:javascript
复制
private Collection<Integer> transform(Transformation<?> transform) {
        // 在下面会有递归调用,这里为了避免重复处理
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        LOG.debug("Transforming " + transform);
        if (transform.getMaxParallelism() <= 0) {
            // 如果没有设置Transformation的并行度,则使用默认的ExecutionConfig中的并行度
            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }
        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();
        // 获取translator
        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
        Collection<Integer> transformedIds;
        if (translator != null) {
            // 将transformation转换成Operator放入到streamGraph中去
            transformedIds = translate(translator, transform);
        } else {
            // feedBackTransform
            transformedIds = legacyTransform(transform);
        }
        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }
        return transformedIds;
    }

该方法中主要处理的内容有:

•设置并行度,如果transform的最大并行度小于0则使用默认的并行度;•translator静态工厂,会根据transformations列表中的transformation类型去工厂中去获取对应的translator,工厂的代码如下:

在我们这个wordcount的场景下,整个transformation链路情况如下图所示:

在上下文的transformations列表中维护的transformation情况如下图:

在StreamGraphGenerator#transform中会遍历上面这个列表然后去获取对应的translator,如果获取到了translator则调用StreamGraphGenerator#translate方法进行StreamGraph的构建,否则调用StreamGraphGenerator#legacyTransform来进行StreamGraph的构建。

StreamGraphGenerator#translate方法

代码语言:javascript
复制
    private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);
        // 获取所有输入的id
        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        // 决定使用哪个slotSharingGroup,默认是使用default
        final String slotSharingGroup = determineSlotSharingGroup(
            transform.getSlotSharingGroup(),
            // 进行扁平化操作将所有的元素合并
            allInputIds.stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList()));
        // 创建Context对象,里面包括streamGraphGenerator对象、streamGraph对象、slotSharingGroup和configuration信息
        final TransformationTranslator.Context context = new ContextImpl(
            this, streamGraph, slotSharingGroup, configuration);
        return shouldExecuteInBatchMode
            // 转换batch操作
            ? translator.translateForBatch(transform, context)
            // 转换streaming操作
            : translator.translateForStreaming(transform, context);
    }

针对这个方法主要从以下几个方面展开:

1.getParentInputIds方法,对当前transformation的输入进入transform操作并获取到其对应的transformationId列表;2.determineSlotSharingGroup方法,用于获取当前transformation的slotSharingGroup,默认为default;3.TransformationTranslator#translateForStreaming方法,将transformation转换成StreamGraph中的StreamNode。

1. StreamGraphGenerator#getParentInputIds方法

直接看代码:

代码语言:javascript
复制
    private List<Collection<Integer>> getParentInputIds(
        @Nullable final Collection<Transformation<?>> parentTransformations) {
        final List<Collection<Integer>> allInputIds = new ArrayList<>();
        if (parentTransformations == null) {
            return allInputIds;
        }
        for (Transformation<?> transformation : parentTransformations) {
            // 递归地处理parentTransformations列表
            allInputIds.add(transform(transformation));
        }
        return allInputIds;
    }

内部会递归地向上将transformation的input进行transform操作,首先执行的是LegacySourceTransformation的transform操作,是通过LegacySourceTransformationTranslator的translateForStreamingInternal方法完成的。然后一层层地直到完成当前最下游的transformation向StreamNode转换的操作。

2. StreamGraphGenerator#determineSlotSharingGroup方法

直接来看代码:

代码语言:javascript
复制
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        } else {
            String inputGroup = null;
            for (int id : inputIds) {
                String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
                if (inputGroup == null) {
                    // 第一个输入对应的sharingGroup
                    inputGroup = inputGroupCandidate;
                    // 如果所有input的sharingGroup都相同,则使用这个sharingGroup;否则使用默认的sharingGroup
                } else if (!inputGroup.equals(inputGroupCandidate)) {
                    return DEFAULT_SLOT_SHARING_GROUP;
                }
            }
            return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
        }
    }

这里的逻辑很简单:如果没有输入,则使用默认的slotSharingGroup;如果所有input的slotSharingGroup不全部一样则使用默认的slotSharingGroup;如果所有输入的slotSharingGroup都一样,则使用这个slotSharingGroup。

3. TransformationTranslator#translateForStreaming方法

我们先来看一下TransformationTranslator的实现类:

我们根据上面的代码中执行的顺序来逐一分析一下。

LegacySourceTransformationTranslator

首先是LegacySourceTransformationTranslator,它的translateForStreaming方法为其父类SimpleTransformationTranslator中的实现,代码如下:

代码语言:javascript
复制
@Override
    public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);
        // 内部执行转换的操作
        final Collection<Integer> transformedIds =
                translateForStreamingInternal(transformation, context);
        // 设置StreamNode的uid、user provided node hash、资源、托管内存信息等
        configure(transformation, context);
        return transformedIds;
    }

这里的translateForStreamingInternal会调用LegacySourceTransformationTranslator中的实现,即LegacySourceTransformationTranslator#translateForStreamingInternal方法:

代码语言:javascript
复制
@Override
    protected Collection<Integer> translateForStreamingInternal(
            final LegacySourceTransformation<OUT> transformation,
            final Context context) {
        return translateInternal(transformation, context);
    }

这个方法内部调用的是LegacySourceTransformationTranslator#translateInternal方法,代码如下:

代码语言:javascript
复制
    private Collection<Integer> translateInternal(
            final LegacySourceTransformation<OUT> transformation,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
        // 添加operator,也是在StreamGraph中生成StreamNode的核心逻辑
        streamGraph.addLegacySource(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                transformation.getOperatorFactory(),
                null,
                transformation.getOutputType(),
                "Source: " + transformation.getName());
        // 如果是InputFormatOperatorFactory类型的需要设置StreamGraph的inputFormat
        if (transformation.getOperatorFactory() instanceof InputFormatOperatorFactory) {
            streamGraph.setInputFormat(
                    transformationId,
                    ((InputFormatOperatorFactory<OUT>) transformation.getOperatorFactory())
                            .getInputFormat());
        }
        // 设置并行度
        final int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                ? transformation.getParallelism()
                : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        // 设置最大并行度
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        return Collections.singleton(transformationId);
    }

这里的逻辑主要是用于创建Source节点的StreamNode,StreamNode的属性为:

•id:StreamNode的id,也是前面的transformation的id;•operatorName:算子名称;•operatorFactory:算子工厂实例,里面包裹着operator,在Source节点对应的operator为StreamSource;•jobVertexClass:顶点任务的class类型,source节点为SourceStreamTask,其他类型为OneInputStreamTask;•slotSharingGroup:slot共享组;•coLocationGroup:设置标识co-location组的键。具有相同co-location键的操作符将被调度器放置到相同的槽中。设置为null意味着没有co-location约束。

在source节点对应的StreamNode创建完成后,会将该节点放到StreamGraph的sources列表中。

接下来我们看下其他节点的生成过程,我们按照Transformation的transform顺序依次来看。

OneInputTransformationTranslator#translateForStreamingInternal方法

来看代码:

代码语言:javascript
复制
@Override
    public Collection<Integer> translateForStreamingInternal(
            final OneInputTransformation<IN, OUT> transformation,
            final Context context) {
        return translateInternal(transformation,
            transformation.getOperatorFactory(),
            transformation.getInputType(),
            // 获取必须用于在此操作中分区键值状态的{@code KeySelector},也就是Partitioner,如ForwardPartitioner等
            transformation.getStateKeySelector(),
            // 状态key的类型
            transformation.getStateKeyType(),
            context
        );
    }

这里调用的是AbstractOneInputTransformationTranslator#translateInternal方法,代码如下:

代码语言:javascript
复制
protected Collection<Integer> translateInternal(
            final Transformation<OUT> transformation,
            final StreamOperatorFactory<OUT> operatorFactory,
            final TypeInformation<IN> inputType,
            @Nullable final KeySelector<IN, ?> stateKeySelector,
            @Nullable final TypeInformation<?> stateKeyType,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(operatorFactory);
        checkNotNull(inputType);
        checkNotNull(context);
        // 获取当前StreamGraph引用
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
        // 向StreamGraph中添加operator
        streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());
        // 设置StreamNode输入的key selector信息,注意这个在Source translator中是没有的
        if (stateKeySelector != null) {
            TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
        }
        // 并行度
        int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
            ? transformation.getParallelism()
            : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        // 所有的input输入
        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found " + parentTransformations.size());
        // 由于是OneInput类型的,所以这里只处理输入列表中的第一个input;而且在上面也有判断
        for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
            // 添加边信息,用于串联输入节点和当前节点的边。对于输入节点来说是输出边,对于当前节点来说是输入边
            streamGraph.addEdge(inputId, transformationId, 0);
        }

        return Collections.singleton(transformationId);
    }

这里的主要操作如下:

•创建StreamNode节点放到StreamGraph的streamNodes列表中;•对当前StreamNode添加stateKeySelector,也就是key选择器,这点是上面的Source节点对应的StreamNode中没有的;•往StreamGraph的当前StreamNode中添加边的信息,用于串联输入节点和当前节点的边。对于输入节点来说是输出边,对于当前节点来说是输入边。

PartitionTransformationTranslator#translateForStreamingInternal方法

直接来看代码:

代码语言:javascript
复制
@Override
    protected Collection<Integer> translateForStreamingInternal(
            final PartitionTransformation<OUT> transformation,
            final Context context) {
        return translateInternal(transformation, context);
    }

这里调用的是PartitionTransformationTranslator#translateInternal方法,代码如下:

代码语言:javascript
复制
private Collection<Integer> translateInternal(
            final PartitionTransformation<OUT> transformation,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);
        // 当前的StreamGraph
        final StreamGraph streamGraph = context.getStreamGraph();
        // 当前transformation的所有的input
        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found " + parentTransformations.size());
        final Transformation<?> input = parentTransformations.get(0);

        List<Integer> resultIds = new ArrayList<>();
        // 这里根据取到已经转换过的transformation
        for (Integer inputId: context.getStreamNodeIds(input)) {
            final int virtualId = Transformation.getNewNodeId();
            // 添加到虚拟的partition节点列表中去
            streamGraph.addVirtualPartitionNode(
                    // 输入节点的id
                    inputId,
                    // 虚拟节点的id
                    virtualId,
                    // 分区器,也就是StreamPartitioner
                    transformation.getPartitioner(),
                    // shuffle模式
                    transformation.getShuffleMode());
            resultIds.add(virtualId);
        }
        return resultIds;
    }

这里主要做的操作有:

•创建虚拟分区节点,虚拟分区节点是一个三元组,属性为输入节点的id,分区器和shffle模式。并以当前虚拟节点的id为key,将这个虚拟分区节点放到StreamGraph的virtualPartitionNodes中去。•需要注意的是:PartitionTransformation也是单输入类型的。

ReduceTransformationTranslator#translateForStreamingInternal方法

直接来看代码:

代码语言:javascript
复制
@Override
    public Collection<Integer> translateForStreamingInternal(
            final ReduceTransformation<IN, KEY> transformation,
            final Context context) {
        // 创建StreamGroupedReduceOperator,它继承自AbstractUdfStreamOperator是OneInputStreamOperator类型的
        StreamGroupedReduceOperator<IN> groupedReduce = new StreamGroupedReduceOperator<>(
            transformation.getReducer(),
            transformation
                .getInputType()
                .createSerializer(context.getStreamGraph().getExecutionConfig())
        );
        // 创建对应该operator的operatorFactory
        SimpleOperatorFactory<IN> operatorFactory = SimpleOperatorFactory.of(groupedReduce);
        operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
        return translateInternal(
            transformation,
            operatorFactory,
            transformation.getInputType(),
            transformation.getKeySelector(),
            transformation.getKeyTypeInfo(),
            context);
    }

这里的代码与上面的稍有不同,这里先根据transformation的信息创建一个operator并生成对应的operatorFactory,然后执行AbstractOneInputTransformationTranslator#translateInternal方法,该方法与OneInputTransformation最终执行的方法相同,可以参考理解。

LegacySinkTransformationTranslator#translateForStreamingInternal方法
代码语言:javascript
复制
private Collection<Integer> translateInternal(
            final LegacySinkTransformation<IN> transformation,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found " + parentTransformations.size());
        final Transformation<?> input = parentTransformations.get(0);
        // 添加sink对应的StreamNode节点并放到StreamGraph的sinks列表中
        streamGraph.addSink(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                transformation.getOperatorFactory(),
                input.getOutputType(),
                null,
                "Sink: " + transformation.getName());
        // 并行度设置
        final int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                ? transformation.getParallelism()
                : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        // 设置边信息
        for (Integer inputId: context.getStreamNodeIds(input)) {
            streamGraph.addEdge(inputId, transformationId, 0);
        }
        // 设置key selector信息
        if (transformation.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transformation.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transformationId, transformation.getStateKeySelector(), keySerializer);
        }

        return Collections.emptyList();
    }

这里会添加sink对应的StreamNode节点并放到StreamGraph的sinks列表中,设置输入节点和当前节点的边信息等,无殊。

可以看出,整个由Transformation列表向StreamGraph转换的过程总体来说主要是创建和梳理一下StreamNode之间的关系,StreamNode之间由StreamEdge来连接,从而形成一个StreamGraph。

StreamGraphGenerator#legacyTransform方法

主要用于处理FeedbackTransformation类型的,目前主要用于IterativeStream中,后续我们再用专门的篇幅进行分析。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • StreamExecutionEnvironment#getStreamGraph(java.lang.String)方法
  • StreamGraphGenerator#generate
    • StreamGraphGenerator#translate方法
      • 1. StreamGraphGenerator#getParentInputIds方法
      • 2. StreamGraphGenerator#determineSlotSharingGroup方法
      • 3. TransformationTranslator#translateForStreaming方法
      • LegacySourceTransformationTranslator
      • OneInputTransformationTranslator#translateForStreamingInternal方法
      • PartitionTransformationTranslator#translateForStreamingInternal方法
      • ReduceTransformationTranslator#translateForStreamingInternal方法
      • LegacySinkTransformationTranslator#translateForStreamingInternal方法
    • StreamGraphGenerator#legacyTransform方法
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档