我们有特定的主题,只有在条件consumeEnabled=true的情况下才需要使用消息。所以,它应该是这样的:
如果应用程序正在启动并使用
当应用程序正在消耗信息时,consumeEnabled就会变成假的,而不必考虑。
请使用Spring和\或Kafka客户端定义实现决策的最佳方法。
发布于 2019-12-16 13:51:54
如果您正在使用@KafkaListener,那么
@KafkaListener(id = "foo", ... , autoStartup="${consume.enabled}")其中consume.enabled是一个属性。
要在运行时启动/停止容器,请使用KafkaListenerEndpointRegistry bean。
registry.getListenerContainer("foo").start();发布于 2019-12-16 06:14:41
您可以将您的使用者放在一个简单的线程中,以切换使用者对象的轮询状态。
public class EnabledConsumer implements Runnable {
private Consumer consumer;
private boolean enabled;
public EnabledConsumer(Consumer consumer, boolean enabled) {
this.consumer = consumer;
this.enabled = enabled;
}
public void setEnabled(boolean enable) {
this.enabled = enable;
}
@Override
public void run() {
while(enabled) {
ConsumerRecords records = consumer.poll(...);
...
}
}https://stackoverflow.com/questions/59350993
复制相似问题