首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Springboot Kafka @Listener使用者暂停/恢复不工作

Springboot Kafka @Listener使用者暂停/恢复不工作
EN

Stack Overflow用户
提问于 2022-07-07 09:55:06
回答 2查看 381关注 0票数 1

我有一个弹簧卡夫卡消费者和制片人。使用者需要从主题1到1读取数据,处理(耗时)它&将其写入另一个主题,然后手动提交偏移量。

为了避免再平衡,我尝试在KafkaContainer上调用pause()和call (),但是使用者总是在运行&从不响应pause()调用,即使使用with循环也尝试它,但没有成功(无法暂停使用者)。KafkaListenerEndpointRegistry是自动发球。Springboot版本= 2.6.9,Springboot版本= 2.8.7

代码语言:javascript
运行
复制
@KafkaListener(id = "c1", topics = "${app.topics.topic1}", containerFactory = "listenerContainerFactory1")
public void poll(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("Received Message by consumer of topic1: " + value);
    String result = process(record.value());
    producer.sendMessage(result + " topic2");
    log.info("Message sent from " + topicIn + " to " + topicOut);
    ack.acknowledge();
    log.info("Offset committed by consumer 1");
}

private String process(String value) {
    try {
        pauseConsumer();
        // Perform time intensive network IO operations
        resumeConsumer();
    } catch (InterruptedException e) {
        log.error(e.getMessage());
    }
    return value;
}

private void pauseConsumer() throws InterruptedException {
    if (registry.getListenerContainer("c1").isRunning()) {
        log.info("Attempting to pause consumer");
        Objects.requireNonNull(registry.getListenerContainer("c1")).pause();
        Thread.sleep(5000);
        log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
    }
}

private void resumeConsumer() throws InterruptedException {
    if (registry.getListenerContainer("c1").isContainerPaused() || registry.getListenerContainer("c1").isPauseRequested()) {
        log.info("Attempting to resume consumer");
        Objects.requireNonNull(registry.getListenerContainer("c1")).resume();
        Thread.sleep(5000);
        log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
    }
}

我是不是遗漏了什么?有人能用正确的方法引导我达到要求的行为吗?

EN

回答 2

Stack Overflow用户

发布于 2022-07-07 12:55:38

您正在侦听器线程上运行process()方法,所以暂停/恢复不会产生任何效果;只有当侦听器线程退出侦听器方法时(以及在它处理了上一次轮询接收的所有记录之后),暂停才会发生。

下一个版本(2.9)将于本月晚些时候发布,它有一个新的属性pauseImmediate,该属性将导致在处理当前记录后暂停生效。

票数 2
EN

Stack Overflow用户

发布于 2022-07-07 10:33:15

你可以这样做。这对我来说很有用

代码语言:javascript
运行
复制
public class kafkaConsumer {
    public void run(String topicName) {
        try {
            Consumer<String, String> consumer = new KafkaConsumer<>(config);
            consumer.subscribe(Collections.singleton(topicName));
                while (true) {
                    try {
                        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(80000));
                        for (TopicPartition partition : consumerRecords.partitions()) {
                            List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                            for (ConsumerRecord<String, String> record : partitionRecords) {

                                kafkaEvent = record.value();

                                consumer.pause(consumer.assignment());

                /** Implement Your Business Logic Here **/

                Once your processing done

                consumer.resume(consumer.assignment());

                                try {
                                    consumer.commitSync();
                                } catch (CommitFailedException e) {
                                }
                            }
                        }
                    } catch (Exception e) {
                        continue;
                    }
                }

        } catch (Exception e) {
        }
    }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72895788

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档