
How to unit test Spring Cloud Stream with the Kafka DSL

Stack Overflow user
Asked on 2018-06-01 00:35:13
1 answer · 4.7K views · 0 followers · 4 votes

I am trying to (unit) test a Spring Cloud Stream Kafka processor that uses the Kafka DSL, but I get the following error: "Connection to node -1 could not be established. Broker may not be available." In addition, the test does not shut down. I have tried both EmbeddedKafka and the TestBinder, and I get the same behavior either way. I started from the response given by the Spring Cloud team (which works), adapted the application to use the Kafka DSL, and kept the test class almost unchanged. Does EmbeddedKafka actually support the Kafka DSL?

I am using Elmhurst.RELEASE.

@SpringBootApplication
@EnableBinding(MyBinding.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    @StreamListener
    @SendTo(MyBinding.OUTPUT)
    public KStream<String,String> process(@Input(MyBinding.INPUT) KStream<String, String> in) {

        return in.peek((k,v) -> System.out.println("Received value " +v ))
                .mapValues(v -> v.toUpperCase());
    }
}

interface MyBinding {

    String INPUT = "input";
    String OUTPUT = "output";

    @Input(INPUT)
    KStream<String, String> messagesIn();

    @Output(OUTPUT)
    KStream<String, String> messagesOut();
} 
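
For context, the question does not show the application configuration. Typically the input and output bindings above would be mapped to concrete Kafka topics (for example the so0544in/so0544out topics used in the test below) with standard Spring Cloud Stream binding properties; a minimal sketch, with the exact destinations assumed:

spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.destination=so0544out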

Update

As shown in the example below, the approach proposed in this answer works for me when I write the event processor using the generic Spring Cloud Stream syntax, but it does not work when I use the Kafka DSL (KStreams). To see the difference in behavior, simply switch between ExampleAppWorking and ExampleAppNotWorking in the @SpringBootTest annotation.

@RunWith(SpringRunner.class)
@SpringBootTest(classes=ExampleKafkaEmbeddedTest.ExampleAppNotWorking.class)
@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD)
public class ExampleKafkaEmbeddedTest {
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, "so0544in","so0544out");

    @Autowired
    private KafkaTemplate<Integer, byte[]> template;

    @Autowired
    private KafkaProperties properties;

    private static Consumer consumer;

    @BeforeClass
    public static void setup() throws Exception{
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
        System.setProperty("server.port","0");
        System.setProperty("spring.jmx.enabled" , "false");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);

        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        consumer = cf.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "so0544out");

    }

    @After
    public void tearDown() {
        if (consumer != null){
            consumer.close();
        }
    }

    @Test
    public void testSendReceive() {
        template.send("so0544in", "foo".getBytes());

        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "so0544out");

        System.out.println("Contenu chaine resultat : " + cr.value());

        assertEquals(cr.value(), "FOO");
    }

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public static class ExampleAppWorking {

        public static void main(String[] args) {
            SpringApplication.run(ExampleAppWorking.class, args);
        }

        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public String receive(String in) {
            return in.toUpperCase();
        }
    }

    @SpringBootApplication
    @EnableBinding(MyBinding.class)
    public static class ExampleAppNotWorking {

        public static void main(String[] args) {
            SpringApplication.run(ExampleAppNotWorking.class, args);
        }

        @StreamListener
        @SendTo(MyBinding.OUTPUT)
        public KStream<Integer,byte[]> toUpperCase (@Input(MyBinding.INPUT) KStream<Integer,byte[]> in){
            return in.map((key, val) -> KeyValue.pair(key, new String(val).toUpperCase().getBytes()));
        }
    }

    public interface MyBinding {
        String INPUT = "input";
        String OUTPUT = "output";

        @Input(INPUT)
        KStream<Integer, String> messagesIn();

        @Output(OUTPUT)
        KStream<Integer, String> messagesOut();
    }

}

1 Answer

Stack Overflow user

Accepted answer

Answered on 2018-06-01 00:55:05

EmbeddedKafka should work with Kafka Streams. All of these tests run against EmbeddedKafka; you can follow the patterns used there as a template for your own test.

I looked at the code you provided in the comments below. You need to add this property in the setup method:

System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());

The main Spring Boot application expects the Kafka broker to be available on localhost; it has no way of knowing that the test is running an embedded broker. We need to make this explicit by setting the property in the test, so that the boot application correctly detects the embedded Kafka broker.
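
Putting the answer together with the question's test, the setup method would then look roughly like this (a sketch based on the code above; only the spring.cloud.stream.kafka.streams.binder.brokers line is new):

    @BeforeClass
    public static void setup() throws Exception {
        // Broker list used by the regular Kafka binder and the KafkaTemplate.
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
        // The missing piece: the Kafka Streams binder reads its broker list from this property,
        // not from spring.kafka.bootstrap-servers.
        System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
        System.setProperty("server.port", "0");
        System.setProperty("spring.jmx.enabled", "false");
        // ... consumer setup as in the original test ...
    }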

5 votes
Original content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/50628979