由于原生KafkaConsumer不是线程安全的,所以不鼓励从不同的线程调用pause和resume方法,而不是从kafka-consumer处理线程调用。但是spring-kafka提供了另一层KafkaMessageListenerContainer,在内部使用kafka-consumer。所以我的问题是,我们是否可以使用KafkaListenerEndpointRegistry通过id获取侦听器容器,并从其他线程而不是消费者处理线程调用resume或pause方法。
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = newFixedThreadPool(2);
executorService.submit(()->{
System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
发布于 2020-07-22 21:40:25
可以;container.pause()
会设置一个标志,告诉Consumer
线程在下一次poll()
调用之前暂停。类似地,resume()
重置标志,以便使用者线程在下一次轮询之前恢复Consumer
。
https://stackoverflow.com/questions/63032912
复制相似问题