How Flink Reads Data from Kafka

Copyright notice: this article is the blogger's original work. Reposting is welcome; please credit the source. Blog Address: http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/89067747
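
Before diving into the source, here is a minimal sketch of how a FlinkKafkaConsumer is typically wired into a job; the runtime then drives the source by calling its run method. The topic name, group id, and broker address below are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
		props.setProperty("group.id", "demo-group");              // placeholder group

		// FlinkKafkaConsumer extends FlinkKafkaConsumerBase; once the job runs,
		// the task thread invokes run(SourceContext), which is where we pick up below.
		FlinkKafkaConsumer<String> consumer =
			new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

		env.addSource(consumer).print();
		env.execute("kafka-source-demo");
	}
}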

First, let's look at the FlinkKafkaConsumerBase.run method, which is effectively the entry point for Flink to pull data from Kafka:

// entry point: starts the source
	public void run(SourceContext<T> sourceContext) throws Exception {
		......
		// from this point forward:
		//   - 'snapshotState' will draw offsets from the fetcher,
		//     instead of being built from `subscribedPartitionsToStartOffsets`
		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
		//     Kafka through the fetcher, if configured to do so)
		// create the fetcher that pulls data from Kafka
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				periodicWatermarkAssigner,
				punctuatedWatermarkAssigner,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

		if (!running) {
			return;
		}

		// depending on whether we were restored with the current state version (1.3),
		// remaining logic branches off into 2 paths:
		//  1) New state - partition discovery loop executed as separate thread, with this
		//                 thread running the main fetcher loop
		//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
		
		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
			// KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS was not configured
			kafkaFetcher.runFetchLoop();
		} else {
			// this still ends up calling kafkaFetcher.runFetchLoop()
			runWithPartitionDiscovery();
		}
	}
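
The else branch is taken only when partition discovery has been enabled on the consumer properties. A minimal sketch of enabling it (the 30000 ms interval is an arbitrary example value):

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class PartitionDiscoveryProps {
	public static Properties consumerProps() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "demo-group");              // placeholder
		// KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is the literal key
		// "flink.partition-discovery.interval-millis"; once it is set,
		// discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED and run()
		// takes the runWithPartitionDiscovery() branch above.
		props.setProperty(
			FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
		return props;
	}
}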

The createFetcher method:

@Override
	protected AbstractFetcher<T, ?> createFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		StreamingRuntimeContext runtimeContext,
		OffsetCommitMode offsetCommitMode,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
......

		return new KafkaFetcher<>(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
			watermarksPeriodic,
			watermarksPunctuated,
			runtimeContext.getProcessingTimeService(),
			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
			runtimeContext.getUserCodeClassLoader(),
			runtimeContext.getTaskNameWithSubtasks(),
			deserializer,
			properties,
			pollTimeout,
			runtimeContext.getMetricGroup(),
			consumerMetricGroup,
			useMetrics);
	}

It returns a KafkaFetcher object. Stepping into the KafkaFetcher constructor, we can see that it creates a KafkaConsumerThread object:

public KafkaFetcher(
		SourceFunction.SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		ClassLoader userCodeClassLoader,
		String taskNameWithSubtasks,
		KafkaDeserializationSchema<T> deserializer,
		Properties kafkaProperties,
		long pollTimeout,
		MetricGroup subtaskMetricGroup,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
......
		this.consumerThread = new KafkaConsumerThread(
			LOG,
			// one of the KafkaConsumerThread constructor arguments
			handover,
			kafkaProperties,
			// what exactly is unassignedPartitionsQueue? It is covered in detail
			// in "How Flink's startupMode takes effect"
			unassignedPartitionsQueue,
			getFetcherName() + " for " + taskNameWithSubtasks,
			pollTimeout,
			useMetrics,
			consumerMetricGroup,
			subtaskMetricGroup);
	}
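
Among these arguments, pollTimeout bounds how long each consumer.poll() call in the fetch loop may block. In the universal Kafka connector it is read from the consumer properties under FlinkKafkaConsumer.KEY_POLL_TIMEOUT, the literal key "flink.poll-timeout" (default 100 ms); building on the props object from the earlier sketch:

// 250 ms is an arbitrary example value, not a recommendation.
props.setProperty("flink.poll-timeout", "250");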

That covers createFetcher, which can be seen as the preparation for pulling data. Next, let's look at kafkaFetcher.runFetchLoop(). KafkaFetcher's runFetchLoop method is where pulling messages from Kafka actually begins:

// fetches messages from Kafka
	public void runFetchLoop() throws Exception {
		try {
			// the handover is one of the arguments passed to the KafkaConsumerThread constructor
			final Handover handover = this.handover;

			// kick off the actual Kafka consumer
			// this is where pulling data from Kafka actually happens
			consumerThread.start();

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				// take the next batch from the handover, then process the records
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each topic partition
				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle());

					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
						final T value = deserializer.deserialize(record);

						if (deserializer.isEndOfStream(value)) {
							// end of stream signaled
							running = false;
							break;
						}

						// emit the actual record. this also updates offset state atomically
						// and deals with timestamps and watermark generation
						// emit the message; from here on it is timestamp and watermark handling
						emitRecord(value, partition, record.offset(), record);
					}
				}
			}
		}
		finally {
			// this signals the consumer thread that no more work is to be done
			consumerThread.shutdown();
		}

		// on a clean exit, wait for the runner thread
		try {
			consumerThread.join();
		}
		catch (InterruptedException e) {
			// may be the result of a wake-up interruption after an exception.
			// we ignore this here and only restore the interruption state
			Thread.currentThread().interrupt();
		}
	}
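
In the loop above, deserializer is the user-supplied KafkaDeserializationSchema, and isEndOfStream lets it terminate the source. A minimal sketch of an implementation (the class name is made up for illustration):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Turns each record's value bytes into a String.
public class StringRecordSchema implements KafkaDeserializationSchema<String> {

	@Override
	public boolean isEndOfStream(String nextElement) {
		// Returning true would make runFetchLoop() set running = false
		// and stop the source, as shown above.
		return false;
	}

	@Override
	public String deserialize(ConsumerRecord<byte[], byte[]> record) {
		return new String(record.value(), StandardCharsets.UTF_8);
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return Types.STRING;
	}
}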

Since consumerThread.start() kicks off the actual Kafka consumer, let's look at KafkaConsumerThread's run method:

@Override
	public void run() {
		// early exit check
		if (!running) {
			return;
		}

		// this is the means to talk to FlinkKafkaConsumer's main thread
		// a constructor argument: the FlinkKafkaConsumer main thread consumes through it via handover.pollNext(), mentioned above
		final Handover handover = this.handover;

		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
		// This is important, because the consumer has multi-threading issues,
		// including concurrent 'close()' calls.
		try {
			// obtain the KafkaConsumer
			this.consumer = getConsumer(kafkaProperties);
		}
		catch (Throwable t) {
			handover.reportError(t);
			return;
		}

		// from here on, the consumer is guaranteed to be closed properly
		......

			// early exit check
			if (!running) {
				return;
			}

			// the latest bulk of records. May carry across the loop if the thread is woken up
			// from blocking on the handover
			ConsumerRecords<byte[], byte[]> records = null;

			// reused variable to hold found unassigned new partitions.
			// found partitions are not carried across loops using this variable;
			// they are carried across via re-adding them to the unassigned partitions queue
			List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

			// main fetch loop
			while (running) {

				// check if there is something to commit
				// commitInProgress is false by default
				if (!commitInProgress) {
					// get and reset the work-to-be committed, so we don't repeatedly commit the same
					// for details, see "How Flink Saves Offsets": https://www.jianshu.com/p/ee4fe63f0182
					final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
							nextOffsetsToCommit.getAndSet(null);

					if (commitOffsetsAndCallback != null) {
						log.debug("Sending async offset commit request to Kafka broker");

						// also record that a commit is already in progress
						// the order here matters! first set the flag, then send the commit command.
						commitInProgress = true;
						consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
					}
				}

				try {
					// hasAssignedPartitions is false by default.
					// Newly discovered partitions are added to unassignedPartitionsQueue
					// and then subscribed; see "How Flink's startupMode takes effect".
					if (hasAssignedPartitions) {
						newPartitions = unassignedPartitionsQueue.pollBatch();
					}
					else {
						// if no assigned partitions block until we get at least one
						// instead of hot spinning this loop. We rely on a fact that
						// unassignedPartitionsQueue will be closed on a shutdown, so
						// we don't block indefinitely
						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
					}
					if (newPartitions != null) {
						reassignPartitions(newPartitions);
					}
				} catch (AbortedReassignmentException e) {
					continue;
				}

				if (!hasAssignedPartitions) {
					// Without assigned partitions KafkaConsumer.poll will throw an exception
					continue;
				}

				// get the next batch of records, unless we did not manage to hand the old batch over
				if (records == null) {
					try {
						// pull data through the Kafka consumer API
						records = consumer.poll(pollTimeout);
					}
					catch (WakeupException we) {
						continue;
					}
				}

				try {
					// "wrap" the records in the handover for the FlinkKafkaConsumer main thread to consume
					handover.produce(records);
					records = null;
				}
				catch (Handover.WakeupException e) {
					// fall through the loop
				}
			}
			// end main fetch loop
		}
		catch (Throwable t) {
			// let the main thread know and exit
			// it may be that this exception comes because the main thread closed the handover, in
			// which case the below reporting is irrelevant, but does not hurt either
			handover.reportError(t);
		}
		finally {
			// make sure the handover is closed if it is not already closed or has an error
			handover.close();

			// make sure the KafkaConsumer is closed
			try {
				consumer.close();
			}
			catch (Throwable t) {
				log.warn("Error while closing Kafka consumer", t);
			}
		}
	}
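
The two threads meet in the Handover: KafkaConsumerThread calls handover.produce(records), and the fetcher's main loop calls handover.pollNext(). Conceptually it is a one-slot blocking exchange point; a stripped-down sketch of the idea (Flink's actual Handover additionally propagates errors and supports wakeup and close, all omitted here):

import org.apache.kafka.clients.consumer.ConsumerRecords;

// One-slot blocking buffer between the consumer thread (produce) and the
// fetcher's main loop (pollNext). Not Flink's implementation, just the idea.
public class SimpleHandover {
	private ConsumerRecords<byte[], byte[]> next;

	// Called by the consumer thread; blocks while the previous batch has not
	// been taken yet, which back-pressures the poll loop.
	public synchronized void produce(ConsumerRecords<byte[], byte[]> records)
			throws InterruptedException {
		while (next != null) {
			wait();
		}
		next = records;
		notifyAll();
	}

	// Called by the fetcher's main loop; blocks until a batch is available.
	public synchronized ConsumerRecords<byte[], byte[]> pollNext()
			throws InterruptedException {
		while (next == null) {
			wait();
		}
		ConsumerRecords<byte[], byte[]> records = next;
		next = null;
		notifyAll();
		return records;
	}
}

Because produce blocks until the previous batch has been taken, a slow downstream naturally throttles how often consumer.poll() is called.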

That concludes the walkthrough of how Flink pulls data from Kafka.
