A Look at Flink's Allowed Lateness

code4it
Published 2019-01-23 17:35:00

This article looks at how Flink implements allowed lateness.

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;

    /**
     * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
     * dropped.
     */
    private OutputTag<T> lateDataOutputTag;

    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time lateness) {
        final long millis = lateness.toMilliseconds();
        checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

        this.allowedLateness = millis;
        return this;
    }

    @PublicEvolving
    public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
        return this;
    }

    //......

    public <R> SingleOutputStreamOperator<R> reduce(
            ReduceFunction<T> reduceFunction,
            WindowFunction<T, R, K, W> function,
            TypeInformation<R> resultType) {

        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
        }

        //clean the closures
        function = input.getExecutionEnvironment().clean(function);
        reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
        KeySelector<T, K> keySel = input.getKeySelector();

        OneInputStreamOperator<T, R> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            ListStateDescriptor<StreamRecord<T>> stateDesc =
                new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);

        } else {
            ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
                reduceFunction,
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalSingleValueWindowFunction<>(function),
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

    //......
}
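  • WindowedStream has two fields related to allowed lateness: allowedLateness, which specifies how long elements are allowed to be late, and lateDataOutputTag, which configures the side output that receives late elements
  • Operations on WindowedStream such as reduce, aggregate, fold and process create a different operator depending on whether evictor is null: EvictingWindowOperator when evictor is not null, WindowOperator when it is null
  • EvictingWindowOperator extends WindowOperator. Its constructor takes an extra Evictor parameter, but both constructors require the Trigger, allowedLateness and lateDataOutputTag parameters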
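
The configuration flow above can be modelled in a few lines of plain Java. This is only a sketch to make the branching visible: WindowedStreamSketch and all of its members are hypothetical names, the side output tag is reduced to a string id, and nothing here is Flink code.

```java
import java.time.Duration;
import java.util.Objects;

// Simplified stand-in for WindowedStream that keeps only the fields discussed above.
// Every name in this class is hypothetical; this is not Flink code.
final class WindowedStreamSketch {
    private long allowedLatenessMillis = 0L; // same default as WindowedStream
    private String lateDataTagId;            // null means late data is simply dropped
    private Object evictor;                  // null means no evictor was configured

    WindowedStreamSketch allowedLateness(Duration lateness) {
        long millis = lateness.toMillis();
        if (millis < 0) {
            throw new IllegalArgumentException("The allowed lateness cannot be negative.");
        }
        this.allowedLatenessMillis = millis;
        return this;
    }

    WindowedStreamSketch sideOutputLateData(String tagId) {
        this.lateDataTagId = Objects.requireNonNull(tagId, "Side output tag must not be null.");
        return this;
    }

    WindowedStreamSketch evictor(Object evictor) {
        this.evictor = evictor;
        return this;
    }

    // Mirrors the branch in reduce(): the evictor decides which operator gets built,
    // while allowedLateness and the late-data tag are passed on in both cases.
    String describeOperator() {
        String name = (evictor != null) ? "EvictingWindowOperator" : "WindowOperator";
        return name + "(allowedLateness=" + allowedLatenessMillis
            + ", lateTag=" + lateDataTagId + ")";
    }
}
```

Calling describeOperator() on a fresh instance reports a WindowOperator with an allowed lateness of 0 and no late-data tag, which corresponds to the defaults in the excerpt.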

OutputTag

flink-core-1.7.0-sources.jar!/org/apache/flink/util/OutputTag.java

@PublicEvolving
public class OutputTag<T> implements Serializable {

    private static final long serialVersionUID = 2L;

    private final String id;

    private final TypeInformation<T> typeInfo;

    public OutputTag(String id) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
        }
        catch (InvalidTypesException e) {
            throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " +
                    "The most common reason is forgetting to make the OutputTag an anonymous inner class. " +
                    "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e);
        }
    }

    public OutputTag(String id, TypeInformation<T> typeInfo) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;
        this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
    }

    // ------------------------------------------------------------------------

    public String getId() {
        return id;
    }

    public TypeInformation<T> getTypeInfo() {
        return typeInfo;
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean equals(Object obj) {
        return obj instanceof OutputTag
            && ((OutputTag) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "OutputTag(" + getTypeInfo() + ", " + id + ")";
    }
}
  • OutputTag is a side output identifier that carries a name and type information. Flink lets ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction emit side outputs; the Context of each of these functions has an output(OutputTag<X> outputTag, X value) method for emitting elements to a side output
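
The error message in the one-argument constructor hints at why an OutputTag is usually created as an anonymous inner class: only a subclass keeps its generic type argument available at runtime. The sketch below illustrates that idea with plain reflection. TagSketch is a hypothetical class, and Flink's TypeExtractor is considerably more elaborate than this.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical, simplified tag class; it is not Flink's OutputTag.
class TagSketch<T> {
    private final String id;

    TagSketch(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("id must not be null or empty");
        }
        this.id = id;
    }

    // Returns the captured element type, or null when the tag was created
    // without a subclass and the type argument was therefore erased.
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null;
    }

    // Like OutputTag, equality and hashing look only at the id.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof TagSketch && ((TagSketch<?>) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}
```

With this class, new TagSketch<String>("late") reports no captured type, whereas new TagSketch<String>("late") {} (note the trailing braces) reports String. Two tags with the same id also compare as equal even when their type arguments differ.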

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

    protected boolean nonParallel = false;

    private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();

    private boolean wasSplitApplied = false;

    //......

    public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
        if (wasSplitApplied) {
            throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
                "As a work-around, please add a no-op map function before the split() call.");
        }

        sideOutputTag = clean(requireNonNull(sideOutputTag));

        // make a defensive copy
        sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

        TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
        if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
            throw new UnsupportedOperationException("A side output with a matching id was " +
                    "already requested with a different type. This is not allowed, side output " +
                    "ids need to be unique.");
        }

        requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

        SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
        return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
    }
}
  • SingleOutputStreamOperator provides the getSideOutput method, which uses an OutputTag to retrieve the side output that was emitted earlier inside a function
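
Because OutputTag compares by id only, the requestedSideOutputs map in getSideOutput is effectively keyed by id, and a second request with the same id but a different type is rejected. A minimal model of that bookkeeping, assuming a hypothetical SideOutputRegistrySketch class and using Class<?> in place of TypeInformation, might look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bookkeeping in getSideOutput(); not Flink code.
final class SideOutputRegistrySketch {
    // Maps a side output id to the type it was first requested with.
    private final Map<String, Class<?>> requested = new HashMap<>();

    void request(String id, Class<?> type) {
        Class<?> previous = requested.get(id);
        if (previous != null && !previous.equals(type)) {
            throw new UnsupportedOperationException(
                "A side output with a matching id was already requested with a different type.");
        }
        requested.put(id, type);
    }

    // Number of distinct side output ids requested so far.
    int size() {
        return requested.size();
    }
}
```

Requesting the id "late" twice with the same type leaves a single entry, while requesting it again with another type throws.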

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......

    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                    "event-time window cannot become earlier than the current watermark " +
                                    "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                    " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                    " window: " + mergeResult);
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    protected boolean isElementLate(StreamRecord<IN> element){
        return (windowAssigner.isEventTime()) &&
            (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
    }

    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

    //......
}
  • WindowOperator has an isElementLate method that uses allowedLateness to decide whether an element is late. At the end of processElement, when both isSkippedElement and isElementLate are true, the late element is emitted to the side output if lateDataOutputTag is not null; if lateDataOutputTag is null, numLateRecordsDropped.inc() is called to increment the counter of dropped late records
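
The arithmetic behind these checks is easier to follow with concrete numbers. The sketch below models the event-time case only. LatenessSketch is a hypothetical class, and isWindowLate is not part of the excerpt above, so its body here is an assumption: it compares the cleanup time against the current watermark.

```java
// Hypothetical model of the event-time checks in WindowOperator; not Flink code.
final class LatenessSketch {
    private final long allowedLateness;

    LatenessSketch(long allowedLatenessMillis) {
        this.allowedLateness = allowedLatenessMillis;
    }

    // Same arithmetic as cleanupTime() for event-time windows, including the overflow guard.
    long cleanupTime(long windowMaxTimestamp) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    // Assumed shape of isWindowLate(): the window counts as late
    // once the watermark has reached its cleanup time.
    boolean isWindowLate(long windowMaxTimestamp, long watermark) {
        return cleanupTime(windowMaxTimestamp) <= watermark;
    }

    // Same comparison as isElementLate() for event-time assigners.
    boolean isElementLate(long elementTimestamp, long watermark) {
        return elementTimestamp + allowedLateness <= watermark;
    }
}
```

Take a window covering [0, 10000) with a max timestamp of 9999 and an allowed lateness of 2000 ms, which gives a cleanup time of 11999. While the watermark is at 11000 the window is not late, so an element with timestamp 9500 is still added to it. Once the watermark reaches 11999 the window is late and the same element is skipped; since 9500 + 2000 <= 11999, it also counts as a late element and goes to the side output or is dropped. For a window whose max timestamp is Long.MAX_VALUE, the addition overflows and the guard returns Long.MAX_VALUE as the cleanup time.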

Summary

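  • For event-time windows, Flink provides the allowedLateness method to configure how late elements may arrive. An element is still added to its window as long as the watermark has not reached the window end time plus the allowed lateness; after that point it is dropped. The default value is 0. For window assigners such as GlobalWindows, the end timestamp is Long.MAX_VALUE, so no element is ever considered late
  • OutputTag is a side output identifier that carries a name and type information. Flink lets ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction emit side outputs; the Context of each of these functions has an output(OutputTag<X> outputTag, X value) method for emitting elements to a side output
  • SingleOutputStreamOperator provides the getSideOutput method, which uses an OutputTag to retrieve the side output that was emitted earlier inside a function. At the end of processElement, WindowOperator checks whether both isSkippedElement and isElementLate are true; if so, and lateDataOutputTag is not null, the late element is emitted to the side output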

doc

  • Allowed Lateness
  • Side Outputs
  • Window Lifecycle
Originally published on 2019-01-08 on the WeChat official account 码匠的流水账.