本文主要研究一下flink的SourceFunction
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());
dataStreamSource.map(new UpperCaseMapFunc()).print();
env.execute("sourceFunctionDemo");
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
// ------------------------------------------------------------------------
// source context
// ------------------------------------------------------------------------
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
//......
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
/**
* Emits one element from the source, without attaching a timestamp. In most cases,
* this is the default way of emitting elements.
*
* <p>The timestamp that the element will get assigned depends on the time characteristic of
* the streaming program:
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
* current time as the timestamp.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
* It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
* operation (like time windows).</li>
* </ul>
*
* @param element The element to emit
*/
void collect(T element);
/**
* Emits one element from the source, and attaches the given timestamp. This method
* is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
* sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
* on the stream.
*
* <p>On certain time characteristics, this timestamp may be ignored or overwritten.
* This allows programs to switch between the different time characteristics and behaviors
* without changing the code of the source functions.
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
* because processing time never works with element timestamps.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
* system's current time, to realize proper ingestion time semantics.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
* </ul>
*
* @param element The element to emit
* @param timestamp The timestamp in milliseconds since the Epoch
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
/**
* Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
* elements with a timestamp {@code t' <= t} will occur any more. If further such
* elements will be emitted, those elements are considered <i>late</i>.
*
* <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
* On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
* {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
* automatic ingestion time watermarks.
*
* @param mark The Watermark to emit
*/
@PublicEvolving
void emitWatermark(Watermark mark);
/**
* Marks the source to be temporarily idle. This tells the system that this source will
* temporarily stop emitting records and watermarks for an indefinite amount of time. This
* is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
* {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
* watermarks without the need to wait for watermarks from this source while it is idle.
*
* <p>Source functions should make a best effort to call this method as soon as they
* acknowledge themselves to be idle. The system will consider the source to resume activity
* again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
* or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
*/
@PublicEvolving
void markAsTemporarilyIdle();
/**
* Returns the checkpoint lock. Please refer to the class-level comment in
* {@link SourceFunction} for details about how to write a consistent checkpointed
* source.
*
* @return The object to use as the lock
*/
Object getCheckpointLock();
/**
* This method is called by the system to shut down the context.
*/
void close();
}
如果数据本身没有时间,则在使用TimeCharacteristic.EventTime的时候,可以使用TimestampAssigner在进行依赖时间的相关操作时指定timestamp;如果是配合TimeCharacteristic.IngestionTime,则无需指定,系统会自动生成timestamp
);除了collect方法外,还有collectWithTimestamp发射数据同时指定timestamp(配合TimeCharacteristic.EventTime使用
)上游
)flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
//......
@Override
public final void invoke() throws Exception {
boolean disposed = false;
try {
//......
// let the task do its work
isRunning = true;
run();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
LOG.debug("Finished task {}", getName());
//......
}
finally {
// clean up everything we initialized
isRunning = false;
//......
}
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
这里要注意在调用userFunction.run(ctx)之前,如果latencyTrackingInterval大于0,还创建了LatencyMarksEmitter
)public class RandomWordSource implements SourceFunction<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);
private volatile boolean isRunning = true;
private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
Thread.sleep(300);
int rnd = (int) (Math.random() * 10 % words.length);
LOGGER.info("emit word: {}", words[rnd]);
ctx.collect(words[rnd]);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
private static class LatencyMarksEmitter<OUT> {
private final ScheduledFuture<?> latencyMarkTimer;
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final OperatorID operatorId,
final int subtaskIndex) {
latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
},
0L,
latencyTrackingInterval);
}
public void close() {
latencyMarkTimer.cancel(true);
}
}
如果latencyTrackingInterval>0的话
),这里的latencyTrackingInterval先调用getExecutionConfig().isLatencyTrackingConfigured()判断executionConfig是否有配置该值,有配置的话则使用getExecutionConfig().getLatencyTrackingInterval()返回的值,没有配置的话则使用configuration.getLong(MetricOptions.LATENCY_INTERVAL)返回的值,后者默认是2000L(这里使用的是后者的配置,即为2000
)flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
long nextTimestamp = getCurrentProcessingTime() + initialDelay;
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return timerService.scheduleAtFixedRate(
new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
initialDelay,
period,
TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(initialDelay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
/**
* Internal task which is repeatedly called by the processing time service.
*/
private static final class RepeatedTriggerTask implements Runnable {
private final AtomicInteger serviceStatus;
private final Object lock;
private final ProcessingTimeCallback target;
private final long period;
private final AsyncExceptionHandler exceptionHandler;
private long nextTimestamp;
private RepeatedTriggerTask(
final AtomicInteger serviceStatus,
final AsyncExceptionHandler exceptionHandler,
final Object lock,
final ProcessingTimeCallback target,
final long nextTimestamp,
final long period) {
this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
this.lock = Preconditions.checkNotNull(lock);
this.target = Preconditions.checkNotNull(target);
this.period = period;
this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
this.nextTimestamp = nextTimestamp;
}
@Override
public void run() {
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(nextTimestamp);
}
nextTimestamp += period;
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
}
}
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
/**
* Wrapping {@link Output} that updates metrics on the number of emitted elements.
*/
public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
private final Counter numRecordsOut;
public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
this.output = output;
this.numRecordsOut = counter;
}
@Override
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
output.emitLatencyMarker(latencyMarker);
}
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
numRecordsOut.inc();
output.collect(outputTag, record);
}
@Override
public void close() {
output.close();
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
/**
* Implementation of {@link Output} that sends data using a {@link RecordWriter}.
*/
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<StreamElement> serializationDelegate;
//......
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
serializationDelegate.setInstance(latencyMarker);
try {
recordWriter.randomEmit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
它实际上是通过父类RecordWriter来发射
),来发射LatencyMarkerflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
/**
* This is used to send LatencyMarks to a random target channel.
*/
public void randomEmit(T record) throws IOException, InterruptedException {
sendToTarget(record, rng.nextInt(numChannels));
}
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
下游
)flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
StreamElement recordOrMark = deserializationDelegate.getInstance();
if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
continue;
} else if (recordOrMark.isStreamStatus()) {
// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
continue;
} else if (recordOrMark.isLatencyMarker()) {
// handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
continue;
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
return true;
}
}
}
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return false;
}
}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java
/**
* A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
*/
@Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
RandomWordSource.run
);SourceStream.run里头在调用userFunction.run之前会判断latencyTrackingInterval是否大于0,如果大于0则会创建LatencyMarksEmitter,它注册了定时任务来定时回调ProcessingTimeCallback的onProcessingTime方法,来触发output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex))UpperCaseMapFunc.map
)