首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何为一个Kafka侦听器在单独的配置属性中指定多个主题?

如何为一个Kafka侦听器在单独的配置属性中指定多个主题?
EN

Stack Overflow用户
提问于 2022-03-16 12:55:09
回答 2查看 3.7K关注 0票数 1

我想创建一个spring引导应用程序,从几个Kafka主题中读取。我意识到我可以在我的appliation.properties上创建一个逗号分隔的主题列表,但是我希望将主题名称单独列出以提高可读性,因此我可以使用每个主题名来解决如何处理消息。

我发现了以下问题,但它们都将主题列为逗号分隔的数组:

Consume multiple topics in one listener in spring boot kafka

Using multiple topic names with KafkaListener annotation

Enabling @KafkaListener to take in variable topic names from application.yml file

Pass array list of topic names to @KafkaListener

我最接近的是以下几点:

application.properties

代码语言:javascript
运行
复制
kafka.topic1=topic1
kafka.topic2=topic2

KafkaConsumer

代码语言:javascript
运行
复制
@KafkaListener(topics = "#{'${kafka.topic1}'},#{'${kafka.topic2}'}")
public void receive(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
      @Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
      @Payload(required = false) String payload) throws IOException {
}       

这就产生了错误:

代码语言:javascript
运行
复制
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]

我意识到我需要它是{"topic1","topic2},但我不知道怎么做。“

让注释@KafkaListener(topics = "#{'${kafka.topic1}'}")正确地订阅第一个主题。如果我将它更改为@KafkaListener(topics = "#{'${kafka.topic2}'}"),我就可以正确地订阅第二个主题。

这只是我无法理解的注释中主题数组的创建。

任何帮助都是很好的

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-03-16 15:46:14

代码语言:javascript
运行
复制
@KafkaListener(id = "so71497475", topics = { "${kafka.topic1}", "${kafka.topic2}" })

编辑

这是一种更复杂的技术,可以在不更改任何代码的情况下添加更多的主题:

代码语言:javascript
运行
复制
@SpringBootApplication
@EnableConfigurationProperties
public class So71497475Application {

    public static void main(String[] args) {
        SpringApplication.run(So71497475Application.class, args);
    }

    @KafkaListener(id = "so71497475", topics = "#{@myProps.kafkaTopics}")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean // This will add the topics to the broker if not present
    KafkaAdmin.NewTopics topics(MyProps props) {
        return new KafkaAdmin.NewTopics(props.getTopics().stream()
                .map(t -> TopicBuilder.name(t).partitions(1).replicas(1).build())
                .toArray(size -> new NewTopic[size]));
    }

}

@ConfigurationProperties("my.kafka")
@Component
class MyProps {

    private List<String> topics = new ArrayList<>();

    public List<String> getTopics() {
        return this.topics;
    }

    public void setTopics(List<String> topics) {
        this.topics = topics;
    }

    public String[] getKafkaTopics() {
        return this.topics.toArray(new String[0]);
    }

}
代码语言:javascript
运行
复制
my.kafka.topics[0]=topic1
my.kafka.topics[1]=topic2
my.kafka.topics[2]=topic3
代码语言:javascript
运行
复制
so71497475: partitions assigned: [topic1-0, topic2-0, topic3-0]
票数 3
EN

Stack Overflow用户

发布于 2022-04-08 04:53:41

如果将主题配置为逗号,如下所示:

代码语言:javascript
运行
复制
kafka.topics = topic1,topic2

在这种情况下,您可以简单地使用:

代码语言:javascript
运行
复制
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
void listen(){}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71497475

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档