聊聊spring for kafka的AckMode

本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项

AckMode

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.java$AckMode

    /**
     * The offset commit behavior enumeration.
     */
    public enum AckMode {

        /**
         * Commit after each record is processed by the listener.
         */
        RECORD,

        /**
         * Commit whatever has already been processed before the next poll.
         */
        BATCH,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
         */
        TIME,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded.
         */
        COUNT,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded or after {@link ContainerProperties#setAckTime(long)
         * ackTime} has elapsed.
         */
        COUNT_TIME,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}.
         */
        MANUAL,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}. The consumer is woken to
         * immediately process the commit.
         */
        MANUAL_IMMEDIATE,

    }
  • RECORD 每处理一条commit一次
  • BATCH(默认) 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
  • COUNT 累积达到ackCount次的ack去commit
  • COUNT_TIME ackTime或ackCount哪个条件先满足,就commit
  • MANUAL listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE listner负责ack,每调用一次,就立即commit

KafkaMessageListenerContainer$ListenerConsumer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

        @Override
        public void run() {
            if (this.theListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (isRunning() && this.definedPartitions != null) {
                initPartitionsIfNeeded();
                // we start the invoker here as there will be no rebalance calls to
                // trigger it, but only if the container is not set to autocommit
                // otherwise we will process records on a separate thread
                if (!this.autoCommit) {
                    startInvoker();
                }
            }
            long lastReceive = System.currentTimeMillis();
            long lastAlertAt = lastReceive;
            while (isRunning()) {
                try {
                    if (!this.autoCommit) {
                        processCommits();
                    }
                    processSeeks();
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Polling (paused=" + this.paused + ")...");
                    }
                    ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if (records != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + records.count() + " records");
                    }
                    if (records != null && records.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        // if the container is set to auto-commit, then execute in the
                        // same thread
                        // otherwise send to the buffering queue
                        if (this.autoCommit) {
                            invokeListener(records);
                        }
                        else {
                            if (sendToListener(records)) {
                                if (this.assignedPartitions != null) {
                                    // avoid group management rebalance due to a slow
                                    // consumer
                                    this.consumer.pause(this.assignedPartitions);
                                    this.paused = true;
                                    this.unsent = records;
                                }
                            }
                        }
                    }
                    else {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            long now = System.currentTimeMillis();
                            if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                    && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                                publishIdleContainerEvent(now - lastReceive);
                                lastAlertAt = now;
                                if (this.theListener instanceof ConsumerSeekAware) {
                                    seekPartitions(getAssignedPartitions(), true);
                                }
                            }
                        }
                    }
                    this.unsent = checkPause(this.unsent);
                }
                catch (WakeupException e) {
                    this.unsent = checkPause(this.unsent);
                }
                catch (Exception e) {
                    if (this.containerProperties.getGenericErrorHandler() != null) {
                        this.containerProperties.getGenericErrorHandler().handle(e, null);
                    }
                    else {
                        this.logger.error("Container exception", e);
                    }
                }
            }
            if (this.listenerInvokerFuture != null) {
                stopInvoker();
                commitManualAcks();
            }
            try {
                this.consumer.unsubscribe();
            }
            catch (WakeupException e) {
                // No-op. Continue process
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped");
            }
        }

这里while循环每次都判断是否auto commit,如果不是则processCommits

        private void processCommits() {
            handleAcks();
            this.count += this.acks.size();
            long now;
            AckMode ackMode = this.containerProperties.getAckMode();
            if (!this.isManualImmediateAck) {
                if (!this.isManualAck) {
                    updatePendingOffsets();
                }
                boolean countExceeded = this.count >= this.containerProperties.getAckCount();
                if (this.isManualAck || this.isBatchAck || this.isRecordAck
                        || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
                    if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
                        this.logger.debug("Committing in AckMode.COUNT because count " + this.count
                                + " exceeds configured limit of " + this.containerProperties.getAckCount());
                    }
                    commitIfNecessary();
                    this.count = 0;
                }
                else {
                    now = System.currentTimeMillis();
                    boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals(AckMode.TIME) && elapsed) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Committing in AckMode.TIME " +
                                    "because time elapsed exceeds configured limit of " +
                                    this.containerProperties.getAckTime());
                        }
                        commitIfNecessary();
                        this.last = now;
                    }
                    else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                        if (this.logger.isDebugEnabled()) {
                            if (elapsed) {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because time elapsed exceeds configured limit of " +
                                        this.containerProperties.getAckTime());
                            }
                            else {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because count " + this.count + " exceeds configured limit of" +
                                        this.containerProperties.getAckCount());
                            }
                        }

                        commitIfNecessary();
                        this.last = now;
                        this.count = 0;
                    }
                }
            }
        }

handleAcks

        private void handleAcks() {
            ConsumerRecord<K, V> record = this.acks.poll();
            while (record != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + record);
                }
                processAck(record);
                record = this.acks.poll();
            }
        }

        private void processAck(ConsumerRecord<K, V> record) {
            if (ListenerConsumer.this.isManualImmediateAck) {
                try {
                    ackImmediate(record);
                }
                catch (WakeupException e) {
                    // ignore - not polling
                }
            }
            else {
                addOffset(record);
            }
        }

这里可以看到,如果不是isManualImmediateAck,则每次是累加到offsets的map中

commitIfNecessary

        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
                    commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
                            new OffsetAndMetadata(offset.getValue() + 1));
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + commits);
            }
            if (!commits.isEmpty()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing: " + commits);
                }
                try {
                    if (this.containerProperties.isSyncCommits()) {
                        this.consumer.commitSync(commits);
                    }
                    else {
                        this.consumer.commitAsync(commits, this.commitCallback);
                    }
                }
                catch (WakeupException e) {
                    // ignore - not polling
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Woken up during commit");
                    }
                }
            }
        }

这里会从offsets的map组装出commits,然后去提交(commitSync或者commitAsync),然后clear掉offsets

manual commit

    @KafkaListener(topics = "k010")
    public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {
        LOGGER.info(cr.toString());
        ack.acknowledge();
    }

方法参数里头传递Acknowledgment,然后手工ack 前提要配置AckMode

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

doc

  • spring-kafka-committing-offsets

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2017-10-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张善友的专栏

LINQ to SQL集成到应用程序中需考虑的一些问题

1、LINQ to SQL集成到应用程序中需考虑的一个问题, 到底应该返回IQueryable<T>还是IQueryable? 或许这个列表还应该继续扩展为T,...

22260
来自专栏Java进阶之路

IK分词器访问远程词典功能实现

IKAnalyzer是一个开源的,基于java语言开发的轻量级的中文分词工具包。从2006年12月推出1.0版开始,IKAnalyzer已经推出了3个大版本。最...

35920
来自专栏PPV课数据科学社区

【学习】七天搞定SAS(五):数据操作与合并

数据集操作永远是逃不掉的问题,最简单的就是两个数据集的合并——当然不是简简单单的行列添加,按照某一主键或者某些主键合并才是最常用的。在SAS中,要熟悉的就是SE...

415110
来自专栏项勇

笔记30 | 数据存储之SQLite的介绍及使用

26880
来自专栏分布式系统进阶

FBString分析与使用FBString简介

简单来说,使用了三层存储策略+内存分配策略+大小端支持,特别是配合使用 jemalloc, 减少磁盘碎片,加快并发下的分配速度和性能。

27220
来自专栏信安之路

SQL注入的常规思路及奇葩技巧

最近在看《SQL注入攻击与防御》这本书,看了之后感觉自己之前的视野和格局还是太小了些。SQLi的应用特别广泛,多种web数据库不说,移动安卓端也存在通用的SQL...

14500
来自专栏技术碎碎念

windows API 开发飞机订票系统 图形化界面 (四)

接下来的是录入航班、修改航班信息功能的实现: 1 //录入航班 2 BOOL EntryFlight(HWND hEntryDlg){ 3 4 ...

33850
来自专栏ShaoYL

iOS---小经验分享

28760
来自专栏鸿的学习笔记

Python写的Python解释器(三)

变量 接下来,给解释器添加变量。 变量需要一条指令来存储变量的值,STORE_NAME; 一条检索它的指令,LOAD_NAME; 以及变量名称到值的映射。 目前...

8230
来自专栏Python

Django---ORM操作大全

前言 Django框架功能齐全自带数据库操作功能,本文主要介绍Django的ORM框架 到目前为止,当我们的程序涉及到数据库相关操作时,我们一般都会这么搞:...

1.1K100

扫码关注云+社区

领取腾讯云代金券