
How Flink's startupMode Takes Effect

shengjk1
Published 2019-04-18 14:27:20

Copyright notice: This is an original post by the author. Reposting is welcome; please credit the source. Blog address: http://blog.csdn.net/jsjsjs1789 https://cloud.tencent.com/developer/article/1414863

I had long wondered: if consumer.setStartFromLatest() and kafkaProperties.put("auto.offset.reset", "earliest") are both set, which one actually takes effect? The answer is, of course, consumer.setStartFromLatest(). But why? Let's walk through the source together.
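To make the question concrete, here is a minimal sketch of the setup being discussed (the broker address, group id, and topic name are placeholders of my own, not anything from the Flink source; class names match the universal Kafka connector of that era):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StartupModeDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties kafkaProperties = new Properties();
		kafkaProperties.put("bootstrap.servers", "localhost:9092"); // placeholder
		kafkaProperties.put("group.id", "demo-group");              // placeholder
		// the Kafka-client-level setting that will NOT take effect here
		kafkaProperties.put("auto.offset.reset", "earliest");

		FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
				"demo-topic", new SimpleStringSchema(), kafkaProperties);
		// the Flink-level startup mode that actually wins
		consumer.setStartFromLatest();

		env.addSource(consumer).print();
		env.execute("startup-mode-demo");
	}
}

The story starts in FlinkKafkaConsumerBase#open: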

@Override
	public void open(Configuration configuration) throws Exception {
		// determine the offset commit mode: ON_CHECKPOINTS, DISABLED, or KAFKA_PERIODIC; this article focuses on ON_CHECKPOINTS
		this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
				getIsAutoCommitEnabled(),
				enableCommitOnCheckpoints,
				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

		// create the kafka partition discoverer
		this.partitionDiscoverer = createPartitionDiscoverer(
				topicsDescriptor,
				getRuntimeContext().getIndexOfThisSubtask(),
				getRuntimeContext().getNumberOfParallelSubtasks());
		this.partitionDiscoverer.open();

		subscribedPartitionsToStartOffsets = new HashMap<>();
		// fetch all partitions of the fixed topics or the topic pattern
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
		// restore from a checkpoint
		if (restoredState != null) {
			for (KafkaTopicPartition partition : allPartitions) {
				// new partitions (ones not present in the checkpoint) will be consumed from the earliest offset; old partitions have already been restored into restoredState and will be stored in subscribedPartitionsToStartOffsets below
				if (!restoredState.containsKey(partition)) {
					restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
				}
			}

			for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
				if (!restoredFromOldState) {
					// seed the partition discoverer with the union state while filtering out
					// restored partitions that should not be subscribed by this subtask
					if (KafkaTopicPartitionAssigner.assign(
						restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
							== getRuntimeContext().getIndexOfThisSubtask()){
						subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
					}
				} else {
					// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
					// in this case, just use the restored state as the subscribed partitions
					subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
				}
			}

			if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
				subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
					if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
						LOG.warn(
							"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
							entry.getKey());
						return true;
					}
					return false;
				});
			}

			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
		} else {
			// use the partition discoverer to fetch the initial seed partitions,
			// and set their initial offsets depending on the startup mode.
			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
			// when the partition is actually read.
			switch (startupMode) {
				case SPECIFIC_OFFSETS:
					if (specificStartupOffsets == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
								", but no specific offsets were specified.");
					}

					for (KafkaTopicPartition seedPartition : allPartitions) {
						// partitions with a user-specified offset start from that offset; unspecified ones start from the group offset
						Long specificOffset = specificStartupOffsets.get(seedPartition);
						if (specificOffset != null) {
							// since the specified offsets represent the next record to read, we subtract
							// it by one so that the initial state of the consumer will be correct
							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
						} else {
							// default to group offset behaviour if the user-provided specific offsets
							// do not contain a value for this partition
							// the sentinel for the corresponding startup mode is stored in subscribedPartitionsToStartOffsets as well
							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
						}
					}

					break;
				case TIMESTAMP:
					if (startupOffsetsTimestamp == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
								", but no startup timestamp was specified.");
					}

					for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
							: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
						subscribedPartitionsToStartOffsets.put(
							partitionToOffset.getKey(),
							(partitionToOffset.getValue() == null)
									// if an offset cannot be retrieved for a partition with the given timestamp,
									// we default to using the latest offset for the partition
									? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
									// since the specified offsets represent the next record to read, we subtract
									// it by one so that the initial state of the consumer will be correct
									: partitionToOffset.getValue() - 1);
					}

					break;
				default:
					// default case: GROUP_OFFSETS (the EARLIEST and LATEST modes also land here, via their sentinels)
					for (KafkaTopicPartition seedPartition : allPartitions) {
						subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
					}
			}

			if (!subscribedPartitionsToStartOffsets.isEmpty()) {
				switch (startupMode) {
					case EARLIEST:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case LATEST:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case TIMESTAMP:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							startupOffsetsTimestamp,
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case SPECIFIC_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							specificStartupOffsets,
							subscribedPartitionsToStartOffsets.keySet());

						List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
						for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
							if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
								partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
							}
						}

						if (partitionsDefaultedToGroupOffsets.size() > 0) {
							LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
									"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
								getRuntimeContext().getIndexOfThisSubtask(),
								partitionsDefaultedToGroupOffsets.size(),
								partitionsDefaultedToGroupOffsets);
						}
						break;
					case GROUP_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
				}
			} else {
				LOG.info("Consumer subtask {} initially has no partitions to read from.",
					getRuntimeContext().getIndexOfThisSubtask());
			}
		}
	}

The open() method, in short, stores the user-specified topics together with their partitions and starting offsets in Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets.
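Note that for the EARLIEST, LATEST, and GROUP_OFFSETS modes the stored value is not a real offset but a sentinel constant that is resolved later. As a rough illustration (the topic name and partition count here are made up), the map after open() with startupMode = LATEST would hold something like this:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

public class SentinelIllustration {
	public static void main(String[] args) {
		Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
		for (int p = 0; p < 3; p++) {
			startOffsets.put(
					new KafkaTopicPartition("demo-topic", p),
					KafkaTopicPartitionStateSentinel.LATEST_OFFSET); // a sentinel, not a real offset
		}
		// the sentinels are only resolved to real offsets later, in the consumer thread
		System.out.println(startOffsets);
	}
}

Next, let's look at the entry method through which Flink consumes Kafka: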

@Override
	// entry method: starts the source
	public void run(SourceContext<T> sourceContext) throws Exception {
		if (subscribedPartitionsToStartOffsets == null) {
			throw new Exception("The partitions were not set for the consumer");
		}

		// initialize commit metrics and default offset callback method
		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

		this.offsetCommitCallback = new KafkaCommitCallback() {
			@Override
			public void onSuccess() {
				successfulCommits.inc();
			}

			@Override
			public void onException(Throwable cause) {
				LOG.warn("Async Kafka commit failed.", cause);
				failedCommits.inc();
			}
		};

		// mark the subtask as temporarily idle if there are no initial seed partitions;
		// once this subtask discovers some partitions and starts collecting records, the subtask's
		// status will automatically be triggered back to be active.
		if (subscribedPartitionsToStartOffsets.isEmpty()) {
			sourceContext.markAsTemporarilyIdle();
		}

		// from this point forward:
		//   - 'snapshotState' will draw offsets from the fetcher,
		//     instead of being built from `subscribedPartitionsToStartOffsets`
		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
		//     Kafka through the fetcher, if configured to do so)
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				periodicWatermarkAssigner,
				punctuatedWatermarkAssigner,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

		if (!running) {
			return;
		}

		// depending on whether we were restored with the current state version (1.3),
		// remaining logic branches off into 2 paths:
		//  1) New state - partition discovery loop executed as separate thread, with this
		//                 thread running the main fetcher loop
		//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
			kafkaFetcher.runFetchLoop();
		} else {
			runWithPartitionDiscovery();
		}
	}

createFetcher is handed the subscribedPartitionsToStartOffsets map we just built. Following it down, the map is passed as a constructor argument when the KafkaFetcher object is created, and eventually reaches the AbstractFetcher constructor.
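For orientation, the hand-off looks roughly like this (a simplified sketch of the call chain, with argument lists abbreviated):

// FlinkKafkaConsumerBase.run()
//   -> createFetcher(sourceContext, subscribedPartitionsToStartOffsets, ...)
//      -> new KafkaFetcher<>(sourceContext, subscribedPartitionsToStartOffsets, ...)
//         -> super(...)  // AbstractFetcher(sourceContext, seedPartitionsWithInitialOffsets, ...)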

	protected AbstractFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
			ProcessingTimeService processingTimeProvider,
			long autoWatermarkInterval,
			ClassLoader userCodeClassLoader,
			MetricGroup consumerMetricGroup,
			boolean useMetrics) throws Exception {
		this.sourceContext = checkNotNull(sourceContext);
		this.checkpointLock = sourceContext.getCheckpointLock();
		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);

		this.useMetrics = useMetrics;
		this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
		this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
		this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);

		// figure out what watermark mode we will be using
		this.watermarksPeriodic = watermarksPeriodic;
		this.watermarksPunctuated = watermarksPunctuated;

		if (watermarksPeriodic == null) {
			if (watermarksPunctuated == null) {
				// simple case, no watermarks involved
				timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
			} else {
				timestampWatermarkMode = PUNCTUATED_WATERMARKS;
			}
		} else {
			if (watermarksPunctuated == null) {
				timestampWatermarkMode = PERIODIC_WATERMARKS;
			} else {
				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
			}
		}

		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

		// initialize subscribed partition states with seed partitions, depending on whether a timestamp / watermark assigner is present;
		// subscribedPartitionStates holds a List<KafkaTopicPartitionState<KPH>>, and each KafkaTopicPartitionState carries the KafkaTopicPartition, its offset, and so on
		this.subscribedPartitionStates = createPartitionStateHolders(
				seedPartitionsWithInitialOffsets,
				timestampWatermarkMode,
				watermarksPeriodic,
				watermarksPunctuated,
				userCodeClassLoader);

		// check that all seed partition states have a defined offset
		// whether restored from a checkpoint or set via consumer.setStartFrom..., every seed partition must have an initial offset
		for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
			if (!partitionState.isOffsetDefined()) {
				throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
			}
		}

		// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
		// up to this point, no partitions have been assigned to the KafkaConsumer yet
		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
			unassignedPartitionsQueue.add(partition);
		}

		// register metrics for the initial seed partitions
		if (useMetrics) {
			registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
		}

		// if we have periodic watermarks, kick off the interval scheduler
		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
			@SuppressWarnings("unchecked")
			PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
					subscribedPartitionStates,
					sourceContext,
					processingTimeProvider,
					autoWatermarkInterval);

			periodicEmitter.start();
		}
	}

Then, from the constructor of KafkaFetcher, the subclass of AbstractFetcher, we can see that unassignedPartitionsQueue is in turn handed to the KafkaConsumerThread:

public KafkaFetcher(
		SourceFunction.SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		ClassLoader userCodeClassLoader,
		String taskNameWithSubtasks,
		KafkaDeserializationSchema<T> deserializer,
		Properties kafkaProperties,
		long pollTimeout,
		MetricGroup subtaskMetricGroup,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
		super(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
			watermarksPeriodic,
			watermarksPunctuated,
			processingTimeProvider,
			autoWatermarkInterval,
			userCodeClassLoader,
			consumerMetricGroup,
			useMetrics);

		this.deserializer = deserializer;
		this.handover = new Handover();

		this.consumerThread = new KafkaConsumerThread(
			LOG,
			handover,
			kafkaProperties,
			unassignedPartitionsQueue,
			getFetcherName() + " for " + taskNameWithSubtasks,
			pollTimeout,
			useMetrics,
			consumerMetricGroup,
			subtaskMetricGroup);
	}

When the KafkaConsumerThread is started, its run() method executes:

......
try {
					// hasAssignedPartitions defaults to false;
					// when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribedPartitionsToStartOffsets
					if (hasAssignedPartitions) {
						newPartitions = unassignedPartitionsQueue.pollBatch();
					}
					else {
						// if no assigned partitions block until we get at least one
						// instead of hot spinning this loop. We rely on a fact that
						// unassignedPartitionsQueue will be closed on a shutdown, so
						// we don't block indefinitely
						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
					}
					// since unassignedPartitionsQueue already holds the seed partitions, newPartitions != null is true and reassignPartitions is invoked
					if (newPartitions != null) {
						reassignPartitions(newPartitions);
					}
				} catch (AbortedReassignmentException e) {
					continue;
				}
......
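reassignPartitions is where the starting offsets finally reach the actual KafkaConsumer: it assigns the partitions and then seeks each one according to its (possibly sentinel) offset.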
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
   	if (newPartitions.size() == 0) {
   		return;
   	}
   	hasAssignedPartitions = true;
   	boolean reassignmentStarted = false;

   	// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
   	// the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
   	// until the reassignment is complete.
   	final KafkaConsumer<byte[], byte[]> consumerTmp;
   	synchronized (consumerReassignmentLock) {
   		// hand the consumer reference over to consumerTmp
   		consumerTmp = this.consumer;
   		this.consumer = null;
   	}

   	final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
   	try {
   		/* The reason there are both new and old partitions: when KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is configured,
   		   Flink periodically checks whether partitions have been added and, if so, puts the new ones into unassignedPartitionsQueue. */
   		for (TopicPartition oldPartition : consumerTmp.assignment()) {
   			oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
   		}

   		final List<TopicPartition> newPartitionAssignments =
   			new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
   		newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
   		newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

   		// reassign with the new partitions
   		consumerTmp.assign(newPartitionAssignments);
   		reassignmentStarted = true;

   		// old partitions should be seeked to their previous position
   		for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
   			consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
   		}

   		// offsets in the state of new partitions may still be placeholder sentinel values if we are:
   		//   (1) starting fresh,
   		//   (2) checkpoint / savepoint state we were restored with had not completely
   		//       been replaced with actual offset values yet, or
   		//   (3) the partition was newly discovered after startup;
   		// replace those with actual offsets, according to what the sentinel value represent.
   		
   		// here is where Kafka's own offset-reset configuration is bypassed: the starting position still depends on the startupMode;
   		// based on the (sentinel) offset returned by getOffset(), the consumer is explicitly seeked to a starting position, and that sentinel, as we saw, came from the startupMode
   		for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
   			if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
   				consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
   				newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
   			} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
   				consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
   				newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
   			} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
   				// the KafkaConsumer by default will automatically seek the consumer position
   				// to the committed group offset, so we do not need to do it.

   				newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
   			} else {
   				consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
   			}
   		}
   	} catch (WakeupException e) {
   		// a WakeupException may be thrown if the consumer was invoked wakeup()
   		// before it was isolated for the reassignment. In this case, we abort the
   		// reassignment and just re-expose the original consumer.

   		synchronized (consumerReassignmentLock) {
   			this.consumer = consumerTmp;

   			// if reassignment had already started and affected the consumer,
   			// we do a full roll back so that it is as if it was left untouched
   			if (reassignmentStarted) {
   				this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));

   				for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
   					this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
   				}
   			}

   			// no need to restore the wakeup state in this case,
   			// since only the last wakeup call is effective anyways
   			hasBufferedWakeup = false;

   			// re-add all new partitions back to the unassigned partitions queue to be picked up again
   			for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
   				unassignedPartitionsQueue.add(newPartition);
   			}

   			// this signals the main fetch loop to continue through the loop
   			throw new AbortedReassignmentException();
   		}
   	}

   	// reassignment complete; expose the reassigned consumer
   	synchronized (consumerReassignmentLock) {
   		this.consumer = consumerTmp;

   		// restore wakeup state for the consumer if necessary
   		if (hasBufferedWakeup) {
   			this.consumer.wakeup();
   			hasBufferedWakeup = false;
   		}
   	}
   }
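And that answers the opening question: for the EARLIEST and LATEST sentinels the consumer thread calls seekToBeginning() / seekToEnd() explicitly, and a Kafka consumer only falls back to auto.offset.reset when it has neither a committed offset nor an explicit seek position. The one case where auto.offset.reset can still matter is GROUP_OFFSETS with no committed group offset, because Flink then leaves the position to the Kafka client. A minimal sketch against the plain kafka-clients API makes the precedence visible (broker address, group id, and topic are placeholders; this is my own illustration, not Flink code):

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SeekBeatsAutoOffsetReset {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // placeholder
		props.put("group.id", "demo-group");              // placeholder
		props.put("auto.offset.reset", "earliest");       // overridden by the explicit seek below
		props.put("key.deserializer", ByteArrayDeserializer.class.getName());
		props.put("value.deserializer", ByteArrayDeserializer.class.getName());

		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
			List<TopicPartition> partitions =
					Collections.singletonList(new TopicPartition("demo-topic", 0)); // placeholder
			// like Flink's consumer thread: assign() instead of subscribe(), then seek explicitly
			consumer.assign(partitions);
			consumer.seekToEnd(partitions); // the explicit seek wins; auto.offset.reset never kicks in
			System.out.println("start position: " + consumer.position(partitions.get(0)));
			consumer.poll(Duration.ofSeconds(1)); // reads from the latest offset onward
		}
	}
}

So consumer.setStartFromLatest() wins because Flink translates the startupMode into an explicit seek on the KafkaConsumer, and an explicit seek always takes precedence over auto.offset.reset.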