Flink 1.7.2 Local WordCount Source Code Analysis

Overview

Sequence Diagram

(Figure: 005-source-operation-sink source code analysis, a sequence diagram of the source / operator / sink call chain)

Input Data

  • nc -lk 1234
  • Sample input: a b a b a

Client program: SocketWindowWordCountLocal.scala

	package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

	import org.apache.flink.configuration.Configuration
	import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
	import org.apache.flink.streaming.api.windowing.time.Time

	/**
	 * Feed input data via: nc -lk 1234
	 */
	object SocketWindowWordCountLocal {

	  def main(args: Array[String]): Unit = {

	    val port = 1234
	    // get the execution environment
	    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

	    val configuration: Configuration = new Configuration()
	    val timeout = "100000 s"
	    val timeoutHeartbeatPause = "1000000 s"
	    configuration.setString("akka.ask.timeout", timeout)
	    configuration.setString("akka.lookup.timeout", timeout)
	    configuration.setString("akka.tcp.timeout", timeout)
	    configuration.setString("akka.transport.heartbeat.interval", timeout)
	    configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
	    configuration.setString("akka.watch.heartbeat.pause", timeout)
	    configuration.setInteger("heartbeat.interval", 10000000)
	    configuration.setInteger("heartbeat.timeout", 50000000)

	    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

	    // get input data by connecting to the socket
	    val dataStream = env.socketTextStream("localhost", port, '\n')

	    import org.apache.flink.streaming.api.scala._
	    val textResult = dataStream
	      .flatMap(w => w.split("\\s"))
	      .map(w => WordWithCount(w, 1))
	      .keyBy("word")
	      /**
	       * Refresh every 20 seconds, i.e. counting restarts for each window.
	       * Benefit: there is no need to keep aggregating over all the data;
	       * only the incremental data within the interval is needed,
	       * which reduces the data volume.
	       */
	      .timeWindow(Time.seconds(20))
	      //.countWindow(3)
	      //.countWindow(3, 1)
	      //.countWindowAll(3)
	      .sum("count")

	    textResult.print().setParallelism(1)

	    if (args == null || args.size == 0) {
	      env.execute("default job")

	      // execution plan
	      //println(env.getExecutionPlan)
	      //StreamGraph
	      //println(env.getStreamGraph.getStreamingPlanAsJSON)
	      //JsonPlanGenerator.generatePlan(jobGraph)
	    } else {
	      env.execute(args(0))
	    }

	    println("done")
	  }

	  // Data type for words with count
	  case class WordWithCount(word: String, count: Long)
	}

Flink Source Code Analysis

Source (reading data)

SocketTextStreamFunction

  • SocketTextStreamFunction.run: as long as the task is running, it stays connected to the socket stream and reads via BufferedReader.read, 8 KB at a time, then processes the buffered data line by line
  • Each line is handed to NonTimestampContext.collect
@Override
	public void run(SourceContext<String> ctx) throws Exception {
		final StringBuilder buffer = new StringBuilder();
		long attempt = 0;

		while (isRunning) {

			try (Socket socket = new Socket()) {
				currentSocket = socket;

				LOG.info("Connecting to server socket " + hostname + ':' + port);
				socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
				try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

					char[] cbuf = new char[8192];
					int bytesRead;
					while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
						buffer.append(cbuf, 0, bytesRead);
						int delimPos;
						while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
							String record = buffer.substring(0, delimPos);
							// truncate trailing carriage return
							if (delimiter.equals("\n") && record.endsWith("\r")) {
								record = record.substring(0, record.length() - 1);
							}
							ctx.collect(record);
							buffer.delete(0, delimPos + delimiter.length());
						}
					}
				}
			}

			// if we dropped out of this loop due to an EOF, sleep and retry
			if (isRunning) {
				attempt++;
				if (maxNumRetries == -1 || attempt < maxNumRetries) {
					LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
					Thread.sleep(delayBetweenRetries);
				}
				else {
					// this should probably be here, but some examples expect simple exists of the stream source
					// throw new EOFException("Reached end of stream and reconnects are not enabled.");
					break;
				}
			}
		}

		// collect trailing data
		if (buffer.length() > 0) {
			ctx.collect(buffer.toString());
		}
	}
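The buffering logic in run() above can be reproduced in isolation. Below is a minimal sketch (DelimiterSplitter and feed are illustrative names, not Flink API): it appends an incoming chunk to the buffer, emits every complete record that ends with the delimiter, truncates a trailing '\r', and keeps the incomplete remainder buffered for the next read.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterSplitter {

    // Mimics the inner loop of SocketTextStreamFunction.run():
    // append the chunk to the buffer, then emit every complete record
    // that ends with the delimiter, truncating a trailing '\r'.
    public static List<String> feed(StringBuilder buffer, String chunk, String delimiter) {
        List<String> records = new ArrayList<>();
        buffer.append(chunk);
        int delimPos;
        while (buffer.length() >= delimiter.length()
                && (delimPos = buffer.indexOf(delimiter)) != -1) {
            String record = buffer.substring(0, delimPos);
            if (delimiter.equals("\n") && record.endsWith("\r")) {
                record = record.substring(0, record.length() - 1);
            }
            records.add(record);
            buffer.delete(0, delimPos + delimiter.length());
        }
        return records;
    }

    public static void main(String[] args) {
        StringBuilder buffer = new StringBuilder();
        // two complete lines arrive, plus a partial one that stays buffered
        System.out.println(feed(buffer, "a b\r\na b a\npartial", "\n")); // [a b, a b a]
        System.out.println(buffer); // partial
    }
}
```

Feeding "a b\r\na b a\npartial" yields the records "a b" and "a b a", while "partial" waits in the buffer until the next read completes the line.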

NonTimestampContext

  • collect
  • The element parameter is one line of data read by the source
  • Calls AbstractStreamOperator.CountingOutput.collect
		public void collect(T element) {
			synchronized (lock) {
				output.collect(reuse.replace(element));
			}
		}

AbstractStreamOperator.CountingOutput

  • collect

	@Override
	public void collect(StreamRecord<OUT> record) {
		numRecordsOut.inc();
		output.collect(record);
	}

  • Calls CopyingChainingOutput.collect

CopyingChainingOutput.collect

  • collect

	public void collect(StreamRecord<T> record) {
		if (this.outputTag != null) {
			// we are only responsible for emitting to the main input
			return;
		}
		pushToOperator(record);
	}

  • Calls pushToOperator()

pushToOperator

  • Calls StreamFlatMap.processElement

	protected <X> void pushToOperator(StreamRecord<X> record) {
		try {
			// we know that the given outputTag matches our OutputTag so the record
			// must be of the type that our operator (and Serializer) expects.
			@SuppressWarnings("unchecked")
			StreamRecord<T> castRecord = (StreamRecord<T>) record;

			numRecordsIn.inc();
			StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
			operator.setKeyContextElement1(copy);
			operator.processElement(copy);
		} catch (ClassCastException e) {
			if (outputTag != null) {
				// Enrich error message
				ClassCastException replace = new ClassCastException(
					String.format(
						"%s. Failed to push OutputTag with id '%s' to operator. " +
							"This can occur when multiple OutputTags with different types " +
							"but identical names are being used.",
						e.getMessage(), outputTag.getId()));
				throw new ExceptionInChainedOperatorException(replace);
			} else {
				throw new ExceptionInChainedOperatorException(e);
			}
		} catch (Exception e) {
			throw new ExceptionInChainedOperatorException(e);
		}
	}

Operator(FlatMap)

StreamFlatMap

  • processElement

	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
		userFunction.flatMap(element.getValue(), collector);
	}

  • userFunction is the user-defined function, i.e. the expression inside flatMap( w => w.split("\\s") )
  • element.getValue() is one line of data from the source
  • Calls DataStream.flatMap

DataStream

  • flatMap

	/**
	 * Creates a new DataStream by applying the given function to every element and flattening
	 * the results.
	 */
	def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
	  if (fun == null) {
	    throw new NullPointerException("FlatMap function must not be null.")
	  }
	  val cleanFun = clean(fun)
	  val flatMapper = new FlatMapFunction[T, R] {
	    def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
	  }
	  flatMap(flatMapper)
	}

  • cleanFun(in) applies the user flatMap function to one line of source data; the resulting collection is then traversed with foreach, i.e. each element is taken from the collection and passed to out.collect, which is TimestampedCollector.collect
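The generated flatMapper above simply runs the user lambda on one input and forwards every element of the result to out.collect. A plain-Java sketch of that behavior (FlatMapSketch is an illustrative name; no Flink types are involved):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class FlatMapSketch {

    // Equivalent of: cleanFun(in) foreach out.collect
    // fun maps one input element to a collection; every element is "collected".
    public static <T, R> List<R> flatMap(List<T> input, Function<T, List<R>> fun) {
        List<R> out = new ArrayList<>();
        for (T in : input) {
            for (R r : fun.apply(in)) {
                out.add(r); // stands in for out.collect(r)
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // one socket line "a b a" is flattened into three word elements
        List<String> words = flatMap(
                List.of("a b a"),
                line -> List.of(line.split("\\s")));
        System.out.println(words); // [a, b, a]
    }
}
```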

Operator(Map)

TimestampedCollector

  • collect

	public void collect(T record) {
		output.collect(reuse.replace(record));
	}

  • Calls CountingOutput.collect()

	public void collect(StreamRecord<OUT> record) {
		numRecordsOut.inc();
		output.collect(record);
	}

  • Calls CopyingChainingOutput.collect

CopyingChainingOutput

  • collect calls pushToOperator()

	public void collect(StreamRecord<T> record) {
		if (this.outputTag != null) {
			// we are only responsible for emitting to the main input
			return;
		}
		pushToOperator(record);
	}

  • pushToOperator calls operator.processElement(copy), i.e. StreamMap.processElement

	protected <X> void pushToOperator(StreamRecord<X> record) {
		try {
			// we know that the given outputTag matches our OutputTag so the record
			// must be of the type that our operator (and Serializer) expects.
			@SuppressWarnings("unchecked")
			StreamRecord<T> castRecord = (StreamRecord<T>) record;

			numRecordsIn.inc();
			StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
			operator.setKeyContextElement1(copy);
			operator.processElement(copy);
		} catch (ClassCastException e) {
			if (outputTag != null) {
				// Enrich error message
				ClassCastException replace = new ClassCastException(
					String.format(
						"%s. Failed to push OutputTag with id '%s' to operator. " +
							"This can occur when multiple OutputTags with different types " +
							"but identical names are being used.",
						e.getMessage(), outputTag.getId()));
				throw new ExceptionInChainedOperatorException(replace);
			} else {
				throw new ExceptionInChainedOperatorException(e);
			}
		} catch (Exception e) {
			throw new ExceptionInChainedOperatorException(e);
		}
	}

StreamMap

  • userFunction is the expression inside map( w => WordWithCount(w,1) )

	public void processElement(StreamRecord<IN> element) throws Exception {
		output.collect(element.replace(userFunction.map(element.getValue())));
	}

  • userFunction.map(element.getValue()) takes one element of the collection that flatMap produced from a source line and maps it to a value such as (a,1)
  • Then calls output.collect, i.e. CountingOutput.collect
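Combined with the flatMap stage, each source line therefore ends up as one (word, 1) record per word. A minimal stand-alone sketch of the two chained steps (MapSketch and the Java record are illustrative stand-ins for the Scala case class WordWithCount):

```java
import java.util.ArrayList;
import java.util.List;

public class MapSketch {

    // Java stand-in for the Scala case class WordWithCount(word: String, count: Long)
    public record WordWithCount(String word, long count) {}

    // flatMap splits the line into words; map wraps each word with an
    // initial count of 1, like userFunction.map(element.getValue()).
    public static List<WordWithCount> lineToCounts(String line) {
        List<WordWithCount> out = new ArrayList<>();
        for (String w : line.split("\\s")) {
            out.add(new WordWithCount(w, 1L));
        }
        return out;
    }

    public static void main(String[] args) {
        // "a b" becomes two records: (a,1) and (b,1)
        System.out.println(lineToCounts("a b"));
    }
}
```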

CountingOutput

  • collect calls RecordWriterOutput.collect

	public void collect(StreamRecord<OUT> record) {
		numRecordsOut.inc();
		output.collect(record);
	}

RecordWriterOutput

  • collect

	public void collect(StreamRecord<OUT> record) {
		if (this.outputTag != null) {
			// we are only responsible for emitting to the main input
			return;
		}
		pushToRecordWriter(record);
	}

  • Calls pushToRecordWriter

	private <X> void pushToRecordWriter(StreamRecord<X> record) {
		serializationDelegate.setInstance(record);
		try {
			recordWriter.emit(serializationDelegate);
		} catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

  • Calls StreamRecordWriter.emit

StreamRecordWriter

  • emit calls RecordWriter.emit

	public void emit(T record) throws IOException, InterruptedException {
		checkErroneous();
		super.emit(record);
	}

RecordWriter

  • emit

	public void emit(T record) throws IOException, InterruptedException {
		emit(record, channelSelector.selectChannels(record, numChannels));
	}

  • which calls the private emit overload

	private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
		serializer.serializeRecord(record);

		boolean pruneAfterCopying = false;
		for (int channel : targetChannels) {
			if (copyFromSerializerToTargetChannel(channel)) {
				pruneAfterCopying = true;
			}
		}

		// Make sure we don't hold onto the large intermediate serialization buffer for too long
		if (pruneAfterCopying) {
			serializer.prune();
		}
	}

  • Calls copyFromSerializerToTargetChannel(), which writes the data into the channel and in turn triggers the WindowOperator

	/**
	 * @param targetChannel
	 * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
	 */
	private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
		// We should reset the initial position of the intermediate serialization buffer before
		// copying, so the serialization results can be copied to multiple target buffers.
		serializer.reset();

		boolean pruneTriggered = false;
		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
		while (result.isFullBuffer()) {
			numBytesOut.inc(bufferBuilder.finish());
			numBuffersOut.inc();

			// If this was a full record, we are done. Not breaking out of the loop at this point
			// will lead to another buffer request before breaking out (that would not be a
			// problem per se, but it can lead to stalls in the pipeline).
			if (result.isFullRecord()) {
				pruneTriggered = true;
				bufferBuilders[targetChannel] = Optional.empty();
				break;
			}

			bufferBuilder = requestNewBufferBuilder(targetChannel);
			result = serializer.copyToBufferBuilder(bufferBuilder);
		}
		checkState(!serializer.hasSerializedData(), "All data should be written at once");

		if (flushAlways) {
			targetPartition.flush(targetChannel);
		}
		return pruneTriggered;
	}

window operator(reduce)

WindowOperator

  • processElement: this method is invoked once per element. Each time a source line has been through flatMap and map and an element such as (a,1) is emitted, processElement is triggered
  • windowAssigner.assignWindows assigns each element to its corresponding window(s)
  • The element is stored via HeapReducingState.add(); this state value lives in WindowOperator.windowState.stateTable.primaryTable.state
  • add() goes through transform() and ultimately calls ReduceTransformation.apply, which invokes the reduce function; within the same window, the state is updated every time the same key arrives, implementing the accumulation:
```
public V apply(V previousState, V value) throws Exception {
	return previousState != null ? reduceFunction.reduce(previousState, value) : value;
}
```
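Assuming sum("count") generates a reduce function that keeps the key and adds the count fields, the per-key accumulation driven by ReduceTransformation.apply can be sketched outside Flink as follows (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceSketch {

    public record WordWithCount(String word, long count) {}

    // The reduce function that sum("count") conceptually generates:
    // keep the key, add the counts.
    public static WordWithCount reduce(WordWithCount a, WordWithCount b) {
        return new WordWithCount(a.word(), a.count() + b.count());
    }

    // ReduceTransformation.apply semantics: the first element becomes the
    // state; every later element with the same key is merged into it.
    public static WordWithCount apply(WordWithCount previousState, WordWithCount value) {
        return previousState != null ? reduce(previousState, value) : value;
    }

    // Per-key state table, like windowState for one window instance.
    public static Map<String, WordWithCount> accumulate(List<WordWithCount> elements) {
        Map<String, WordWithCount> state = new HashMap<>();
        for (WordWithCount e : elements) {
            state.put(e.word(), apply(state.get(e.word()), e));
        }
        return state;
    }

    public static void main(String[] args) {
        // the sample input "a b a b a" after flatMap/map
        var state = accumulate(List.of(
                new WordWithCount("a", 1), new WordWithCount("b", 1),
                new WordWithCount("a", 1), new WordWithCount("b", 1),
                new WordWithCount("a", 1)));
        System.out.println(state.get("a")); // WordWithCount[word=a, count=3]
    }
}
```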
  • Every element is associated with the trigger: TriggerResult triggerResult = triggerContext.onElement(element)
  • triggerResult.isFire() is true only when the current window has completed
public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}
  • onProcessingTime

	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
		triggerContext.key = timer.getKey();
		triggerContext.window = timer.getNamespace();

		MergingWindowSet<W> mergingWindows;

		if (windowAssigner instanceof MergingWindowAssigner) {
			mergingWindows = getMergingWindowSet();
			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
			if (stateWindow == null) {
				// Timer firing for non-existent window, this can only happen if a
				// trigger did not clean up timers. We have already cleared the merging
				// window and therefore the Trigger state, however, so nothing to do.
				return;
			} else {
				windowState.setCurrentNamespace(stateWindow);
			}
		} else {
			windowState.setCurrentNamespace(triggerContext.window);
			mergingWindows = null;
		}

		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

		if (triggerResult.isFire()) {
			ACC contents = windowState.get();
			if (contents != null) {
				emitWindowContents(triggerContext.window, contents);
			}
		}

		if (triggerResult.isPurge()) {
			windowState.clear();
		}

		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
			clearAllState(triggerContext.window, windowState, mergingWindows);
		}

		if (mergingWindows != null) {
			// need to make sure to update the merging state in state
			mergingWindows.persist();
		}
	}

  • When the window completes, onProcessingTime() is invoked
  • The trigger registered by triggerContext.onElement(element) in WindowOperator.processElement() eventually calls WindowOperator.onProcessingTime() when the window completes
  • It reads the data from state and calls emitWindowContents()
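Putting the window pieces together: elements are merged into per-key window state, and when the trigger fires at the end of the processing-time window, the contents are emitted and the state is purged, which is why the count restarts every 20 seconds. A minimal sketch (WindowFireSketch is an illustrative name, not Flink code):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowFireSketch {

    public record WordWithCount(String word, long count) {}

    private final Map<String, WordWithCount> windowState = new HashMap<>();

    // Like WindowOperator.processElement: merge the element into keyed window state.
    public void processElement(WordWithCount element) {
        windowState.merge(element.word(), element,
                (prev, val) -> new WordWithCount(prev.word(), prev.count() + val.count()));
    }

    // Trigger fires at window end: emit the contents, then purge the state
    // so the next window starts counting from scratch.
    public Map<String, WordWithCount> onProcessingTime() {
        Map<String, WordWithCount> contents = new LinkedHashMap<>(windowState);
        windowState.clear();
        return contents;
    }

    public static void main(String[] args) {
        WindowFireSketch op = new WindowFireSketch();
        for (String w : List.of("a", "b", "a", "b", "a")) {
            op.processElement(new WordWithCount(w, 1));
        }
        System.out.println(op.onProcessingTime()); // a -> count 3, b -> count 2
        System.out.println(op.onProcessingTime()); // empty: state was purged
    }
}
```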

emitWindowContents

	private void emitWindowContents(W window, ACC contents) throws Exception {
		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
		processContext.window = window;
		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
	}

SinkStream(PrintSinkFunction)

InternalSingleValueWindowFunction

  • process calls PassThroughWindowFunction.apply

	public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
		wrappedFunction.apply(key, window, Collections.singletonList(input), out);
	}

PassThroughWindowFunction

  • apply calls TimestampedCollector.collect

	public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception {
		for (T in : input) {
			out.collect(in);
		}
	}

TimestampedCollector

  • collect calls AbstractStreamOperator.CountingOutput.collect

	public void collect(T record) {
		output.collect(reuse.replace(record));
	}

AbstractStreamOperator.CountingOutput

  • collect calls OperatorChain.CopyingChainingOutput.collect

	public void collect(StreamRecord<OUT> record) {
		numRecordsOut.inc();
		output.collect(record);
	}

OperatorChain.CopyingChainingOutput

  • collect calls pushToOperator

	public void collect(StreamRecord<T> record) {
		if (this.outputTag != null) {
			// we are only responsible for emitting to the main input
			return;
		}
		pushToOperator(record);
	}

  • pushToOperator

	protected <X> void pushToOperator(StreamRecord<X> record) {
		try {
			// we know that the given outputTag matches our OutputTag so the record
			// must be of the type that our operator (and Serializer) expects.
			@SuppressWarnings("unchecked")
			StreamRecord<T> castRecord = (StreamRecord<T>) record;

			numRecordsIn.inc();
			StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
			operator.setKeyContextElement1(copy);
			operator.processElement(copy);
		} catch (ClassCastException e) {
			if (outputTag != null) {
				// Enrich error message
				ClassCastException replace = new ClassCastException(
					String.format(
						"%s. Failed to push OutputTag with id '%s' to operator. " +
							"This can occur when multiple OutputTags with different types " +
							"but identical names are being used.",
						e.getMessage(), outputTag.getId()));
				throw new ExceptionInChainedOperatorException(replace);
			} else {
				throw new ExceptionInChainedOperatorException(e);
			}
		} catch (Exception e) {
			throw new ExceptionInChainedOperatorException(e);
		}
	}

  • Calls StreamSink.processElement

StreamSink

  • processElement calls PrintSinkFunction.invoke, which prints the output

	public void processElement(StreamRecord<IN> element) throws Exception {
		sinkContext.element = element;
		userFunction.invoke(element.getValue(), sinkContext);
	}

end

This is original content; the author authorized its publication on the Tencent Cloud+ Community. Reproduction without permission is prohibited.
