前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实操 | kafka如何手动异步提交offset

实操 | kafka如何手动异步提交offset

作者头像
create17
发布2020-09-24 15:52:40
3.8K0
发布2020-09-24 15:52:40
举报
文章被收录于专栏:大数据实战演练
每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定。

放弃不难,但坚持很酷~

kafka_2.11-1.1.0

Kafka 手动异步提交 offset 的步骤大概分为以下几步,如下图所示:

1、配置手动提交

enable.auto.commit 修改为 false 。

2、订阅 topic
代码语言:javascript
复制
consumer.subscribe(Arrays.asList("topic name"));
3、获取 topic 各分区当前读取到的最后一条记录的offset

首先定义一个全局变量:

代码语言:javascript
复制
//用来记录当前消费的偏移
private static Map<TopicPartition, Long> offsets = new HashMap<>();
代码语言:javascript
复制
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    // 获取当前读取到的最后一条记录的offset
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // 提交offset
    offsets.put(partition, lastOffset + 1);
}

至于为什么消费者提交 offsets 时要 +1,在《Kafka消费者 之 如何提交消息的偏移量》中的概述章节里面也给出了答案。

4、手动异步提交 offset

首先定义一个全局变量:

代码语言:javascript
复制
//用来记录当需要提交的偏移
private static Map<TopicPartition, OffsetAndMetadata> commitOffset = new HashMap<>();
代码语言:javascript
复制
// 
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
    commitOffset.put(entry.getKey(), new OffsetAndMetadata(offsets.get(entry.getKey())));
    logger.info("partition[{}], 当前待提交kafka偏移:[{}]", entry.getKey().partition(), offsets.get(entry.getKey()));
}
// 异步提交offset
consumer.commitAsync(commitOffset, (offsets, exception) -> {
    if (exception != null) {
        logger.error("fail to commit offsets {}, {}", offsets, exception);
        // 同步提交,来做offset提交最后的保证。
        consumer.commitSync();
    }
});

清空:

代码语言:javascript
复制
commitOffset.clear();
offsets.clear();
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据实战演练 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、配置手动提交
  • 2、订阅 topic
  • 3、获取 topic 各分区当前读取到的最后一条记录的offset
  • 4、手动异步提交 offset
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档