前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink-connector-kafka consumer checkpoint源码分析

flink-connector-kafka consumer checkpoint源码分析

作者头像
sanmutongzi
发布2020-03-05 10:49:15
1K0
发布2020-03-05 10:49:15
举报
文章被收录于专栏:stream processstream process

转发请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7700600.html

flink-connector-kafka consumer的topic分区分配源码》一文提到了在flink-connector-kafka的consumer初始化的时候有三种offset提交模式:KAFKA_PERIODIC,DISABLED和ON_CHECKPOINTS。

其中ON_CHECKPOINTS表示在flink做完checkpoint后主动向kafka提交offset的方法,本文主要分析一下flink-connector-kafka在源码如何使用checkpoint机制实现offset的恢复和提交。

flink conusmer的实现基类FlinkKafkaConsumerBase定义如下,这个类实现了了与checkpoin相关的三个接口CheckpointedFunction,CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>,CheckpointListener。根据官网文档,CheckpointedRestoring的restoreState()方法已经被CheckpointedFunction的initializeState取代,所以重点关注三个方法实现

1initializeState() 实例初始化或者recover的时候调用

2snapshotState() 每次创建checkpoint的时候调用

3 notifyCheckpointComplete() 每次checkpoint结束的时候调用

代码语言:javascript
复制
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {

initializeState

代码语言:javascript
复制
    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {

        // we might have been restored via restoreState() which restores from legacy operator state
        if (!restored) {
            restored = context.isRestored();
        }

        OperatorStateStore stateStore = context.getOperatorStateStore();
        offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

        if (context.isRestored()) {
            if (restoredState == null) {
                restoredState = new HashMap<>();
                for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                    restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
                }

                LOG.info("Setting restore state in the FlinkKafkaConsumer.");
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using the following offsets: {}", restoredState);
                }
            }
        } else {
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
    }

这个方法的逻辑比较简单,在task恢复的时候从stateStore中序列化出来之前存储的ListState<Tuple2<KafkaTopicPartition, Long>> 状态数据,并放到restoredState这个变量,用于下面open方法直接恢复对应的分区和offset起始值。

snapshotState

代码语言:javascript
复制
    @Override
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {

            offsetsStateForCheckpoint.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    offsetsStateForCheckpoint.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

snapshot方法创建checkpoint的做法是把当前的KafkaTopicPartition和目前消费到的offset值不断存放到offsetsStateForCheckpoint这个state对象里,然后把当前的checkpointid和对应的offset存到pendingOffsetsToCommit这个linkmap。当前offset的获取分两个情况,初始化的时候(if (fetcher == null) {...})和fetcher已经初始化成功,初始化的时候从restoredState获取,正常运行中获取fetcher.snapshotCurrentState()。

notifyCheckpointComplete

代码语言:javascript
复制
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // only one commit operation must be in progress
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
            }

            try {
                final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                    return;
                }

                @SuppressWarnings("unchecked")
                HashMap<KafkaTopicPartition, Long> offsets =
                    (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingOffsetsToCommit.remove(0);
                }

                if (offsets == null || offsets.size() == 0) {
                    LOG.debug("Checkpoint state was empty.");
                    return;
                }

                fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            } catch (Exception e) {
                if (running) {
                    throw e;
                }
                // else ignore exception if we are no longer running
            }
        }
    }

notifyCheckpointComplete主要是在checkpoint结束后在ON_CHECKPOINTS的情况下向kafka集群commit offset,方法调用时会拿到已经完成的checkpointid,从前文的pendingOffsetsToCommit列表里找到对应的offset。如果判断索引不存在,则直接退出。否则,移除该索引对应的快照信息,然后将小于当前索引(较旧的)的快照信息也一并移除(这一点我之前解释过,因为所有的检查点都是按时间递增有序的)。最后将当前完成的检查点对应的消息的偏移量进行commit,也即commitOffsets。只不过这里该方法被定义为抽象方法,因为Kafka不同版本的API差别的原因,由适配不同版本的consumer各自实现,目前kafka09和010实现都是在Kafka09Fetcher内实现的commitInternalOffsetsToKafka方法。

参考文档:

http://blog.csdn.net/yanghua_kobe/article/details/51503885

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-10-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档