
A look at Flink's AsyncWaitOperator

code4it
Published 2019-01-28 16:21:57
Collected in the column 码匠的流水账

This post takes a look at Flink's AsyncWaitOperator.

AsyncWaitOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {
    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_async_wait_operator_state_";

    /** Capacity of the stream element queue. */
    private final int capacity;

    /** Output mode for this operator. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for the async collectors. */
    private final long timeout;

    protected transient Object checkpointingLock;

    /** {@link TypeSerializer} for inputs while making snapshots. */
    private transient StreamElementSerializer<IN> inStreamElementSerializer;

    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;

    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    private transient ExecutorService executor;

    /** Emitter for the completed stream element queue entries. */
    private transient Emitter<OUT> emitter;

    /** Thread running the emitter. */
    private transient Thread emitterThread;

    public AsyncWaitOperator(
            AsyncFunction<IN, OUT> asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode outputMode) {
        super(asyncFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;

        Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;

        this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

        this.timeout = timeout;
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.checkpointingLock = getContainingTask().getCheckpointLock();

        this.inStreamElementSerializer = new StreamElementSerializer<>(
            getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        // create the operators executor for the complete operations of the queue entries
        this.executor = Executors.newSingleThreadExecutor();

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();

        // create the emitter
        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

        // start the emitter thread
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
        emitterThread.setDaemon(true);
        emitterThread.start();

        // process stream elements from state, since the Emit thread will start as soon as all
        // elements from previous state are in the StreamElementQueue, we have to make sure that the
        // order to open all operators in the operator chain proceeds from the tail operator to the
        // head operator.
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                }
                else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                }
                else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                }
                else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() +
                        " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }

    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

        if (timeout > 0L) {
            // register a timeout for this AsyncStreamRecordBufferEntry
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }

        addAsyncBufferEntry(streamRecordBufferEntry);

        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);

        addAsyncBufferEntry(watermarkBufferEntry);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        Collection<StreamElementQueueEntry<?>> values = queue.values();

        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // add the pending stream element queue entry if the stream element queue is currently full
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();

            throw new Exception("Could not add stream element queue entries to operator state " +
                "backend of operator " + getOperatorName() + '.', e);
        }
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

    }

    @Override
    public void close() throws Exception {
        try {
            assert(Thread.holdsLock(checkpointingLock));

            while (!queue.isEmpty()) {
                // wait for the emitter thread to output the remaining elements
                // for that he needs the checkpointing lock and thus we have to free it
                checkpointingLock.wait();
            }
        }
        finally {
            Exception exception = null;

            try {
                super.close();
            } catch (InterruptedException interrupted) {
                exception = interrupted;

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = e;
            }

            try {
                // terminate the emitter, the emitter thread and the executor
                stopResources(true);
            } catch (InterruptedException interrupted) {
                exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (exception != null) {
                LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
            }
        }
    }

    @Override
    public void dispose() throws Exception {
        Exception exception = null;

        try {
            super.dispose();
        } catch (InterruptedException interrupted) {
            exception = interrupted;

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = e;
        }

        try {
            stopResources(false);
        } catch (InterruptedException interrupted) {
            exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            throw exception;
        }
    }

    private void stopResources(boolean waitForShutdown) throws InterruptedException {
        emitter.stop();
        emitterThread.interrupt();

        executor.shutdown();

        if (waitForShutdown) {
            try {
                if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();

                Thread.currentThread().interrupt();
            }

            /*
             * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
             * that the emitter thread can complete/react to the interrupt signal.
             */
            if (Thread.holdsLock(checkpointingLock)) {
                while (emitterThread.isAlive()) {
                    checkpointingLock.wait(100L);
                }
            }

            emitterThread.join();
        } else {
            executor.shutdownNow();
        }
    }

    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert(Thread.holdsLock(checkpointingLock));

        pendingStreamElementQueueEntry = streamElementQueueEntry;

        while (!queue.tryPut(streamElementQueueEntry)) {
            // we wait for the emitter to notify us if the queue has space left again
            checkpointingLock.wait();
        }

        pendingStreamElementQueueEntry = null;
    }

    @Override
    public void failOperator(Throwable throwable) {
        getContainingTask().getEnvironment().failExternally(throwable);
    }
}
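  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, snapshotState, initializeState, close and dispose methods. It implements OneInputStreamOperator, overriding processElement and processWatermark; processLatencyMarker is not overridden in this listing, so the call in open resolves to an inherited implementation. It also implements failOperator, defined by OperatorActions.
  • setup creates an ExecutorService with Executors.newSingleThreadExecutor() and then builds the StreamElementQueue that matches outputMode (OrderedStreamElementQueue or UnorderedStreamElementQueue). open creates an Emitter and starts it on the AsyncIO-Emitter-Thread; it then replays recoveredStreamElements, dispatching each element by type to processElement, processWatermark or processLatencyMarker.
  • processElement first registers a timer when timeout > 0. The timer's ProcessingTimeCallback.onProcessingTime calls userFunction.timeout, and the timer is cancelled once the entry completes. The method then adds the StreamRecordQueueEntry to the StreamElementQueue through addAsyncBufferEntry, which waits on checkpointingLock while the queue is full, and finally calls userFunction.asyncInvoke. close and dispose both call stopResources to release resources; they differ only in the waitForShutdown argument: close passes true, dispose passes false.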
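The timeout handling in processElement can be modelled with plain JDK classes. The following is a minimal sketch, not Flink code: `TimeoutTimerSketch`, `Pending`, `register` and `describe` are hypothetical names, a `ScheduledExecutorService` stands in for the ProcessingTimeService, and a `CompletableFuture` stands in for the StreamRecordQueueEntry. It only illustrates the pattern of registering a timer per in-flight element and cancelling it as soon as the element completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutTimerSketch {

    /** Hypothetical holder for one in-flight request and its timeout timer. */
    public static final class Pending {
        public final CompletableFuture<String> result;
        public final ScheduledFuture<?> timer;

        Pending(CompletableFuture<String> result, ScheduledFuture<?> timer) {
            this.result = result;
            this.timer = timer;
        }
    }

    /** Registers a timeout for one request and cancels it once the result completes. */
    public static Pending register(ScheduledExecutorService timers, long timeoutMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();

        // Stand-in for userFunction.timeout(...): fail the entry if nothing arrived in time.
        ScheduledFuture<?> timer = timers.schedule(
            () -> { result.completeExceptionally(new TimeoutException("async call timed out")); },
            timeoutMillis,
            TimeUnit.MILLISECONDS);

        // Stand-in for streamRecordBufferEntry.onComplete(...): drop the timer once done.
        result.whenComplete((value, error) -> timer.cancel(false));

        return new Pending(result, timer);
    }

    /** Waits for the entry and reports either its value or the failure type. */
    public static String describe(Pending pending) {
        try {
            return pending.result.join();
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        try {
            Pending fast = register(timers, 60_000L);
            fast.result.complete("ok");                    // result arrives before the timeout
            System.out.println(fast.timer.isCancelled());  // prints true

            Pending slow = register(timers, 50L);          // nothing ever completes this one
            System.out.println(describe(slow));            // prints TimeoutException
        } finally {
            timers.shutdownNow();
        }
    }
}
```

One deliberate difference from the listing: the sketch cancels with `cancel(false)` on the completing thread, whereas the operator calls `timerFuture.cancel(true)` from its own executor.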

Emitter

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java

@Internal
public class Emitter<OUT> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

    /** Lock to hold before outputting. */
    private final Object checkpointLock;

    /** Output for the watermark elements. */
    private final Output<StreamRecord<OUT>> output;

    /** Queue to consume the async results from. */
    private final StreamElementQueue streamElementQueue;

    private final OperatorActions operatorActions;

    /** Output for stream records. */
    private final TimestampedCollector<OUT> timestampedCollector;

    private volatile boolean running;

    public Emitter(
            final Object checkpointLock,
            final Output<StreamRecord<OUT>> output,
            final StreamElementQueue streamElementQueue,
            final OperatorActions operatorActions) {

        this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
        this.output = Preconditions.checkNotNull(output, "output");
        this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.running = true;
    }

    @Override
    public void run() {
        try {
            while (running) {
                LOG.debug("Wait for next completed async stream element result.");
                AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

                output(streamElementEntry);
            }
        } catch (InterruptedException e) {
            if (running) {
                operatorActions.failOperator(e);
            } else {
                // Thread got interrupted which means that it should shut down
                LOG.debug("Emitter thread got interrupted, shutting down.");
            }
        } catch (Throwable t) {
            operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
                "unexpected throwable.", t));
        }
    }

    private void output(AsyncResult asyncResult) throws InterruptedException {
        if (asyncResult.isWatermark()) {
            synchronized (checkpointLock) {
                AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

                LOG.debug("Output async watermark.");
                output.emitWatermark(asyncWatermarkResult.getWatermark());

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        } else {
            AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

            if (streamRecordResult.hasTimestamp()) {
                timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
            } else {
                timestampedCollector.eraseTimestamp();
            }

            synchronized (checkpointLock) {
                LOG.debug("Output async stream element collection result.");

                try {
                    Collection<OUT> resultCollection = streamRecordResult.get();

                    if (resultCollection != null) {
                        for (OUT result : resultCollection) {
                            timestampedCollector.collect(result);
                        }
                    }
                } catch (Exception e) {
                    operatorActions.failOperator(
                        new Exception("An async function call terminated with an exception. " +
                            "Failing the AsyncWaitOperator.", e));
                }

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        }
    }

    public void stop() {
        running = false;
    }
}
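  • Emitter implements Runnable. It takes completed elements from the StreamElementQueue and emits them: records go through a TimestampedCollector, watermarks through output.emitWatermark.
  • run loops on streamElementQueue.peekBlockingly(), which blocks until an AsyncResult is available, and passes each result to output.
  • output branches on whether asyncResult is a watermark. A watermark is forwarded with output.emitWatermark; otherwise each result is emitted through timestampedCollector.collect, and an exception from the async call is passed to operatorActions.failOperator. In both branches, still holding checkpointLock, it then calls streamElementQueue.poll() to remove the head of the queue and checkpointLock.notifyAll() to wake the task thread.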
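The interplay between addAsyncBufferEntry (wait while the queue is full) and Emitter.output (poll, then notifyAll) is a classic monitor hand-off. Below is a minimal sketch of that hand-off using only the JDK; `EmitterBackPressureSketch`, `BoundedQueue` and `run` are hypothetical names, integers stand in for stream elements, and every element is treated as already completed, which the real queues do not assume.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class EmitterBackPressureSketch {

    /** Tiny bounded FIFO with its own monitor; stands in for StreamElementQueue. */
    static final class BoundedQueue {
        private final ArrayDeque<Integer> items = new ArrayDeque<>();
        private final int capacity;

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        synchronized boolean tryPut(int value) {
            if (items.size() >= capacity) {
                return false;
            }
            items.addLast(value);
            notifyAll(); // wake a consumer blocked in peekBlockingly
            return true;
        }

        synchronized int peekBlockingly() throws InterruptedException {
            while (items.isEmpty()) {
                wait();
            }
            return items.peekFirst();
        }

        synchronized void poll() {
            items.pollFirst();
        }
    }

    /** Pushes count elements through a queue of the given capacity and returns what was emitted. */
    public static List<Integer> run(int capacity, int count) {
        final Object lock = new Object(); // plays the role of the checkpoint lock
        final BoundedQueue queue = new BoundedQueue(capacity);
        final List<Integer> emitted = new ArrayList<>();

        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    int head = queue.peekBlockingly(); // element stays queued while it is emitted
                    synchronized (lock) {
                        emitted.add(head);
                        queue.poll();
                        lock.notifyAll(); // tell the producer there is space again
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter-sketch");
        emitter.setDaemon(true);
        emitter.start();

        try {
            for (int i = 0; i < count; i++) {
                synchronized (lock) {
                    while (!queue.tryPut(i)) {
                        lock.wait(); // releases the lock so the emitter can make progress
                    }
                }
            }
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the producer holds the lock from the failed tryPut until it calls wait, the emitter cannot poll and notify in between, so the wake-up is never lost.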

StreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java

@Internal
public interface StreamElementQueue {

    <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    AsyncResult peekBlockingly() throws InterruptedException;

    AsyncResult poll() throws InterruptedException;

    Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;

    boolean isEmpty();

    int size();
}
  • StreamElementQueue defines the blocking stream element queue used by AsyncWaitOperator, with the methods put, tryPut, peekBlockingly, poll, values, isEmpty and size. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue, and its elements are of type StreamElementQueueEntry.
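The article does not list the two queue implementations, so the following is only a sketch of the difference between the two output modes, under simplifying assumptions: `OutputModeSketch`, `drainOrdered` and `drainUnordered` are hypothetical names, each in-flight element is a bare `CompletableFuture<String>`, and watermarks are ignored entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OutputModeSketch {

    /** Ordered mode: only the head of the FIFO may be emitted, and only once it is done. */
    public static List<String> drainOrdered(Deque<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            out.add(inFlight.pollFirst().join());
        }
        return out;
    }

    /** Unordered mode: any finished entry may be emitted, whatever its position. */
    public static List<String> drainUnordered(Deque<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        Iterator<CompletableFuture<String>> it = inFlight.iterator();
        while (it.hasNext()) {
            CompletableFuture<String> entry = it.next();
            if (entry.isDone()) {
                out.add(entry.join());
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        Deque<CompletableFuture<String>> ordered = new ArrayDeque<>(Arrays.asList(a, b, c));
        Deque<CompletableFuture<String>> unordered = new ArrayDeque<>(Arrays.asList(a, b, c));

        c.complete("c"); // the last request finishes first
        System.out.println(drainOrdered(ordered));     // prints []
        System.out.println(drainUnordered(unordered)); // prints [c]

        a.complete("a");
        System.out.println(drainOrdered(ordered));     // prints [a]
        System.out.println(drainUnordered(unordered)); // prints [a]

        b.complete("b");
        System.out.println(drainOrdered(ordered));     // prints [b, c]
        System.out.println(drainUnordered(unordered)); // prints [b]
    }
}
```

The trade-off shown here is the usual one: ordered mode preserves input order at the cost of head-of-line blocking, while unordered mode emits results as soon as they are available.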

AsyncResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java

@Internal
public interface AsyncResult {

    boolean isWatermark();

    boolean isResultCollection();

    AsyncWatermarkResult asWatermark();

    <T> AsyncCollectionResult<T> asResultCollection();
}
  • AsyncResult defines the methods that the asynchronous result of a StreamElementQueue element must implement. The result is either a watermark or a collection of output records.
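A consumer of such a result first asks which kind it holds and then narrows it, as the branch at the top of Emitter.output does. The sketch below mirrors that shape with simplified, hypothetical types (`Result`, `WatermarkResult`, `CollectionResult` and their implementations are not Flink classes).

```java
import java.util.Arrays;
import java.util.List;

public class AsyncResultSketch {

    /** Simplified stand-in for AsyncResult. */
    public interface Result {
        boolean isWatermark();
    }

    /** Simplified stand-in for AsyncWatermarkResult. */
    public interface WatermarkResult extends Result {
        long timestamp();
    }

    /** Simplified stand-in for AsyncCollectionResult. */
    public interface CollectionResult extends Result {
        List<String> values();
    }

    public static final class WatermarkEntry implements WatermarkResult {
        private final long timestamp;

        public WatermarkEntry(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public boolean isWatermark() {
            return true;
        }

        @Override
        public long timestamp() {
            return timestamp;
        }
    }

    public static final class RecordEntry implements CollectionResult {
        private final List<String> values;

        public RecordEntry(List<String> values) {
            this.values = values;
        }

        @Override
        public boolean isWatermark() {
            return false;
        }

        @Override
        public List<String> values() {
            return values;
        }
    }

    /** Checks the kind first, then casts to the matching view of the result. */
    public static String describe(Result result) {
        if (result.isWatermark()) {
            return "watermark@" + ((WatermarkResult) result).timestamp();
        }
        return "records" + ((CollectionResult) result).values();
    }

    public static void main(String[] args) {
        System.out.println(describe(new WatermarkEntry(42L)));                // prints watermark@42
        System.out.println(describe(new RecordEntry(Arrays.asList("a", "b")))); // prints records[a, b]
    }
}
```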

StreamElementQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java

@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

    private final StreamElement streamElement;

    public StreamElementQueueEntry(StreamElement streamElement) {
        this.streamElement = Preconditions.checkNotNull(streamElement);
    }

    public StreamElement getStreamElement() {
        return streamElement;
    }

    public boolean isDone() {
        return getFuture().isDone();
    }

    public void onComplete(
            final Consumer<StreamElementQueueEntry<T>> completeFunction,
            Executor executor) {
        final StreamElementQueueEntry<T> thisReference = this;

        getFuture().whenCompleteAsync(
            // call the complete function for normal completion as well as exceptional completion
            // see FLINK-6435
            (value, throwable) -> completeFunction.accept(thisReference),
            executor);
    }

    protected abstract CompletableFuture<T> getFuture();

    @Override
    public final boolean isWatermark() {
        return AsyncWatermarkResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final boolean isResultCollection() {
        return AsyncCollectionResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final AsyncWatermarkResult asWatermark() {
        return (AsyncWatermarkResult) this;
    }

    @Override
    public final <T> AsyncCollectionResult<T> asResultCollection() {
        return (AsyncCollectionResult<T>) this;
    }
}
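  • StreamElementQueueEntry implements AsyncResult. It defines onComplete, which registers a callback that runs when the result completes, whether normally or exceptionally, and the abstract method getFuture for subclasses to implement. It has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry.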
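The point of routing onComplete through whenCompleteAsync is that the callback fires for failures as well as successes (the FLINK-6435 comment in the listing). A minimal sketch of that behaviour follows; `OnCompleteSketch`, `Entry` and its `name` field are hypothetical, and a same-thread executor is used purely to make the order of callbacks deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class OnCompleteSketch {

    /** Simplified stand-in for StreamElementQueueEntry<T>. */
    static final class Entry<T> {
        final String name;
        final CompletableFuture<T> future = new CompletableFuture<>();

        Entry(String name) {
            this.name = name;
        }

        boolean isDone() {
            return future.isDone();
        }

        /** Runs completeFunction on normal and on exceptional completion alike. */
        void onComplete(Consumer<Entry<T>> completeFunction, Executor executor) {
            future.whenCompleteAsync(
                (value, throwable) -> completeFunction.accept(this),
                executor);
        }
    }

    /** Completes one entry normally and one exceptionally; returns the callback order. */
    public static List<String> run() {
        List<String> completed = new ArrayList<>();
        Executor sameThread = Runnable::run; // assumption: inline execution, for determinism

        Entry<String> ok = new Entry<>("ok");
        Entry<String> failed = new Entry<>("failed");
        ok.onComplete(entry -> completed.add(entry.name), sameThread);
        failed.onComplete(entry -> completed.add(entry.name), sameThread);

        ok.future.complete("value");
        failed.future.completeExceptionally(new RuntimeException("boom"));
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ok, failed]
    }
}
```

Had the callback been registered with thenAcceptAsync instead, the failed entry would never have triggered it, and a timer or queue slot tied to that entry could leak.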

Summary

  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, snapshotState, initializeState, close and dispose methods. It implements OneInputStreamOperator, overriding processElement and processWatermark and relying on an inherited processLatencyMarker, and it implements failOperator from OperatorActions. open creates an Emitter and starts it on the AsyncIO-Emitter-Thread.
  • Emitter implements Runnable. It takes completed elements from the StreamElementQueue and emits them, records through a TimestampedCollector and watermarks through output.emitWatermark. Its run method loops on streamElementQueue.peekBlockingly(), which blocks until an AsyncResult is available, and passes each result to output.
  • StreamElementQueue defines the blocking stream element queue used by AsyncWaitOperator, with the methods put, tryPut, peekBlockingly, poll, values, isEmpty and size. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue. Its elements are of type StreamElementQueueEntry, which implements AsyncResult, defines onComplete for the completion callback and the abstract method getFuture for subclasses, and has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry.

Originally published on 2019-01-20 on the WeChat official account 码匠的流水账.
