前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义

Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义

原创
作者头像
2011aad
发布2020-02-14 10:24:37
4.9K0
发布2020-02-14 10:24:37
举报

一、前言

Flink通过Checkpoint机制实现了消息对状态影响的Exactly Once语义,即每条消息只会影响Flink内部状态有且只有一次。但无法保证输出到Sink中的数据不重复。以图一所示为例,Flink APP收到Source中的A消息,将其转化为B消息输出到Sink,APP在处理完A1后做了一次Checkpoint,假设APP在处理到A4时发生错误重启,APP将会重新从A2开始消费并处理数据,就会导致B2和B3重复输出到Sink中两次。

图一 Flink输出消息重复
图一 Flink输出消息重复

本文中端到端Exactly Once的含义就是:Source的每条数据会被处理有且仅有一次,并且输出到Sink中的结果也不重不漏

要实现端到端Exactly Once语义需要借助于Sink对消息事务的支持。好在Kafka在0.11版本中加入了对事务的支持,Flink使用Kafka的这个特性实现了端到端Exactly Once语义的数据处理。本文先简单介绍Kafka的消息事务,然后对照源码解读下Flink是如何实现输出消息不重不漏的。

二、Kafka消息事务

Kafka实现事务的出发点很简单:允许Producer原子性的发布一组消息,即允许一组消息对Consumer要么全部可见,要么全部不可见,不会存在中间状态。

首先介绍几个Kafka消息事务中的几个概念:

  • Transaction Cordinator:Kafka事务的协调器,两阶段提交协调者,负责记录当前正在执行的Transaction,写Transaction log等。
  • producer id(PID):用于标识执行事务的producer,由Transaction Cordinator分配,对Kafka客户端使用者透明。在Kafka的事务中,同一个事务只能由一个producer操作,就像mysql事务中所有的sql命令都必须来自同一个客户端连接一样。但是这里所说的“同一个producer”,并不是指同一个运行着producer的进程,而是持有相同PID的producer。例如,进程P1运行着一个Kafka producer,正在执行一个事务,它持有的PID是x,某一时刻进程P1意外终止,启动了另一个进程P2作为Kafka producer,只要进程P2能获取到x当做自己的PID(用相同的transactional id请求Transaction Cordinator),它就能继续之前的事务。换言之,即使是同一个进程P1,在运行过程中改变自身的PID(改变transactional id请求Transaction Cordinator),也就无法执行之前的事务了。对于每个producer id还有一个epoch的概念,用来防止两个进程同时操作同一个事务。
  • transactional id:用于标识一个事务,需要客户端使用者指定。客户端调用InitPidRequest(TransactionalId, TransactionTimeoutMs)方法向Transaction Cordinator请求初始化PID,相同的transactional id会得到相同的PID,并且使PID的epoch加一,Kafka只接受具有最大epoch的producer生产的消息,拒绝其他具有相同PID的producer(僵尸实例)。开启一个新的事务只需要生成一个未在使用中的transactional id即可,并没有什么特别的要求,后面我们会看到Flink Kafka Sink是如何生成transactional id的。
  • Transaction Marker:消息队列中用于标识事务开始结束的特殊控制消息。
图二 Kafka中消息存储
图二 Kafka中消息存储

图二展示了2个Producer在向Kafka同一个Topic的同一个Partition写入事务消息时,Kafka是如何存储事务消息的。Producer 1调用BeginTransaction后开始向Topic中生产事务消息,当第一条消息m1到达broker时,Transaction 1便开始了,消息m1中会有一个PID字段标识它是属于Transaction 1的,后面的消息也是相同的道理。Producer 1和Producer 2在一段时间内均向该Topic写入事务消息,消息便按照先后顺序排列在消息队列中。当Producer 1 Commit Transaction时,broker会向消息队列中插入一条控制消息Commit T1(Transaction Marker),同理Producer 2 Abort Transaction时,broker会插入Abort T2的控制消息。通过控制消息,Consumer在顺序消费的过程中,就知道每条消息是否应该可见。

以图二为例,假设m1是该Partition的第一条消息,且只有Producer 1和Producer 2在写入消息。在消息写入到m11时,所有消息对于消费者都是不可见的,因为不确定T1和T2最后是Commit还是Abort。当Producer 1执行Commit后,m1对于消费者是可见,因为m1之前的所有消息都已经确定状态了(只有m1一条消息),而由于m2并未确定状态,因此m2后面的消息对于消费者都是不可见的。当Producer 2执行Abort后,m1、m3、m4、m11便对消费者可见了(因为m12之前的所有消息状态都确定了),m2、m10、m12由于T2 Abort便会在消费的过程中被过滤掉,这种情况下Consumer消费出来消息的Offset便是不连续的。

Kafka事务消息写入的方式可以扩展到多Topic、多Partition的写入,只需要在Commit(Abort)时同时向所有涉及到的Partition写入控制消息,只是多条控制消息的原子性写入就是一个分布式事务问题了,因此Kafka采用了两阶段提交的方式实现事务。

三、Flink利用Kafka消息事务实现端到端Exactly Once语义

Flink实现内部状态Exactly Once的语义基本原理是:隔一段时间做一个Checkpoint,持久化记录当前上游Source处理到哪里了(如Kafka offset),以及当时本地状态的值,如果过了一会进程挂了,就把这个持久化保存的Checkpoint读出来,加载当时的状态,并从当时的位置重新开始处理,这样每条消息一定只会影响自身状态一次。但这种方式是没办法保证输出到下游Sink的数据不重复的。要想下游输出的消息不重,就需要下游Sink支持事务消息,把两次checkpoint之间输出的消息当做一个事务提交,如果新的checkpoint成功,则Commit,否则Abort。这样实现就解决了图一中B2和B3重复输出的问题,执行到A4时出错重启,由于还未产生新的Checkpoint,红色B2和B3所在的Transaction不会Commit,也就对下游消费者不可见。

1. TwoPhaseCommitSinkFunction

首先,我们简单回顾下Flink做checkpoint的流程:当Checkpoint过程开始时,JobManager会向数据流中插入一个Checkpoint barrier,下游的算子收到checkpoint barrier就对本算子的状态做Checkpoint,这样就保证所有算子在checkpoint中的状态是同步的。每个算子在Checkpoint完成之后会告知JobManager,JobManager在所有算子完成Checkpoint之后,会向所有算子推送一个NotifyCheckpointComplete消息。

Flink依赖下游Sink对事务的支持,实现端到端Exactly Once语义,而两阶段提交是解决分布式事务问题一个比较通用的解决方案,因此Flink抽象出了TwoPhaseCommitSinkFunction这个类来完成向下游Sink做两阶段提交的工作。

下面,我们具体来看下TwoPhaseCommitSinkFunction是如何工作的。

图三 TwoPhaseCommitSinkFunction工作过程
图三 TwoPhaseCommitSinkFunction工作过程

TwoPhaseCommitSinkFunction在收到Checkpoint barrier,开始做自身Checkpoint之前,对Sink做pre-commit,在整个系统制作Checkpoint的同时让下游Sink开始执行预提交;同时对Sink做一个begin transaction,开启下一个事务,由于在制作Checkpoint的过程中,Flink仍然可以继续处理后面的消息,这样就能保证后续消息在下一个事务周期中;完成自身Checkpoint后,收到JobManager发来的NotifyCheckpointComplete消息时,对Sink做commit,完成两阶段提交的过程,此时这个周期发送的数据才会对下游的消费者可见。

对着代码我们再来过一下这个过程:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	// this is like the pre-commit of a 2-phase-commit transaction
	// we are ready to commit and remember the transaction

	checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

	long checkpointId = context.getCheckpointId();
	LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

	preCommit(currentTransactionHolder.handle);
	pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
	LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

	currentTransactionHolder = beginTransactionInternal();
	LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

	state.clear();
	state.add(new State<>(
		this.currentTransactionHolder,
		new ArrayList<>(pendingCommitTransactions.values()),
		userContext));
}

snapshotState继承自CheckpointedFunction接口,也就是收到checkpoint barrier后,执行自身状态checkpoint的函数。可以看到,首先对currentTransaction执行了Pre-Commit,并将currentTransaction放入pendingCommitTransactions中,同时开启了新的Transaction作为currentTransaction,最后将currentTransaction和pendingCommitTransactions都作为自身状态放入checkpoint中(这里将事务信息也放入状态中,可以保证从Checkpoint恢复时能继续之前的事务)。

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
	// the following scenarios are possible here
	//
	//  (1) there is exactly one transaction from the latest checkpoint that
	//      was triggered and completed. That should be the common case.
	//      Simply commit that transaction in that case.
	//
	//  (2) there are multiple pending transactions because one previous
	//      checkpoint was skipped. That is a rare case, but can happen
	//      for example when:
	//
	//        - the master cannot persist the metadata of the last
	//          checkpoint (temporary outage in the storage system) but
	//          could persist a successive checkpoint (the one notified here)
	//
	//        - other tasks could not persist their status during
	//          the previous checkpoint, but did not trigger a failure because they
	//          could hold onto their state and could successfully persist it in
	//          a successive checkpoint (the one notified here)
	//
	//      In both cases, the prior checkpoint never reach a committed state, but
	//      this checkpoint is always expected to subsume the prior one and cover all
	//      changes since the last successful one. As a consequence, we need to commit
	//      all pending transactions.
	//
	//  (3) Multiple transactions are pending, but the checkpoint complete notification
	//      relates not to the latest. That is possible, because notification messages
	//      can be delayed (in an extreme case till arrive after a succeeding checkpoint
	//      was triggered) and because there can be concurrent overlapping checkpoints
	//      (a new one is started before the previous fully finished).
	//
	// ==> There should never be a case where we have no pending transaction here
	//
	Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
	checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
	Throwable firstError = null;

	while (pendingTransactionIterator.hasNext()) {
		Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
		Long pendingTransactionCheckpointId = entry.getKey();
		TransactionHolder<TXN> pendingTransaction = entry.getValue();
		if (pendingTransactionCheckpointId > checkpointId) {
			continue;
		}

		LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
			name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

		logWarningIfTimeoutAlmostReached(pendingTransaction);
		try {
			commit(pendingTransaction.handle);
		} catch (Throwable t) {
			if (firstError == null) {
					firstError = t;
			}
		}

		LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

		pendingTransactionIterator.remove();
	}

	if (firstError != null) {
		throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
			firstError);
	}
}

notifyCheckpointComplete接口继承自CheckpointListener,就是收到JobManager发送的NotifyCheckpointComplete消息时执行的函数。这个函数就是简单的将所有pendingTransactions Commit掉。函数的注释解释了一般情况下pendingTransactions应该只有一个,即刚触发的snapshotState中Pre-Commit的pendingTransaction,但也有可能出现多个pendingTransactions的情况,比如上一次checkpoint之后的NotifyCheckpointComplete消息晚到了的情况。

// ------ methods that should be implemented in child class to support two phase commit algorithm ------

/**
 * Write value within a transaction.
 */
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

/**
 * Method that starts a new transaction.
 *
 * @return newly created transaction.
 */
protected abstract TXN beginTransaction() throws Exception;

/**
 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
 * transaction for a commit that might happen in the future. After this point the transaction might still be
 * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
 * will always succeed.
 *
 * <p>Usually implementation involves flushing the data.
 */
protected abstract void preCommit(TXN transaction) throws Exception;

/**
 * Commit a pre-committed transaction. If this method fail, Flink application will be
 * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
 * same transaction.
 */
protected abstract void commit(TXN transaction);

/**
 * Abort a transaction.
 */
protected abstract void abort(TXN transaction);

TwoPhaseCommitSinkFunction保留了5个函数需要子类去实现:

  • invoke:定义了作为sink如何写数据到外部系统。每一个sink都需要定义invoke函数,sink算子每收到一条数据都会触发一次invoke函数,这里的sink函数只是多了一个transaction入参。
  • beginTransaction、preCommit、commit、abort:两阶段提交协议的几个步骤。如果外部系统本身支持两阶段提交(如Kafka),这些函数的实现就是调用外部系统两阶段提交协议对应的函数。

2. FlinkKafkaProducer011

了解了TwoPhaseCommitSinkFunction再来看FlinkKafkaProducer011就简单多了。

@Override
public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
	checkErroneous();

	byte[] serializedKey = schema.serializeKey(next);
	byte[] serializedValue = schema.serializeValue(next);
	String targetTopic = schema.getTargetTopic(next);
	if (targetTopic == null) {
		targetTopic = defaultTopicId;
	}

	Long timestamp = null;
	if (this.writeTimestampToKafka) {
		timestamp = context.timestamp();
	}

	ProducerRecord<byte[], byte[]> record;
	int[] partitions = topicPartitionsMap.get(targetTopic);
	if (null == partitions) {
		partitions = getPartitionsByTopic(targetTopic, transaction.producer);
		topicPartitionsMap.put(targetTopic, partitions);
	}
	if (flinkKafkaPartitioner != null) {
		record = new ProducerRecord<>(
			targetTopic,
			flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
			timestamp,
			serializedKey,
			serializedValue);
	} else {
		record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
	}
	pendingRecords.incrementAndGet();
	transaction.producer.send(record, callback);
}

FlinkKafkaProducer011中实现的invoke函数就是将输入的消息(next)构造为一个Kafka record,并调用Kafka客户端的send方法发送出去。

@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
	switch (semantic) {
		case EXACTLY_ONCE:
			FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
			producer.beginTransaction();
			return new KafkaTransactionState(producer.getTransactionalId(), producer);
		case AT_LEAST_ONCE:
		case NONE:
			// Do not create new producer on each beginTransaction() if it is not necessary
			final KafkaTransactionState currentTransaction = currentTransaction();
			if (currentTransaction != null && currentTransaction.producer != null) {
				return new KafkaTransactionState(currentTransaction.producer);
			}
			return new KafkaTransactionState(initNonTransactionalProducer(true));
		default:
			throw new UnsupportedOperationException("Not implemented semantic");
	}
}

@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
	switch (semantic) {
		case EXACTLY_ONCE:
		case AT_LEAST_ONCE:
			flush(transaction);
			break;
		case NONE:
			break;
		default:
			throw new UnsupportedOperationException("Not implemented semantic");
	}
	checkErroneous();
}

@Override
protected void commit(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		try {
			transaction.producer.commitTransaction();
		} finally {
			recycleTransactionalProducer(transaction.producer);
		}
	}
}

@Override
protected void abort(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		transaction.producer.abortTransaction();
		recycleTransactionalProducer(transaction.producer);
	}
}

FlinkKafkaProducer011中实现的beginTransaction、preCommit、commit、abort主要就是调用Kafka Producer客户端对应的两阶段提交协议的函数。另外值得注意的有2点:

  • 在preCommit函数中调用了flush方法。从TwoPhaseCommitSinkFunction的分析中可以看到preCommit是在snapshotState方法中调用的,而snapshotState方法是在算子Checkpoint的时候触发的。这样就保证了算子在做Checkpoint时,所有该Checkpoint之前的数据都已经安全的发送到了下游(而不是在缓存中)。以图三为例,sink算子在收到第一个Checkpoint barrier时触发Checkpoint操作,而在Checkpoint完成之前,必须保证m1-m5这5条消息都已经发送到了下游,否则如果Checkpoint完成,而m1-m5中有消息没有送达,就会发生消息丢失。在snapshotState方法中保证缓存中的数据都已经发送出去是一个很通用的做法,在自己实现定制化SinkFunction时也要注意。这里的flush方法最终调用的是Kafka Producer客户端的flush方法,这是一个阻塞的方法,会等到所有缓存中的消息真正发给Kafka才返回,所以有时看到Checkpoint时间有毛刺,也可能是受这个flush的影响。
  • 在beginTransaction里调用了getTransactionalId,在commit和abort中调用了recycleTransactionalProducer。这里可以回顾下第二部分中提到的如何生成Kafka transactional id的问题,看一下Flink是如何产生这个id的。从下面的代码中可以看出Flink用一个队列作为transactional id的Pool,新的Transaction开始时从队头拿出一个transactional id,Transaction结束时将transactional id放回队尾。因为每开始一个Transaction,都会构造一个新的Kafka Producer,因此availableTransactionalIds初始的大小就是配置的Kafka Producer Pool Size(默认是5)。
/**
 * Pool of available transactional ids.
 */
private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
	
/**
 * For each checkpoint we create new {@link FlinkKafkaProducer} so that new transactions will not clash
 * with transactions created during previous checkpoints ({@code producer.initTransactions()} assures that we
 * obtain new producerId and epoch counters).
 */
private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
	String transactionalId = availableTransactionalIds.poll();
	if (transactionalId == null) {
		throw new FlinkKafka011Exception(
			FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
			"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
	}
	FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
	producer.initTransactions();
	return producer;
}

private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
	availableTransactionalIds.add(producer.getTransactionalId());
	producer.close();
}

四、总结

Flink使用Kafka的消息事务实现的端到端Exactly Once消息送达,其实是一个比较通用的解决方案,了解了其原理,可以很快将这种方案套用到其他支持事务的外部存储或消息队列。

Flink使用Kafka事务的方式,对于业务开发中正确使用Kafka也是一个很好的demo,在其他工程中使用Kafka实现消息的强一致性,也可以借鉴Flink的代码。

参考文献

1 Kafka 设计解析(八):Kafka 事务机制与 Exactly Once 语义实现原理。https://www.infoq.cn/article/kafka-analysis-part-8

2 Exactly Once Delivery and Transactional Messaging in Kafka. https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#

3 An Overview of End-to-End Exactly-Once Processing in Apache Flink® (with Apache Kafka, too!). https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、Kafka消息事务
  • 三、Flink利用Kafka消息事务实现端到端Exactly Once语义
    • 1. TwoPhaseCommitSinkFunction
      • 2. FlinkKafkaProducer011
      • 四、总结
        • 参考文献
        相关产品与服务
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档