我有一个弹簧卡夫卡消费者和制片人。使用者需要从主题1到1读取数据,处理(耗时)它&将其写入另一个主题,然后手动提交偏移量。
为了避免再平衡,我尝试在KafkaContainer上调用pause()和call (),但是使用者总是在运行&从不响应pause()调用,即使使用with循环也尝试它,但没有成功(无法暂停使用者)。KafkaListenerEndpointRegistry是自动发球。Springboot版本= 2.6.9,Springboot版本= 2.8.7
@KafkaListener(id = "c1", topics = "${app.topics.topic1}", containerFactory = "listenerContainerFactory1")
public void poll(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("Received Message by consumer of topic1: " + value);
String result = process(record.value());
producer.sendMessage(result + " topic2");
log.info("Message sent from " + topicIn + " to " + topicOut);
ack.acknowledge();
log.info("Offset committed by consumer 1");
}
private String process(String value) {
try {
pauseConsumer();
// Perform time intensive network IO operations
resumeConsumer();
} catch (InterruptedException e) {
log.error(e.getMessage());
}
return value;
}
private void pauseConsumer() throws InterruptedException {
if (registry.getListenerContainer("c1").isRunning()) {
log.info("Attempting to pause consumer");
Objects.requireNonNull(registry.getListenerContainer("c1")).pause();
Thread.sleep(5000);
log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
}
}
private void resumeConsumer() throws InterruptedException {
if (registry.getListenerContainer("c1").isContainerPaused() || registry.getListenerContainer("c1").isPauseRequested()) {
log.info("Attempting to resume consumer");
Objects.requireNonNull(registry.getListenerContainer("c1")).resume();
Thread.sleep(5000);
log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
}
}
我是不是遗漏了什么?有人能用正确的方法引导我达到要求的行为吗?
发布于 2022-07-07 12:55:38
您正在侦听器线程上运行process()
方法,所以暂停/恢复不会产生任何效果;只有当侦听器线程退出侦听器方法时(以及在它处理了上一次轮询接收的所有记录之后),暂停才会发生。
下一个版本(2.9)将于本月晚些时候发布,它有一个新的属性pauseImmediate
,该属性将导致在处理当前记录后暂停生效。
发布于 2022-07-07 10:33:15
你可以这样做。这对我来说很有用
public class kafkaConsumer {
public void run(String topicName) {
try {
Consumer<String, String> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Collections.singleton(topicName));
while (true) {
try {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(80000));
for (TopicPartition partition : consumerRecords.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
kafkaEvent = record.value();
consumer.pause(consumer.assignment());
/** Implement Your Business Logic Here **/
Once your processing done
consumer.resume(consumer.assignment());
try {
consumer.commitSync();
} catch (CommitFailedException e) {
}
}
}
} catch (Exception e) {
continue;
}
}
} catch (Exception e) {
}
}
https://stackoverflow.com/questions/72895788
复制相似问题