In the previous article we traced how the DataStream evolves through the few operations of the DataStream wordcount program, covering how the different Transformation types are created, how the whole Transformation chain is linked together, and how the execution environment is initialized. In this article we analyze, from the source code, how Flink turns the transformations list from the first article into a StreamGraph.
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraph(jobName, true);
}

@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}
The StreamGraph is built inside the DataStream's StreamExecutionEnvironment. As the code shows, a StreamGraphGenerator is obtained first, and the StreamGraph is then built through its generate method.
Let's first look at the StreamExecutionEnvironment#getStreamGraphGenerator method:
private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    final RuntimeExecutionMode executionMode =
            configuration.get(ExecutionOptions.RUNTIME_MODE);
    return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
}
A StreamGraphGenerator is created and configured with the following properties:
• transformations list: the transformations that the DataStream added to the StreamExecutionEnvironment at runtime in the first article, mainly OneInputTransformation, ReduceTransformation, and LegacySinkTransformation;
• ExecutionConfig: configuration such as parallelism and restart strategy;
• CheckpointConfig: configuration such as checkpoint timeout and checkpoint interval;
• Configuration: other configuration, such as the class loader and the input/output settings of the later ExecutionVertex;
• RuntimeExecutionMode: the runtime execution mode, either Streaming or Batch;
• StateBackend: how state is stored; a default is set here, and it is null if none was provided;
• isChainingEnabled: whether operators may be chained together, true by default; if set to false, no nodes will be chained;
• UserArtifacts: user class loaders, third-party jars, etc.;
• TimeCharacteristic: whether event time, processing time, or ingestion time is used;
• defaultBufferTimeout: the default buffer timeout.
public StreamGraph generate() {
    // create the StreamGraph; the arguments are the execution config, the checkpoint config,
    // and the savepoint restore settings
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    // batch mode or streaming mode
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    // configure the streamGraph
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;
    return builtStreamGraph;
}
The core of building the StreamGraph in this method is StreamGraphGenerator#transform, which the loop above applies to every entry in the transformations list to build the whole StreamGraph.
Let's look at StreamGraphGenerator#transform directly:
private Collection<Integer> transform(Transformation<?> transform) {
    // transform is called recursively below; this check avoids processing the same transformation twice
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    // look up the translator
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        // translate the transformation into an operator and add it to the streamGraph
        transformedIds = translate(translator, transform);
    } else {
        // feedback transformations and other legacy cases
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }
    return transformedIds;
}
This method mainly does the following:
• sets the max parallelism: if the transformation's max parallelism is not positive, the job-wide max parallelism from the ExecutionConfig is used instead;
• looks up the translator: for each entry in the transformations list, the matching translator is obtained from the static translatorMap factory, keyed by the concrete Transformation class.
In our wordcount scenario, the whole transformation chain looks like the figure below:
The transformations maintained in the environment's transformations list are shown in the following figure:
StreamGraphGenerator#transform iterates over this list and looks up the corresponding translator for each entry: if a translator is found, StreamGraphGenerator#translate is called to build the StreamGraph; otherwise StreamGraphGenerator#legacyTransform is used.
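The lookup-then-dispatch pattern behind translatorMap can be sketched as a small standalone registry. The class and method names below are illustrative stand-ins, not Flink's actual types:

```java
import java.util.HashMap;
import java.util.Map;

public class TranslatorRegistry {
    // marker types standing in for Transformation subclasses
    interface Transformation {}
    static class SourceTransformation implements Transformation {}
    static class FeedbackTransformation implements Transformation {}

    interface Translator {
        String translate(Transformation t);
    }

    // static factory: concrete Transformation class -> translator
    private static final Map<Class<? extends Transformation>, Translator> TRANSLATORS = new HashMap<>();
    static {
        TRANSLATORS.put(SourceTransformation.class, t -> "translated-source");
    }

    // mirrors the branch in StreamGraphGenerator#transform: use the registered
    // translator if one exists, otherwise fall back to the legacy path
    public static String transform(Transformation t) {
        Translator translator = TRANSLATORS.get(t.getClass());
        return translator != null ? translator.translate(t) : legacyTransform(t);
    }

    private static String legacyTransform(Transformation t) {
        return "legacy-" + t.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(transform(new SourceTransformation()));   // translated-source
        System.out.println(transform(new FeedbackTransformation())); // legacy-FeedbackTransformation
    }
}
```

A transformation with no registered translator, such as a feedback transformation, silently takes the legacy branch instead of failing, which matches the null check in the real method.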
private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
    checkNotNull(translator);
    checkNotNull(transform);

    // get the ids of all inputs
    final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // decide which slotSharingGroup to use; the default is "default"
    final String slotSharingGroup = determineSlotSharingGroup(
            transform.getSlotSharingGroup(),
            // flatten all input id collections into a single list
            allInputIds.stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList()));

    // create the Context object, which carries the streamGraphGenerator, the streamGraph,
    // the slotSharingGroup, and the configuration
    final TransformationTranslator.Context context = new ContextImpl(
            this, streamGraph, slotSharingGroup, configuration);

    return shouldExecuteInBatchMode
            // translate for batch execution
            ? translator.translateForBatch(transform, context)
            // translate for streaming execution
            : translator.translateForStreaming(transform, context);
}
We will look at this method from the following angles:
1. getParentInputIds: recursively applies transform to the current transformation's inputs and collects their transformation id lists;
2. determineSlotSharingGroup: determines the slotSharingGroup of the current transformation, "default" by default;
3. TransformationTranslator#translateForStreaming: translates the transformation into a StreamNode of the StreamGraph.
First, getParentInputIds:
private List<Collection<Integer>> getParentInputIds(
        @Nullable final Collection<Transformation<?>> parentTransformations) {
    final List<Collection<Integer>> allInputIds = new ArrayList<>();
    if (parentTransformations == null) {
        return allInputIds;
    }
    for (Transformation<?> transformation : parentTransformations) {
        // recursively transform each parent transformation
        allInputIds.add(transform(transformation));
    }
    return allInputIds;
}
This recursively walks upstream, applying transform to each transformation's inputs. The first transformation actually translated is therefore the LegacySourceTransformation, handled by LegacySourceTransformationTranslator#translateForStreamingInternal; the recursion then unwinds layer by layer until the current, most downstream transformation has been translated into a StreamNode.
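Together, the alreadyTransformed map and the recursion through the inputs mean the graph is built depth-first from the sources down, and each transformation is translated exactly once. A minimal sketch of that traversal, with simplified names rather than Flink's classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecursiveTransform {
    static class Node {
        final String name;
        final List<Node> inputs;
        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<Node, Integer> alreadyTransformed = new HashMap<>();
    final List<String> translationOrder = new ArrayList<>();
    private int nextId = 1;

    // mirrors StreamGraphGenerator#transform: translate parents first, memoize results
    public int transform(Node node) {
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        for (Node input : node.inputs) {
            transform(input); // parents become nodes before their children
        }
        int id = nextId++;
        translationOrder.add(node.name);
        alreadyTransformed.put(node, id);
        return id;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node flatMap = new Node("flatMap", source);
        Node reduce = new Node("reduce", flatMap);
        Node sink = new Node("sink", reduce);

        RecursiveTransform gen = new RecursiveTransform();
        gen.transform(sink); // triggers translation of the whole upstream chain
        System.out.println(gen.translationOrder); // [source, flatMap, reduce, sink]
    }
}
```

Even though the top-level loop in generate() only hands the generator the most downstream transformations, this recursion is why every upstream transformation still ends up in the graph, and in topological order.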
Next, determineSlotSharingGroup:
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (specifiedGroup != null) {
        return specifiedGroup;
    } else {
        String inputGroup = null;
        for (int id : inputIds) {
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                // the sharing group of the first input
                inputGroup = inputGroupCandidate;
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                // if the inputs do not all share the same group, fall back to the default group
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }
}
The logic here is simple: if there are no inputs, the default slotSharingGroup is used; if the inputs do not all share the same slotSharingGroup, the default is used; and if all inputs share one slotSharingGroup, that group is used.
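The same decision can be written as a small standalone function over plain collections. This is an illustrative sketch of the rule above, not Flink's code:

```java
import java.util.Collection;
import java.util.List;

public class SlotSharing {
    static final String DEFAULT_SLOT_SHARING_GROUP = "default";

    // an explicitly specified group wins; otherwise the inputs' common group,
    // or "default" when the inputs disagree or there are no inputs
    static String determineSlotSharingGroup(String specifiedGroup, Collection<String> inputGroups) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        }
        String inputGroup = null;
        for (String candidate : inputGroups) {
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        System.out.println(determineSlotSharingGroup("g1", List.of()));         // g1
        System.out.println(determineSlotSharingGroup(null, List.of("a", "a"))); // a
        System.out.println(determineSlotSharingGroup(null, List.of("a", "b"))); // default
        System.out.println(determineSlotSharingGroup(null, List.of()));         // default
    }
}
```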
Let's first take a look at the implementation classes of TransformationTranslator:
We'll analyze them one by one, following the execution order of the code above.
First is LegacySourceTransformationTranslator. Its translateForStreaming method is implemented in its parent class SimpleTransformationTranslator:
@Override
public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
    // perform the actual translation
    final Collection<Integer> transformedIds =
            translateForStreamingInternal(transformation, context);
    // set the StreamNode's uid, user-provided node hash, resources, managed memory, etc.
    configure(transformation, context);
    return transformedIds;
}
Here translateForStreamingInternal dispatches to the subclass implementation, i.e. LegacySourceTransformationTranslator#translateForStreamingInternal:
@Override
protected Collection<Integer> translateForStreamingInternal(
        final LegacySourceTransformation<OUT> transformation,
        final Context context) {
    return translateInternal(transformation, context);
}
This in turn calls LegacySourceTransformationTranslator#translateInternal:
private Collection<Integer> translateInternal(
        final LegacySourceTransformation<OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // add the operator; this is the core logic that creates the StreamNode in the StreamGraph
    streamGraph.addLegacySource(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            null,
            transformation.getOutputType(),
            "Source: " + transformation.getName());

    // for an InputFormatOperatorFactory, the StreamGraph's inputFormat must also be set
    if (transformation.getOperatorFactory() instanceof InputFormatOperatorFactory) {
        streamGraph.setInputFormat(
                transformationId,
                ((InputFormatOperatorFactory<OUT>) transformation.getOperatorFactory())
                        .getInputFormat());
    }

    // set the parallelism
    final int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
            ? transformation.getParallelism()
            : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    // set the max parallelism
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    return Collections.singleton(transformationId);
}
This logic creates the StreamNode for the source. Its main properties are:
• id: the id of the StreamNode, which is also the id of the originating transformation;
• operatorName: the operator name;
• operatorFactory: the operator factory instance wrapping the operator; for the source node the operator is StreamSource;
• jobVertexClass: the class of the vertex task, SourceStreamTask for the source node and OneInputStreamTask for the other node types here;
• slotSharingGroup: the slot sharing group;
• coLocationGroup: the key identifying the co-location group. Operators with the same co-location key are scheduled into the same slot; null means no co-location constraint.
After the source's StreamNode is created, it is added to the sources list of the StreamGraph.
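Putting those properties together, a stripped-down StreamNode and the source bookkeeping might look like the following. This is an illustrative sketch; the real StreamNode carries many more fields:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniStreamGraph {
    static class StreamNode {
        final int id;                 // same id as the originating transformation
        final String operatorName;    // e.g. "Source: ..." for source nodes
        final String slotSharingGroup;
        final String coLocationGroup; // null means no co-location constraint
        StreamNode(int id, String operatorName, String slotSharingGroup, String coLocationGroup) {
            this.id = id;
            this.operatorName = operatorName;
            this.slotSharingGroup = slotSharingGroup;
            this.coLocationGroup = coLocationGroup;
        }
    }

    final Map<Integer, StreamNode> streamNodes = new HashMap<>();
    final List<Integer> sources = new ArrayList<>();

    // mirrors the effect of StreamGraph#addLegacySource: create the node,
    // then remember its id in the sources list
    void addSource(int id, String name, String slotSharingGroup) {
        streamNodes.put(id, new StreamNode(id, "Source: " + name, slotSharingGroup, null));
        sources.add(id);
    }

    public static void main(String[] args) {
        MiniStreamGraph graph = new MiniStreamGraph();
        graph.addSource(1, "Collection Source", "default");
        System.out.println(graph.sources);                         // [1]
        System.out.println(graph.streamNodes.get(1).operatorName); // Source: Collection Source
    }
}
```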
Next let's look at how the other nodes are generated, following the order in which the Transformations are transformed.
First, OneInputTransformationTranslator#translateForStreamingInternal:
@Override
public Collection<Integer> translateForStreamingInternal(
        final OneInputTransformation<IN, OUT> transformation,
        final Context context) {
    return translateInternal(
            transformation,
            transformation.getOperatorFactory(),
            transformation.getInputType(),
            // the KeySelector that must be used for partitioning keyed state in this operation
            transformation.getStateKeySelector(),
            // the type of the state key
            transformation.getStateKeyType(),
            context);
}
This calls AbstractOneInputTransformationTranslator#translateInternal:
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    // the current StreamGraph reference
    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // add the operator to the StreamGraph
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    // set the key selector for the StreamNode's input; note that the source translator does not do this
    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    // parallelism
    int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
            ? transformation.getParallelism()
            : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    // all inputs
    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found " + parentTransformations.size());

    // since this is a one-input transformation, only the single input is processed here,
    // which the check above guarantees
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        // add the edge connecting the input node and the current node: an output edge
        // for the input node, an input edge for the current node
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}
The main steps here are:
• create the StreamNode and put it into the StreamGraph's streamNodes collection;
• attach the stateKeySelector (the key selector) to the current StreamNode, which the source's StreamNode above does not have;
• add the edge between the input node and the current StreamNode: an output edge from the input node's point of view, an input edge from the current node's.
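The edge bookkeeping in the last step can be sketched as follows: the same edge object is appended to the upstream node's output edges and the downstream node's input edges. The names are illustrative, not Flink's StreamEdge:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EdgeWiring {
    static class Node {
        final int id;
        final List<int[]> inEdges = new ArrayList<>();  // each edge as {sourceId, targetId}
        final List<int[]> outEdges = new ArrayList<>();
        Node(int id) { this.id = id; }
    }

    final Map<Integer, Node> nodes = new HashMap<>();

    void addNode(int id) { nodes.put(id, new Node(id)); }

    // mirrors the effect of StreamGraph#addEdge: one edge, registered on both endpoints
    void addEdge(int sourceId, int targetId) {
        int[] edge = {sourceId, targetId};
        nodes.get(sourceId).outEdges.add(edge);
        nodes.get(targetId).inEdges.add(edge);
    }

    public static void main(String[] args) {
        EdgeWiring g = new EdgeWiring();
        g.addNode(1); // e.g. the source
        g.addNode(2); // e.g. the flatMap
        g.addEdge(1, 2);
        System.out.println(g.nodes.get(1).outEdges.size()); // 1
        System.out.println(g.nodes.get(2).inEdges.size());  // 1
    }
}
```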
Next is PartitionTransformationTranslator:
@Override
protected Collection<Integer> translateForStreamingInternal(
        final PartitionTransformation<OUT> transformation,
        final Context context) {
    return translateInternal(transformation, context);
}
This calls PartitionTransformationTranslator#translateInternal:
private Collection<Integer> translateInternal(
        final PartitionTransformation<OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    // the current StreamGraph
    final StreamGraph streamGraph = context.getStreamGraph();

    // all inputs of the current transformation
    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found " + parentTransformations.size());
    final Transformation<?> input = parentTransformations.get(0);

    List<Integer> resultIds = new ArrayList<>();
    // look up the stream node ids of the already-transformed input
    for (Integer inputId : context.getStreamNodeIds(input)) {
        final int virtualId = Transformation.getNewNodeId();
        // register a virtual partition node
        streamGraph.addVirtualPartitionNode(
                // the id of the input node
                inputId,
                // the id of the virtual node
                virtualId,
                // the partitioner, i.e. a StreamPartitioner
                transformation.getPartitioner(),
                // the shuffle mode
                transformation.getShuffleMode());
        resultIds.add(virtualId);
    }
    return resultIds;
}
The main work here is:
• a virtual partition node is created. It is a triple of the input node's id, the partitioner, and the shuffle mode, and is stored in the StreamGraph's virtualPartitionNodes map keyed by the virtual node's id.
• note that PartitionTransformation is also a single-input transformation.
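The point of virtualPartitionNodes is that no real StreamNode is created for the partitioning step: when a downstream node later adds an edge whose upstream id is a virtual id, the graph resolves it back to the real upstream node and carries the stored partitioner over to that edge. A sketch of this indirection, with simplified types (the real entry also stores the shuffle mode):

```java
import java.util.HashMap;
import java.util.Map;

public class VirtualPartitionNodes {
    // virtual node id -> {upstream node id, partitioner name}
    private final Map<Integer, Object[]> virtualPartitionNodes = new HashMap<>();
    private int nextVirtualId = 1000;

    int addVirtualPartitionNode(int inputId, String partitioner) {
        int virtualId = nextVirtualId++;
        virtualPartitionNodes.put(virtualId, new Object[]{inputId, partitioner});
        return virtualId;
    }

    // when an edge references a virtual id, resolve it to the real upstream node
    // and attach the stored partitioner to the edge
    String addEdge(int upstreamId, int downstreamId) {
        String partitioner = "forward"; // default when no explicit partitioning
        if (virtualPartitionNodes.containsKey(upstreamId)) {
            Object[] virtualNode = virtualPartitionNodes.get(upstreamId);
            upstreamId = (Integer) virtualNode[0];
            partitioner = (String) virtualNode[1];
        }
        return upstreamId + "->" + downstreamId + " via " + partitioner;
    }

    public static void main(String[] args) {
        VirtualPartitionNodes g = new VirtualPartitionNodes();
        int virtualId = g.addVirtualPartitionNode(2, "hash");
        System.out.println(g.addEdge(virtualId, 3)); // 2->3 via hash
        System.out.println(g.addEdge(1, 2));         // 1->2 via forward
    }
}
```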
Next is ReduceTransformationTranslator:
@Override
public Collection<Integer> translateForStreamingInternal(
        final ReduceTransformation<IN, KEY> transformation,
        final Context context) {
    // create the StreamGroupedReduceOperator; it extends AbstractUdfStreamOperator
    // and is a OneInputStreamOperator
    StreamGroupedReduceOperator<IN> groupedReduce = new StreamGroupedReduceOperator<>(
            transformation.getReducer(),
            transformation
                    .getInputType()
                    .createSerializer(context.getStreamGraph().getExecutionConfig()));
    // create the operatorFactory wrapping this operator
    SimpleOperatorFactory<IN> operatorFactory = SimpleOperatorFactory.of(groupedReduce);
    operatorFactory.setChainingStrategy(transformation.getChainingStrategy());

    return translateInternal(
            transformation,
            operatorFactory,
            transformation.getInputType(),
            transformation.getKeySelector(),
            transformation.getKeyTypeInfo(),
            context);
}
This differs slightly from the translators above: an operator is first created from the transformation's information and wrapped into an operatorFactory, after which AbstractOneInputTransformationTranslator#translateInternal is invoked, the same method that OneInputTransformation ultimately executes, so refer to that discussion.
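To see what the created StreamGroupedReduceOperator does at runtime, a minimal per-key reduce can be sketched: keep one accumulated value per key and apply the user's reduce function on each new element, emitting the running result. This is an illustration of the idea, not the Flink operator, which keeps the accumulator in keyed state:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class GroupedReduce<K, V> {
    private final Function<V, K> keySelector;  // stands in for the transformation's KeySelector
    private final BinaryOperator<V> reducer;   // stands in for the user's ReduceFunction
    private final Map<K, V> valueState = new HashMap<>(); // per-key accumulator

    public GroupedReduce(Function<V, K> keySelector, BinaryOperator<V> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    // processes one element and returns the current reduced value for its key
    public V processElement(V value) {
        K key = keySelector.apply(value);
        V reduced = valueState.containsKey(key) ? reducer.apply(valueState.get(key), value) : value;
        valueState.put(key, reduced);
        return reduced;
    }

    public static void main(String[] args) {
        // key integers by parity and sum per key
        GroupedReduce<Integer, Integer> sumByParity = new GroupedReduce<>(v -> v % 2, Integer::sum);
        System.out.println(sumByParity.processElement(1)); // 1
        System.out.println(sumByParity.processElement(3)); // 4
        System.out.println(sumByParity.processElement(2)); // 2
    }
}
```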
private Collection<Integer> translateInternal(
        final LegacySinkTransformation<IN> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found " + parentTransformations.size());
    final Transformation<?> input = parentTransformations.get(0);

    // create the sink's StreamNode and add it to the StreamGraph's sinks list
    streamGraph.addSink(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            input.getOutputType(),
            null,
            "Sink: " + transformation.getName());

    // set the parallelism
    final int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
            ? transformation.getParallelism()
            : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    // add the edge information
    for (Integer inputId : context.getStreamNodeIds(input)) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    // set the key selector information
    if (transformation.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transformation.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, transformation.getStateKeySelector(), keySerializer);
    }
    return Collections.emptyList();
}
This creates the sink's StreamNode, adds it to the StreamGraph's sinks list, and sets the edge information between the input node and the current node; nothing out of the ordinary.
As we can see, the translation from the Transformation list to a StreamGraph is, on the whole, a matter of creating StreamNodes and sorting out the relationships between them: StreamNodes are connected by StreamEdges, which together form the StreamGraph.
As for legacyTransform, it mainly handles FeedbackTransformation, which is currently used by IterativeStream; we will analyze it in a dedicated article.