前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink DataStream的iterate操作

聊聊flink DataStream的iterate操作

原创
作者头像
code4it
发布2019-01-15 14:52:32
2.2K1
发布2019-01-15 14:52:32
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下flink DataStream的iterate操作

实例

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
  • 本实例展示了IterativeStream的一些基本用法,使用iterate创建IterativeStream,使用IterativeStream的closeWith方法来关闭feedbackStream

DataStream.iterate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
    //......
​
    @PublicEvolving
    public IterativeStream<T> iterate() {
        return new IterativeStream<>(this, 0);
    }
​
    @PublicEvolving
    public IterativeStream<T> iterate(long maxWaitTimeMillis) {
        return new IterativeStream<>(this, maxWaitTimeMillis);
    }
​
    //......
}
  • DataStream提供了两个iterate方法,它们创建并返回IterativeStream,无参的iterate方法其maxWaitTimeMillis为0

IterativeStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java

@PublicEvolving
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
​
    // We store these so that we can create a co-iteration if we need to
    private DataStream<T> originalInput;
    private long maxWaitTime;
​
    protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
        super(dataStream.getExecutionEnvironment(),
                new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
        this.originalInput = dataStream;
        this.maxWaitTime = maxWaitTime;
        setBufferTimeout(dataStream.environment.getBufferTimeout());
    }
​
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public DataStream<T> closeWith(DataStream<T> feedbackStream) {
​
        Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
​
        if (!predecessors.contains(this.transformation)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }
​
        ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
​
        return feedbackStream;
    }
​
    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
        return withFeedbackType(TypeInformation.of(feedbackTypeClass));
    }
​
    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
        return withFeedbackType(TypeInformation.of(feedbackTypeHint));
    }
​
    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
        return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
    }
​
    @Public
    public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
​
        private CoFeedbackTransformation<F> coFeedbackTransformation;
​
        public ConnectedIterativeStreams(DataStream<I> input,
                TypeInformation<F> feedbackType,
                long waitTime) {
            super(input.getExecutionEnvironment(),
                    input,
                    new DataStream<>(input.getExecutionEnvironment(),
                            new CoFeedbackTransformation<>(input.getParallelism(),
                                    feedbackType,
                                    waitTime)));
            this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
        }
​
        public DataStream<F> closeWith(DataStream<F> feedbackStream) {
​
            Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
​
            if (!predecessors.contains(this.coFeedbackTransformation)) {
                throw new UnsupportedOperationException(
                        "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
            }
​
            coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
​
            return feedbackStream;
        }
​
        private UnsupportedOperationException groupingException =
                new UnsupportedOperationException("Cannot change the input partitioning of an" +
                        "iteration head directly. Apply the partitioning on the input and" +
                        "feedback streams instead.");
​
        @Override
        public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
            throw groupingException;
        }
​
        @Override
        public ConnectedStreams<I, F> keyBy(String field1, String field2) {
            throw groupingException;
        }
​
        @Override
        public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
            throw groupingException;
        }
​
        @Override
        public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
            throw groupingException;
        }
​
        @Override
        public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
            throw groupingException;
        }
    }
}
  • IterativeStream继承了SingleOutputStreamOperator,它的构造器接收两个参数,一个是originalInput,一个是maxWaitTime;它根据dataStream.getTransformation()及maxWaitTime创建FeedbackTransformation;构造器同时会根据dataStream.environment.getBufferTimeout()参数来设置transformation的bufferTimeout
  • IterativeStream主要提供了两个方法,一个是closeWith方法,用于close iteration,它主要用于定义要被feedback到iteration头部的这部分iteration(可以理解为回流,或者类似递归的操作,filter控制的是递归的条件,通过filter的elements会重新进入IterativeStream的头部继续参与后面的运算操作);withFeedbackType方法创建了ConnectedIterativeStreams
  • ConnectedIterativeStreams继承了ConnectedStreams,它允许要被feedback的iteration的类型与originalInput的类型不一样,它也定义了closeWith方法,但是它覆盖了ConnectedStreams的keyBy方法,抛出UnsupportedOperationException异常

FeedbackTransformation

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java

@Internal
public class FeedbackTransformation<T> extends StreamTransformation<T> {
​
    private final StreamTransformation<T> input;
​
    private final List<StreamTransformation<T>> feedbackEdges;
​
    private final Long waitTime;
​
    public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
        super("Feedback", input.getOutputType(), input.getParallelism());
        this.input = input;
        this.waitTime = waitTime;
        this.feedbackEdges = Lists.newArrayList();
    }
​
    public StreamTransformation<T> getInput() {
        return input;
    }
​
    public void addFeedbackEdge(StreamTransformation<T> transform) {
​
        if (transform.getParallelism() != this.getParallelism()) {
            throw new UnsupportedOperationException(
                    "Parallelism of the feedback stream must match the parallelism of the original" +
                            " stream. Parallelism of original stream: " + this.getParallelism() +
                            "; parallelism of feedback stream: " + transform.getParallelism() +
                            ". Parallelism can be modified using DataStream#setParallelism() method");
        }
​
        feedbackEdges.add(transform);
    }
​
    public List<StreamTransformation<T>> getFeedbackEdges() {
        return feedbackEdges;
    }
​
    public Long getWaitTime() {
        return waitTime;
    }
​
    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
    }
​
    @Override
    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
        List<StreamTransformation<?>> result = Lists.newArrayList();
        result.add(this);
        result.addAll(input.getTransitivePredecessors());
        return result;
    }
}
  • FeedbackTransformation继承了StreamTransformation,它有feedbackEdges、waitTime等属性
  • addFeedbackEdge方法用于添加一个a feedback edge,IterativeStream的closeWith方法会调用addFeedbackEdge来添加一个StreamTransformation
  • waitTime指定的是feedback operator等待feedback elements的时间,一旦过了waitTime则operation会关闭,不再接受新的feedback elements

小结

  • DataStream提供了两个iterate方法,它们创建并返回IterativeStream,无参的iterate方法其maxWaitTimeMillis为0
  • IterativeStream的构造器接收两个参数,一个是originalInput,一个是maxWaitTime;它根据dataStream.getTransformation()及maxWaitTime创建FeedbackTransformation;构造器同时会根据dataStream.environment.getBufferTimeout()参数来设置transformation的bufferTimeout;FeedbackTransformation继承了StreamTransformation,它有feedbackEdges、waitTime等属性,waitTime指定的是feedback operator等待feedback elements的时间,一旦过了waitTime则operation会关闭,不再接受新的feedback elements
  • IterativeStream继承了SingleOutputStreamOperator,它主要提供了两个方法,一个是closeWith方法,用于close iteration,它主要用于定义要被feedback到iteration头部的这部分iteration;withFeedbackType方法创建了ConnectedIterativeStreams,ConnectedIterativeStreams继承了ConnectedStreams,它允许要被feedback的iteration的类型与originalInput的类型不一样,它也定义了closeWith方法,但是它覆盖了ConnectedStreams的keyBy方法,抛出UnsupportedOperationException异常

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实例
  • DataStream.iterate
  • IterativeStream
  • FeedbackTransformation
  • 小结
  • doc
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档