How Flink reads data from Kafka

Copyright notice: this post is the author's original work. Reposting is welcome; please credit the source. Blog address: http://blog.csdn.net/jsjsjs1789 https://cloud.tencent.com/developer/article/1414864

Let's start with the FlinkKafkaConsumerBase.run method, which is effectively the entry point for Flink to pull data from Kafka:

	// entry point: start a source
	public void run(SourceContext<T> sourceContext) throws Exception {
		......
		// from this point forward:
		//   - 'snapshotState' will draw offsets from the fetcher,
		//     instead of being built from `subscribedPartitionsToStartOffsets`
		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
		//     Kafka through the fetcher, if configured to do so)
		// create the fetcher that pulls data from Kafka
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				periodicWatermarkAssigner,
				punctuatedWatermarkAssigner,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

		if (!running) {
			return;
		}

		// depending on whether we were restored with the current state version (1.3),
		// remaining logic branches off into 2 paths:
		//  1) New state - partition discovery loop executed as separate thread, with this
		//                 thread running the main fetcher loop
		//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
		
		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
			// KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS was not configured
			kafkaFetcher.runFetchLoop();
		} else {
			// this branch still ends up calling kafkaFetcher.runFetchLoop()
			runWithPartitionDiscovery();
		}
	}
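
Before stepping into the internals, it may help to recall how run() gets triggered in the first place. Below is a minimal job sketch (the topic name, bootstrap servers, group id, and the discovery interval value are placeholder values, not from the original post): adding the FlinkKafkaConsumer as a source is what ultimately causes FlinkKafkaConsumerBase.run() to be invoked on each source subtask.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo {
	public static void main(String[] args) throws Exception {
		// placeholder connection settings -- adjust to your environment
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "demo-group");
		// enables the partition discovery branch (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED)
		props.setProperty("flink.partition-discovery.interval-millis", "30000");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// adding the consumer as a source is what eventually drives FlinkKafkaConsumerBase.run()
		FlinkKafkaConsumer<String> consumer =
			new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

		env.addSource(consumer).print();
		env.execute("kafka-source-demo");
	}
}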

The createFetcher method:

@Override
	protected AbstractFetcher<T, ?> createFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		StreamingRuntimeContext runtimeContext,
		OffsetCommitMode offsetCommitMode,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
......

		return new KafkaFetcher<>(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
			watermarksPeriodic,
			watermarksPunctuated,
			runtimeContext.getProcessingTimeService(),
			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
			runtimeContext.getUserCodeClassLoader(),
			runtimeContext.getTaskNameWithSubtasks(),
			deserializer,
			properties,
			pollTimeout,
			runtimeContext.getMetricGroup(),
			consumerMetricGroup,
			useMetrics);
	}

It returns a KafkaFetcher object; let's step into it.

The KafkaFetcher constructor creates a KafkaConsumerThread object:

public KafkaFetcher(
		SourceFunction.SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		ClassLoader userCodeClassLoader,
		String taskNameWithSubtasks,
		KafkaDeserializationSchema<T> deserializer,
		Properties kafkaProperties,
		long pollTimeout,
		MetricGroup subtaskMetricGroup,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
......
		this.consumerThread = new KafkaConsumerThread(
			LOG,
			// arguments passed to the KafkaConsumerThread constructor
			handover,
			kafkaProperties,
			// what exactly is unassignedPartitionsQueue? It is covered in detail in "How Flink's startupMode takes effect"
			unassignedPartitionsQueue,
			getFetcherName() + " for " + taskNameWithSubtasks,
			pollTimeout,
			useMetrics,
			consumerMetricGroup,
			subtaskMetricGroup);
	}
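
One constructor argument worth noting is deserializer: it is the user-supplied KafkaDeserializationSchema that runFetchLoop later uses to turn raw ConsumerRecords into user objects and to check for end-of-stream. A minimal sketch of such a schema might look like the following (the class name is illustrative, not from the Flink source):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative schema: turns each record value into a UTF-8 string.
public class StringValueDeserializationSchema implements KafkaDeserializationSchema<String> {

	@Override
	public boolean isEndOfStream(String nextElement) {
		// never signal end-of-stream; returning true here would stop runFetchLoop
		return false;
	}

	@Override
	public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
		return new String(record.value(), StandardCharsets.UTF_8);
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}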

That concludes createFetcher, which can be seen as the preparation work for pulling data. Next, let's look at kafkaFetcher.runFetchLoop().

KafkaFetcher's runFetchLoop method is where messages actually start being pulled from Kafka:

	// fetch messages from Kafka
	public void runFetchLoop() throws Exception {
		try {
			// one of the KafkaConsumerThread constructor parameters
			final Handover handover = this.handover;

			// kick off the actual Kafka consumer
			// this is where data actually starts being pulled from Kafka
			consumerThread.start();

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				// get records from the handover, then process them
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each topic partition
				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle());

					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
						final T value = deserializer.deserialize(record);

						if (deserializer.isEndOfStream(value)) {
							// end of stream signaled
							running = false;
							break;
						}

						// emit the actual record. this also updates offset state atomically
						// and deals with timestamps and watermark generation
						// emit the record; timestamp and watermark handling come next
						emitRecord(value, partition, record.offset(), record);
					}
				}
			}
		}
		finally {
			// this signals the consumer thread that no more work is to be done
			consumerThread.shutdown();
		}

		// on a clean exit, wait for the runner thread
		try {
			consumerThread.join();
		}
		catch (InterruptedException e) {
			// may be the result of a wake-up interruption after an exception.
			// we ignore this here and only restore the interruption state
			Thread.currentThread().interrupt();
		}
	}
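
Note that the main loop above never talks to the KafkaConsumer directly; it only calls handover.pollNext(), while the consumer thread pushes batches in from the other side via handover.produce(). The sketch below illustrates the idea behind that hand-off as a simplified single-slot exchange; it is not the actual Flink Handover class, which additionally handles wake-ups, error reporting, and close semantics as seen in the snippets here.

// Simplified illustration of the hand-off idea; not the real Flink Handover implementation.
public final class SimpleHandover<T> {
	private final Object lock = new Object();
	private T next;

	// called by the consumer thread: blocks until the previous element was taken
	public void produce(T element) throws InterruptedException {
		synchronized (lock) {
			while (next != null) {
				lock.wait();
			}
			next = element;
			lock.notifyAll();
		}
	}

	// called by the main fetch loop: blocks until an element is available
	public T pollNext() throws InterruptedException {
		synchronized (lock) {
			while (next == null) {
				lock.wait();
			}
			T result = next;
			next = null;
			lock.notifyAll();
			return result;
		}
	}
}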

Since consumerThread.start() kicks off the actual Kafka consumer, let's look at the run method of KafkaConsumerThread:

@Override
	public void run() {
		// early exit check
		if (!running) {
			return;
		}

		// this is the means to talk to FlinkKafkaConsumer's main thread
		// constructor parameter; the FlinkKafkaConsumer main thread consumes from it via handover.pollNext(), as shown above
		final Handover handover = this.handover;

		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
		// This is important, because the consumer has multi-threading issues,
		// including concurrent 'close()' calls.
		try {
			// create the KafkaConsumer
			this.consumer = getConsumer(kafkaProperties);
		}
		catch (Throwable t) {
			handover.reportError(t);
			return;
		}

		// from here on, the consumer is guaranteed to be closed properly
		......

			// early exit check
			if (!running) {
				return;
			}

			// the latest bulk of records. May carry across the loop if the thread is woken up
			// from blocking on the handover
			ConsumerRecords<byte[], byte[]> records = null;

			// reused variable to hold found unassigned new partitions.
			// found partitions are not carried across loops using this variable;
			// they are carried across via re-adding them to the unassigned partitions queue
			List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

			// main fetch loop
			while (running) {

				// check if there is something to commit
				// commitInProgress defaults to false
				if (!commitInProgress) {
					// get and reset the work-to-be committed, so we don't repeatedly commit the same
					// for details see "How Flink saves offsets": https://www.jianshu.com/p/ee4fe63f0182
					final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
							nextOffsetsToCommit.getAndSet(null);

					if (commitOffsetsAndCallback != null) {
						log.debug("Sending async offset commit request to Kafka broker");

						// also record that a commit is already in progress
						// the order here matters! first set the flag, then send the commit command.
						commitInProgress = true;
						consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
					}
				}

				try {
					// hasAssignedPartitions defaults to false
					// when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribed
					// for details see "How Flink's startupMode takes effect"
					if (hasAssignedPartitions) {
						newPartitions = unassignedPartitionsQueue.pollBatch();
					}
					else {
						// if no assigned partitions block until we get at least one
						// instead of hot spinning this loop. We rely on a fact that
						// unassignedPartitionsQueue will be closed on a shutdown, so
						// we don't block indefinitely
						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
					}
					if (newPartitions != null) {
						reassignPartitions(newPartitions);
					}
				} catch (AbortedReassignmentException e) {
					continue;
				}

				if (!hasAssignedPartitions) {
					// Without assigned partitions KafkaConsumer.poll will throw an exception
					continue;
				}

				// get the next batch of records, unless we did not manage to hand the old batch over
				if (records == null) {
					try {
						// pull data via the Kafka consumer API
						records = consumer.poll(pollTimeout);
					}
					catch (WakeupException we) {
						continue;
					}
				}

				try {
					// "wrap" the records in the handover so the FlinkKafkaConsumer main thread can consume them
					handover.produce(records);
					records = null;
				}
				catch (Handover.WakeupException e) {
					// fall through the loop
				}
			}
			// end main fetch loop
		}
		catch (Throwable t) {
			// let the main thread know and exit
			// it may be that this exception comes because the main thread closed the handover, in
			// which case the below reporting is irrelevant, but does not hurt either
			handover.reportError(t);
		}
		finally {
			// make sure the handover is closed if it is not already closed or has an error
			handover.close();

			// make sure the KafkaConsumer is closed
			try {
				consumer.close();
			}
			catch (Throwable t) {
				log.warn("Error while closing Kafka consumer", t);
			}
		}
	}
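
The offset-commit branch at the top of the loop deserves a closer look: the flag is set before the asynchronous commit is sent, and only the commit callback clears it again, so at most one commit is in flight at a time. The sketch below shows that pattern in isolation against the plain Kafka client API (the class and method names are illustrative; in the actual code the callback additionally notifies the KafkaCommitCallback handed in via nextOffsetsToCommit):

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the "set the flag first, then commitAsync" pattern used above.
public final class AsyncCommitSketch {
	private volatile boolean commitInProgress = false;

	public void commitIfIdle(KafkaConsumer<byte[], byte[]> consumer,
							Map<TopicPartition, OffsetAndMetadata> offsets) {
		if (!commitInProgress && offsets != null) {
			// the order matters: first set the flag, then send the commit command
			commitInProgress = true;
			consumer.commitAsync(offsets, (committedOffsets, error) -> {
				commitInProgress = false;
				if (error != null) {
					System.err.println("Async offset commit failed: " + error);
				}
			});
		}
	}
}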

That completes the walkthrough of how Flink pulls data from Kafka.
