我在我的项目中使用了spring Kafka,我希望在消费者基于key和value消费之前过滤消息。
有可能吗?
发布于 2018-07-25 12:12:00
是的,在spring Kafka中,你可以在消费者消费之前过滤消息,在接口boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)中有一个接口public interface RecordFilterStrategy<K,V>和方法
因此,您需要覆盖此filter方法,如果返回false,消费者将使用消息,如果返回true,则不使用消息
您可以对消息和消息值应用此过滤
consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message示例代码:
 @Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    if(true) {
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
            @Override
            public boolean filter(ConsumerRecord<String, String> consumerRecord) {
                if(consumerRecord.key().equals("ETEST")) {
                return false;
                }
            else {
                return true;
                 }
            }   
        });
    }
    return factory;
}https://stackoverflow.com/questions/51510653
复制相似问题