A look at the split operation of Flink DataStream

This article takes a look at the split operation of Flink's DataStream.

Example

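import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;

// someDataStream is assumed to be an existing DataStream<Integer>
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        // tag each element with exactly one output name
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});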
  • This example splits the DataStream into two DataStreams: one with the outputName even, the other with the outputName odd
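To make the effect of the selector concrete, here is a minimal, dependency-free sketch in plain Java. It does not use Flink: `SplitExampleModel`, `NameSelector` and `selectByName` are hypothetical names that only imitate tagging each element with output names and then picking the elements that carry a given name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the example above; none of these types come from Flink.
class SplitExampleModel {

    // Hypothetical stand-in with the same shape as OutputSelector#select.
    interface NameSelector<T> {
        Iterable<String> select(T value);
    }

    // Same even/odd logic as the OutputSelector in the example.
    static final NameSelector<Integer> EVEN_ODD = value -> {
        List<String> output = new ArrayList<>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    };

    // Keeps the elements for which the selector emitted the requested name.
    static <T> List<T> selectByName(List<T> input, NameSelector<T> selector, String name) {
        List<T> result = new ArrayList<>();
        for (T value : input) {
            for (String tag : selector.select(value)) {
                if (tag.equals(name)) {
                    result.add(value);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println("even: " + selectByName(input, EVEN_ODD, "even"));
        System.out.println("odd:  " + selectByName(input, EVEN_ODD, "odd"));
    }
}
```

In a real job the same role is played by `split(...)` followed by `select("even")` or `select("odd")` on the returned SplitStream.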

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
        return new SplitStream<>(this, clean(outputSelector));
    }

    //......
}
  • The split operation of DataStream takes an OutputSelector argument, then creates and returns a SplitStream

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

    Iterable<String> select(OUT value);

}
  • OutputSelector defines the select method, which tags each element with outputNames
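Because `select` returns an `Iterable<String>`, the signature does not limit a selector to exactly one name per element. The sketch below illustrates a selector that can return two names. It declares a local interface with the same shape as the one quoted above so that it compiles without Flink; `MultiNameSelectorSketch`, `ParityAndSizeSelector`, `namesOf`, the "large" name and its threshold are all made up for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class MultiNameSelectorSketch {

    // Local copy of the interface shape shown above, so the sketch compiles without Flink.
    interface OutputSelector<OUT> extends Serializable {
        Iterable<String> select(OUT value);
    }

    // Tags each value with "even" or "odd", plus "large" above a made-up threshold.
    static class ParityAndSizeSelector implements OutputSelector<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(Integer value) {
            List<String> names = new ArrayList<>();
            names.add(value % 2 == 0 ? "even" : "odd");
            if (value > 100) {
                names.add("large");
            }
            return names;
        }
    }

    // Copies the names into a list so they are easy to print or compare.
    static List<String> namesOf(int value) {
        List<String> names = new ArrayList<>();
        for (String name : new ParityAndSizeSelector().select(value)) {
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(7 + " -> " + namesOf(7));
        System.out.println(200 + " -> " + namesOf(200));
    }
}
```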

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }

        SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }

}
  • SplitStream extends DataStream and defines a select method that picks the split-off DataStream by outputNames; the select method creates a SelectTransformation
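The null check and the varargs handling in `selectOutput` can be imitated without Flink. In the following sketch, `SelectModel`, `checkNames`, `select` and `sample` are hypothetical names. It also assumes that an element is forwarded when at least one of its output names is among the selected names; that is an assumption about the runtime behavior, not something the source quoted above shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SelectModel {

    // Mirrors the null check in SplitStream#selectOutput.
    static void checkNames(String... outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
    }

    // Assumed semantics: keep an element if any of its names was selected.
    static <T> List<T> select(Map<T, List<String>> tagged, String... outputNames) {
        checkNames(outputNames);
        Set<String> selected = new HashSet<>(Arrays.asList(outputNames));
        List<T> result = new ArrayList<>();
        for (Map.Entry<T, List<String>> entry : tagged.entrySet()) {
            for (String name : entry.getValue()) {
                if (selected.contains(name)) {
                    result.add(entry.getKey());
                    break;
                }
            }
        }
        return result;
    }

    // Small fixed data set: each element with the names a selector gave it.
    static Map<Integer, List<String>> sample() {
        Map<Integer, List<String>> tagged = new LinkedHashMap<>();
        tagged.put(1, Arrays.asList("odd"));
        tagged.put(2, Arrays.asList("even"));
        tagged.put(3, Arrays.asList("odd"));
        return tagged;
    }

    public static void main(String[] args) {
        System.out.println(select(sample(), "even"));
        System.out.println(select(sample(), "even", "odd"));
    }
}
```

Passing several names, as in the second call, corresponds to `select("even", "odd")` on a SplitStream; a null name is rejected before anything else happens.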

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

@Internal
public class StreamGraphGenerator {

    //......

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        StreamTransformation<T> input = select.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform might have already transformed this
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualResultIds = new ArrayList<>();

        for (int inputId : resultIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

        StreamTransformation<T> input = split.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform call might have transformed this already
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }

        return resultIds;
    }

    //......
}
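  • The transform method in StreamGraphGenerator dispatches SelectTransformation and SplitTransformation to transformSelect and transformSplit respectively
  • transformSelect first transforms its input, then calls addVirtualSelectNode with select.getSelectedNames() for every input id and returns the newly created virtual ids
  • transformSplit first transforms its input, then calls addOutputSelector with split.getOutputSelector() for every input id and returns the input ids unchanged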
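The difference between the two methods is easier to see in a small model: split adds no node of its own and only attaches a selector to the upstream ids, while select allocates one virtual id per upstream id. The sketch below mirrors only the bookkeeping visible in the quoted code. `GraphModel`, its three maps, the string label standing in for the selector and the id counter starting at 100 are all hypothetical; the real StreamGraph does considerably more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GraphModel {

    // Physical node id -> labels of the output selectors attached to that node.
    final Map<Integer, List<String>> outputSelectors = new HashMap<>();
    // Virtual node id -> id of the node it reads from.
    final Map<Integer, Integer> virtualInput = new HashMap<>();
    // Virtual node id -> names selected on that virtual node.
    final Map<Integer, List<String>> virtualNames = new HashMap<>();
    private int nextId = 100; // arbitrary start for the hypothetical id counter

    // Like transformSplit: attach the selector to every input node, return the same ids.
    Collection<Integer> transformSplit(Collection<Integer> inputIds, String selectorLabel) {
        for (int inputId : inputIds) {
            outputSelectors.computeIfAbsent(inputId, k -> new ArrayList<>()).add(selectorLabel);
        }
        return inputIds;
    }

    // Like transformSelect: register one virtual node per input id, return the new ids.
    Collection<Integer> transformSelect(Collection<Integer> inputIds, List<String> selectedNames) {
        List<Integer> virtualResultIds = new ArrayList<>();
        for (int inputId : inputIds) {
            int virtualId = nextId++;
            virtualInput.put(virtualId, inputId);
            virtualNames.put(virtualId, selectedNames);
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    public static void main(String[] args) {
        GraphModel graph = new GraphModel();
        Collection<Integer> source = Arrays.asList(1);
        Collection<Integer> split = graph.transformSplit(source, "evenOddSelector");
        Collection<Integer> even = graph.transformSelect(split, Arrays.asList("even"));
        System.out.println("split ids: " + split + ", even ids: " + even);
        System.out.println("selectors: " + graph.outputSelectors);
    }
}
```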

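Summary

  • The split operation of DataStream takes an OutputSelector argument, then creates and returns a SplitStream
  • OutputSelector defines the select method, which tags each element with outputNames
  • SplitStream extends DataStream and defines a select method that picks the split-off DataStream by outputNames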
