前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink源码从头分析第一篇之WordCount DataStream操作

flink源码从头分析第一篇之WordCount DataStream操作

作者头像
山行AI
发布2021-07-01 15:33:31
1.2K0
发布2021-07-01 15:33:31
举报
文章被收录于专栏:山行AI山行AI

前言

每个应用程序都有一个hello world代码,在flink里面这个hello world一般就是一段wordcount程序,我们来尝试通过一段wordcount代码来逐步剖析flink的执行过程。毫无疑问,这将是一个系列,而且笔者没办法保证能有足够的空闲时间完成这个系列。

WordCount

我们来看一段wordcount代码如下:

DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(value -> value.f0).sum(1);
counts.print();
env.execute("Streaming WordCount");

我们先来分析下这一段,看看上面几行代码执行的过程中,flink都做了哪些操作。

StreamExecutionEnvironment#fromElements方法
@SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }

        TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(data[0]);
        }
        catch (Exception e) {
            throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
                    + "; please specify the TypeInformation manually via "
                    + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
        }
        return fromCollection(Arrays.asList(data), typeInfo);
    }

这里会通过TypeExtractor.getForObject方法从第一个元素中获取到数据的类型信息,其中包括属性信息和class信息(关于类型这部分,后面我们专门用一篇文章来介绍下)。fromCollection方法内部会将FromElementsFunction对象放到StreamSource内部,StreamSource就是用于数据处理的operator,对应的operatorFactory类型为SimpleUdfStreamOperatorFactory。然后利用operator对象生成LegacySourceTransformation对象,在LegacySourceTransformation内部是由StreamOperatorFactory、sourceName、outputType、并行度、数据源的格式(是否有界)等。最终会返回一个DataStreamSource对象,它的transformation引用指向的就是上面的LegacySourceTransformation对象。DataStreamSource也是DataStream类型的。

DataStream#flatMap(FlatMapFunction<T,R>)方法

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);

        return flatMap(flatMapper, outType);
    }

会通过传入的flatMapper和当前数据流的类型获取这个flatmap算子的输出类型,然后在内部的flatMap方法中会将flatMapper包在StreamFlatMap这个operator中,该operator对应的operatorFactory为SimpleUdfStreamOperatorFactory类型的。进入到org.apache.flink.streaming.api.datastream.DataStream#doTransform方法中,我们来看:

protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 相当于一步类型校验
        transformation.getOutputType();
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());
        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
        getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }

•这里会创建一个OneInputTransformation,将当前DataStream的transformation(即上文中的LegacySourceTransformation)作为这个OneInputTransformation的input,由flatMap操作产生的SimpleUdfStreamOperatorFactory(内部是StreamFlatMap算子)作为该OneInputTransformation的operatorFactory。•创建SingleOutputStreamOperator,将OneInputTransformation和任务执行上下文传入其中。•将OneInputTransformation实例放到上下文StreamExecutionEnvironment#transformations列表中。•返回SingleOutputStreamOperator,需要注意的是SingleOutputStreamOperator也是DataStream的一个子类。

总结:在这一步中将Source节点的LegacySourceTransformation作为OneInputTransformation的输入,将flatMap操作的operator对应的SimpleUdfStreamOperatorFactory也在OneInputTransformation中维护。并最终将从Source部分产生的DataStreamSource对象转换成了SingleOutputStreamOperator。

DataStream#keyBy(org.apache.flink.api.java.functions.KeySelector<T,K>)方法

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }

这个我们直接看它最终调用的重载的构造方法:

public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(
            dataStream,
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
    }

这里会将上面flatMap操作生成的OneInputTransformation作为PartitionTransformation的输入,而且需要注意的是PartitionTransformation和OneInputTransformation的一些区别:在PartitionTransformation内部主要做数据分区转换操作,所以不涉及operatorFactory和operator。同时,这里会将上文中返回的SingleOutputStreamOperator流转换成KeyedStream,而在上下文StreamExecutionEnvironment#transformations列表中维护的还是那个OneInputTransformation。

KeyedStream#sum(int)方法

public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
    }

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        return reduce(aggregate).name("Keyed Aggregation");
    }

public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        ReduceTransformation<T, KEY> reduce = new ReduceTransformation<>(
            "Keyed Reduce",
            environment.getParallelism(),
            transformation,
            clean(reducer),
            keySelector,
            getKeyType()
        );

        getExecutionEnvironment().addOperator(reduce);

        return new SingleOutputStreamOperator<>(getExecutionEnvironment(), reduce);
    }

•上面的三个方法均是KeyedStream的方法,我们主要关注下KeyedStream#reduce方法,在这个方法里我们依旧和上面一样,关注三个内容:function、operator、Transformation。这里的function是指ReduceFunction,和PartitionTransformation一样,不涉及operatorFactory和operator。•这里是将上面KeyedStream中的PartitionTransformation作为ReduceTransformation的input来生成一个新的transformation,reducer作为当前ReduceTransformation的ReduceFunction。•通过getExecutionEnvironment().addOperator方法将新创建的ReduceTransformation放到上下文StreamExecutionEnvironment#transformations列表中。此时transformations中有两个transformation实例,一个是上面的OneInputTransformation,另一个是刚刚创建的ReduceTransformation实例。•最后,将reduce transformation实例放到SingleOutputStreamOperator新实例中返回。这时SingleOutputStreamOperator对象中的transformation指向的是这个ReduceTransformation。我们来看下这时候数据流中维护的上下文信息,具体信息如下:

可以看到,最终在SingleOutputStreamOperator对象中维护着ReduceTransformation,它的input为OneInputTransformation...在environment的transformations列表中维护着OneInputTransformation和ReduceTransformation。

DataStream#print()方法

@PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

这个主要需要关注下DataStream#addSink方法:

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();
        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }
        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = (PhysicalTransformation<T>) new LegacySinkTransformation<>(
                inputStream.getTransformation(),
                "Unnamed",
                operator,
                inputStream.getExecutionEnvironment().getParallelism());
    }

这里我们依然关注以下几个内容:

•function:算子内部使用的userFunction为SinkFunction;•operator:StreamSink内容包裹着sinkFunction;•transformation:在DataStreamSink构造方法内部会新建一个LegacySinkTransformation对象,它的input是对应的inputStream中获取到的Transformation,即ReduceTransformation。它的operator是StreamSink;•会调用getExecutionEnvironment().addOperator方法将LegacySinkTransformation放到上下文StreamExecutionEnvironment#transformations列表中去;•最终返回的是DataStreamSink,它也是DataStream类型的。

StreamExecutionEnvironment#execute(java.lang.String)方法

这里进行的是依次生成streamGraph、jobGraph、executionGraph到任务创建和最终任务提交执行的过程,后续逐篇开展,本篇就先介绍到这里了。

总结

可以看到整个wordcount的过程就是将所有的operator操作包装在transformation中,根据transformation在DataStream中的顺序将transformation串联起来。不同类型的操作对应不同的function,不同的operator、不同的operatorFactory和transformation,最终返回不同类型的DataStream。这些操作都是为接下来的streamGraph的生成做好了准备。关于streamGraph的生成,请关注下一篇。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • WordCount
    • StreamExecutionEnvironment#fromElements方法
    • DataStream#flatMap(FlatMapFunction<T,R>)方法
    • DataStream#keyBy(org.apache.flink.api.java.functions.KeySelector<T,K>)方法
    • KeyedStream#sum(int)方法
    • DataStream#print()方法
    • StreamExecutionEnvironment#execute(java.lang.String)方法
    • 总结
    相关产品与服务
    大数据
    全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档