A Look at Flink's SourceFunction

This article takes a look at Flink's SourceFunction.

Example

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

        dataStreamSource.map(new UpperCaseMapFunc()).print();

        env.execute("sourceFunctionDemo");
  • Here the custom SourceFunction is registered through the addSource method.

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

@Public
public interface SourceFunction<T> extends Function, Serializable {

    void run(SourceContext<T> ctx) throws Exception;

    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        //......
    }
}
  • SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods, together with the nested SourceContext interface.

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        /**
         * Emits one element from the source, without attaching a timestamp. In most cases,
         * this is the default way of emitting elements.
         *
         * <p>The timestamp that the element will get assigned depends on the time characteristic of
         * the streaming program:
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
         *         current time as the timestamp.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
         *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
         *         operation (like time windows).</li>
         * </ul>
         *
         * @param element The element to emit
         */
        void collect(T element);

        /**
         * Emits one element from the source, and attaches the given timestamp. This method
         * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
         * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
         * on the stream.
         *
         * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
         * This allows programs to switch between the different time characteristics and behaviors
         * without changing the code of the source functions.
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
         *         because processing time never works with element timestamps.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
         *         system's current time, to realize proper ingestion time semantics.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
         * </ul>
         *
         * @param element The element to emit
         * @param timestamp The timestamp in milliseconds since the Epoch
         */
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        /**
         * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
         * elements with a timestamp {@code t' <= t} will occur any more. If further such
         * elements will be emitted, those elements are considered <i>late</i>.
         *
         * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
         * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
         * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
         * automatic ingestion time watermarks.
         *
         * @param mark The Watermark to emit
         */
        @PublicEvolving
        void emitWatermark(Watermark mark);

        /**
         * Marks the source to be temporarily idle. This tells the system that this source will
         * temporarily stop emitting records and watermarks for an indefinite amount of time. This
         * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
         * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
         * watermarks without the need to wait for watermarks from this source while it is idle.
         *
         * <p>Source functions should make a best effort to call this method as soon as they
         * acknowledge themselves to be idle. The system will consider the source to resume activity
         * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
         * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
         */
        @PublicEvolving
        void markAsTemporarilyIdle();

        /**
         * Returns the checkpoint lock. Please refer to the class-level comment in
         * {@link SourceFunction} for details about how to write a consistent checkpointed
         * source.
         *
         * @return The object to use as the lock
         */
        Object getCheckpointLock();

        /**
         * This method is called by the system to shut down the context.
         */
        void close();
    }
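  • SourceContext mainly defines the interface through which a source emits data. The first method is collect, which emits an element without attaching a timestamp. Under TimeCharacteristic.EventTime such an element needs a timestamp from a TimestampAssigner before any time-dependent operation (for example a time window); under TimeCharacteristic.IngestionTime nothing has to be assigned, because the system attaches the current time automatically; under TimeCharacteristic.ProcessingTime the element has no timestamp. Besides collect there is collectWithTimestamp, which emits an element together with a timestamp and is meant for TimeCharacteristic.EventTime (the timestamp is ignored under ProcessingTime and overwritten under IngestionTime).
  • It also defines emitWatermark. A Watermark with value t declares that no more elements with a timestamp t' <= t will arrive, and any that still do are considered late; this is how out-of-order data is bounded. The method is only relevant under TimeCharacteristic.EventTime: under TimeCharacteristic.ProcessingTime watermarks are ignored, and under TimeCharacteristic.IngestionTime they are replaced by the automatically generated ingestion time watermarks.
  • markAsTemporarilyIdle tells the system that the source will stop emitting records and watermarks for an indefinite amount of time, so that downstream tasks can advance their watermarks without waiting for this source. It is only relevant under TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. As soon as SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called, the system considers the source active again.
  • getCheckpointLock returns the checkpoint lock, which the source uses to keep its state updates and element emission consistent with checkpoints.
  • close is called by the system to shut down the context and release its resources.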
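
To make the role of getCheckpointLock more concrete, here is a minimal sketch in plain Java. MiniContext, CountingSource and CheckpointLockDemo are hypothetical stand-ins written for this illustration, not Flink classes. The sketch only assumes the usual pattern in which a source emits an element and updates its own state inside one synchronized block on the checkpoint lock, so that a snapshot taken under the same lock never observes a half-applied step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SourceFunction.SourceContext (not the Flink interface).
interface MiniContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

// A counting source whose only state is the next value to emit.
class CountingSource {
    private volatile boolean running = true;
    private long offset = 0;

    void run(MiniContext<Long> ctx, long limit) {
        while (running && offset < limit) {
            // Emit the element and advance the offset as one step under the lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(offset);
                offset++;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // A snapshot taken under the same lock always matches what has been emitted.
    long snapshot(Object lock) {
        synchronized (lock) {
            return offset;
        }
    }
}

class CheckpointLockDemo {
    // Returns {number of emitted elements, offset seen by the snapshot}.
    static long[] runDemo(long limit) {
        final Object lock = new Object();
        final List<Long> out = new ArrayList<>();
        MiniContext<Long> ctx = new MiniContext<Long>() {
            @Override public void collect(Long element) { out.add(element); }
            @Override public Object getCheckpointLock() { return lock; }
        };
        CountingSource source = new CountingSource();
        source.run(ctx, limit);
        return new long[] { out.size(), source.snapshot(lock) };
    }

    public static void main(String[] args) {
        long[] r = runDemo(5);
        System.out.println("emitted=" + r[0] + ", snapshot offset=" + r[1]);
    }
}
```

In a real job the lock object comes from ctx.getCheckpointLock() and the snapshot is taken by the framework, not by the source itself.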

Task.run (upstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • Task's run method calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • StreamTask's invoke method calls the run method of its subclass, which here is SourceStreamTask.

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    @Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
  • SourceStreamTask's run method mainly calls run on headOperator, which here is a StreamSource.

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
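  • StreamSource's run method first builds the SourceFunction.SourceContext through StreamSourceContexts.getSourceContext and then calls run on userFunction. The userFunction here is RandomWordSource, the user-defined SourceFunction. Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0. After a finite source returns without having been canceled or stopped, a final Watermark.MAX_WATERMARK is emitted, and the context is closed in the finally block in every case.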
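
The control flow around userFunction.run can be modeled without Flink. The sketch below is a simplified illustration, assuming only what the excerpt above shows: the final watermark is emitted when the function returns without cancellation, and close always runs. FinalWatermarkDemo and its string log are hypothetical; the constant mirrors Watermark.MAX_WATERMARK, whose timestamp is Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FinalWatermarkDemo {
    // Mirrors the timestamp carried by Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Returns a log of what a downstream consumer would observe.
    static List<String> runSource(List<String> input, boolean canceled) {
        List<String> log = new ArrayList<>();
        try {
            // Stand-in for userFunction.run(ctx): emit every input element.
            for (String s : input) {
                log.add("record:" + s);
            }
            // Only a source that finished on its own signals the end of event time.
            if (!canceled) {
                log.add("watermark:" + MAX_WATERMARK);
            }
        } finally {
            // The context is closed whether or not the source was canceled.
            log.add("close");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runSource(Arrays.asList("a", "b"), false));
        System.out.println(runSource(Arrays.asList("a", "b"), true));
    }
}
```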

RandomWordSource.run

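import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            // pick a uniformly distributed index in [0, words.length)
            int rnd = ThreadLocalRandom.current().nextInt(words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        // makes the loop in run() exit on its next iteration
        isRunning = false;
    }
}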
  • RandomWordSource's run method keeps emitting words in a loop until cancel sets isRunning to false.
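
The run/cancel contract relies on a flag that one thread writes and another reads. The following plain-Java sketch illustrates that pattern in isolation; LoopingSource and CancelFlagDemo are hypothetical names, and the loop emits into a counter rather than a Flink SourceContext.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

class CancelFlagDemo {
    // Hypothetical miniature of a source: loops until cancel() flips the flag.
    static class LoopingSource {
        private volatile boolean isRunning = true;
        final AtomicLong emitted = new AtomicLong();
        final CountDownLatch firstEmit = new CountDownLatch(1);

        void run() {
            while (isRunning) {
                emitted.incrementAndGet();   // stand-in for ctx.collect(...)
                firstEmit.countDown();
                Thread.yield();
            }
        }

        void cancel() {
            isRunning = false;
        }
    }

    // Returns true if the source emitted something and stopped after cancel().
    static boolean runThenCancel() {
        LoopingSource source = new LoopingSource();
        Thread t = new Thread(source::run, "source-thread");
        t.start();
        try {
            source.firstEmit.await();   // wait until at least one element was emitted
            source.cancel();            // invoked from a different thread than run()
            t.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive() && source.emitted.get() >= 1;
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + runThenCancel());
    }
}
```

Without volatile, the Java memory model would not guarantee that the looping thread ever sees the write made by cancel.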

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final OperatorID operatorId,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }
  • LatencyMarksEmitter is created in StreamSource's run method before userFunction's run method is called, provided latencyTrackingInterval > 0. To determine latencyTrackingInterval, the code first calls getExecutionConfig().isLatencyTrackingConfigured() to check whether the ExecutionConfig sets the value. If it does, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L. In this example the latter applies, so the interval is 2000.
  • The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval.
  • The work of the timer task is in the onProcessingTime method of ProcessingTimeCallback, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. The processingTimeService here is a SystemProcessingTimeService, and the output is an AbstractStreamOperator.CountingOutput.
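
The two ideas in these notes, picking the interval and emitting at a fixed rate until closed, can be sketched with the JDK alone. LatencyEmitterDemo is a hypothetical illustration: the nullable Long models "not configured at job level", and a counter stands in for the emitted LatencyMarker objects.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatencyEmitterDemo {
    // Same decision as in StreamSource.run: a job-level setting wins, otherwise the cluster default.
    static long resolveInterval(Long jobLevelInterval, long clusterDefault) {
        return jobLevelInterval != null ? jobLevelInterval : clusterDefault;
    }

    // Schedules a fixed-rate task, waits for `waitFor` firings, then cancels it.
    // Returns how many markers were counted.
    static int emitMarkers(long intervalMillis, int waitFor) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
        AtomicInteger markers = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(waitFor);
        ScheduledFuture<?> future = timer.scheduleAtFixedRate(() -> {
            markers.incrementAndGet();   // stand-in for output.emitLatencyMarker(...)
            latch.countDown();
        }, 0L, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(true);         // the equivalent of LatencyMarksEmitter.close()
            timer.shutdownNow();
        }
        return markers.get();
    }

    public static void main(String[] args) {
        System.out.println("interval = " + resolveInterval(null, 2000L));
        System.out.println("markers  = " + emitMarkers(10, 3));
    }
}
```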

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        long nextTimestamp = getCurrentProcessingTime() + initialDelay;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.scheduleAtFixedRate(
                new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
                initialDelay,
                period,
                TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(initialDelay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }
  • SystemProcessingTimeService's scheduleAtFixedRate method delegates to timerService.scheduleAtFixedRate. The timerService here is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask.

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    /**
     * Internal task which is repeatedly called by the processing time service.
     */
    private static final class RepeatedTriggerTask implements Runnable {

        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        private long nextTimestamp;

        private RepeatedTriggerTask(
                final AtomicInteger serviceStatus,
                final AsyncExceptionHandler exceptionHandler,
                final Object lock,
                final ProcessingTimeCallback target,
                final long nextTimestamp,
                final long period) {

            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }

                    nextTimestamp += period;
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
                }
            }
        }
    }
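  • RepeatedTriggerTask calls ProcessingTimeCallback.onProcessingTime while holding the checkpoint lock, and only when serviceStatus is STATUS_ALIVE. The nextTimestamp passed in initially is getCurrentProcessingTime() + initialDelay, and period is added to it after every firing, so the callback receives the scheduled time, not the wall-clock time at which it actually ran.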
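
The timestamp bookkeeping is easy to check in isolation. The sketch below is a hypothetical, simplified copy of the idea (the status check and exception handler are left out): each firing reports the scheduled timestamp and then advances it by the period.

```java
import java.util.ArrayList;
import java.util.List;

class RepeatedTriggerDemo {
    // Simplified model of RepeatedTriggerTask: records the timestamp handed to the callback.
    static final class RepeatedTask implements Runnable {
        private final Object lock;
        private final long period;
        private final List<Long> seen;
        private long nextTimestamp;

        RepeatedTask(Object lock, long firstTimestamp, long period, List<Long> seen) {
            this.lock = lock;
            this.nextTimestamp = firstTimestamp;
            this.period = period;
            this.seen = seen;
        }

        @Override
        public void run() {
            synchronized (lock) {
                seen.add(nextTimestamp);   // stand-in for target.onProcessingTime(nextTimestamp)
                nextTimestamp += period;   // the next firing reports the next scheduled time
            }
        }
    }

    // Fires the task `times` times by hand and returns the timestamps it reported.
    static List<Long> fire(long firstTimestamp, long period, int times) {
        List<Long> seen = new ArrayList<>();
        RepeatedTask task = new RepeatedTask(new Object(), firstTimestamp, period, seen);
        for (int i = 0; i < times; i++) {
            task.run();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fire(1000L, 500L, 3));
    }
}
```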

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

    /**
     * Wrapping {@link Output} that updates metrics on the number of emitted elements.
     */
    public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            numRecordsOut.inc();
            output.collect(outputTag, record);
        }

        @Override
        public void close() {
            output.close();
        }
    }
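  • The CountingOutput here actually wraps a RecordWriterOutput. emitLatencyMarker simply delegates to it, while the two collect methods also increment the numRecordsOut counter.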

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
  • emitLatencyMarker here mainly calls StreamRecordWriter's randomEmit to send the LatencyMarker; the method is inherited from the parent class RecordWriter.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

    /**
     * This is used to send LatencyMarks to a random target channel.
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        sendToTarget(record, rng.nextInt(numChannels));
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
  • RecordWriter's randomEmit picks a random targetChannel and sends the record to it.
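
A consequence of randomEmit is that each latency marker reaches exactly one downstream channel rather than all of them. The following sketch illustrates only the channel selection, rng.nextInt(numChannels); RandomEmitDemo and the seed parameter are hypothetical, and no serialization or buffering is modeled.

```java
import java.util.Arrays;
import java.util.Random;

class RandomEmitDemo {
    // Sends `numMarkers` markers, each to one random channel, and counts them per channel.
    static int[] emit(int numChannels, int numMarkers, long seed) {
        Random rng = new Random(seed);
        int[] perChannel = new int[numChannels];
        for (int i = 0; i < numMarkers; i++) {
            int targetChannel = rng.nextInt(numChannels);   // value in [0, numChannels)
            perChannel[targetChannel]++;                    // stand-in for sendToTarget(record, targetChannel)
        }
        return perChannel;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(emit(4, 1000, 42L)));
    }
}
```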

Task.run (downstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The downstream Task's run method also calls invokable.invoke(); the invokable here is a OneInputStreamTask.

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
  • Task's run method calls StreamTask's invoke method, which in turn calls OneInputStreamTask's run method. That method mainly loops on inputProcessor.processInput(); the inputProcessor here is a StreamInputProcessor.

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
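  • processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record and processes it only when result.isFullRecord() is true. When there is no current deserializer, it takes the next buffer from barrierHandler.getNextNonBlocked() and continues the loop.
  • The StreamElement is handled according to its type: watermark, stream status, latency marker, or a regular record. For the first three the loop continues; after a regular record the method returns true.
  • For a regular record, streamOperator.processElement(record) is called; the streamOperator here is a StreamMap.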
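
The dispatch logic can be summarized in a small model. DispatchDemo, Kind and Element below are hypothetical stand-ins, and buffers, deserializers and barriers are left out; the sketch only shows that control elements are handled inside the loop while a regular record ends one processInput call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DispatchDemo {
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    static final class Element {
        final Kind kind;
        final String payload;

        Element(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    final Deque<Element> input = new ArrayDeque<>();
    final List<String> handled = new ArrayList<>();

    // Returns true after one regular record was processed, false when the input is exhausted.
    boolean processInput() {
        while (true) {
            Element e = input.poll();
            if (e == null) {
                return false;
            }
            if (e.kind == Kind.WATERMARK) {
                handled.add("watermark");        // statusWatermarkValve.inputWatermark(...)
            } else if (e.kind == Kind.STREAM_STATUS) {
                handled.add("stream-status");    // statusWatermarkValve.inputStreamStatus(...)
            } else if (e.kind == Kind.LATENCY_MARKER) {
                handled.add("latency-marker");   // streamOperator.processLatencyMarker(...)
            } else {
                handled.add("record:" + e.payload);   // streamOperator.processElement(...)
                return true;
            }
        }
    }

    // Feeds two control elements and two records, then performs a single processInput call.
    static List<String> demo() {
        DispatchDemo p = new DispatchDemo();
        p.input.add(new Element(Kind.WATERMARK, null));
        p.input.add(new Element(Kind.LATENCY_MARKER, null));
        p.input.add(new Element(Kind.RECORD, "a"));
        p.input.add(new Element(Kind.RECORD, "b"));
        p.processInput();
        return p.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```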

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
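  • processElement calls userFunction.map(element.getValue()) to perform the map operation and writes the result back into the same StreamRecord through element.replace; the userFunction here is UpperCaseMapFunc.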
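
The replace call means the operator reuses the incoming record object as the outgoing one. The sketch below models that with a hypothetical Rec class in place of StreamRecord, and it assumes that UpperCaseMapFunc simply upper-cases its input, which the article does not show.

```java
import java.util.function.Function;

class MapOperatorDemo {
    // Hypothetical miniature of StreamRecord: a mutable holder whose value can be swapped in place.
    static final class Rec<T> {
        private Object value;

        Rec(T value) {
            this.value = value;
        }

        @SuppressWarnings("unchecked")
        T getValue() {
            return (T) value;
        }

        // Replaces the value and returns this same object, re-typed to the new element type.
        @SuppressWarnings("unchecked")
        <X> Rec<X> replace(X newValue) {
            this.value = newValue;
            return (Rec<X>) this;
        }
    }

    // Same shape as StreamMap.processElement, returning the record instead of calling output.collect.
    static <IN, OUT> Rec<OUT> processElement(Rec<IN> element, Function<IN, OUT> userFunction) {
        return element.replace(userFunction.apply(element.getValue()));
    }

    // True if the mapped record is the same object and carries the upper-cased value.
    static boolean demo() {
        Rec<String> in = new Rec<>("fox");
        Rec<String> out = processElement(in, String::toUpperCase);
        return out == in && "FOX".equals(out.getValue());
    }

    public static void main(String[] args) {
        System.out.println("record reused and mapped: " + demo());
    }
}
```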

Summary

  • SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods, together with the SourceContext interface. SourceContext mainly defines collect and collectWithTimestamp for emitting data, and also provides emitWatermark for emitting Watermarks.
  • For emitting data, the call order is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so it creates a LatencyMarksEmitter, which registers a timer task that periodically calls back ProcessingTimeCallback.onProcessingTime and thereby triggers output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).
  • As a result, the downstream receives both the user data sent by userFunction.run and the LatencyMarkers sent by the timer task. The downstream call order is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker or streamOperator.processElement. StreamInputProcessor.processInput handles each element according to its type; for user data it calls streamOperator.processElement, that is StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map).

doc

  • SourceFunction

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-11-27
