首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Topic1到Topic2的kafka流

使用Topic1到Topic2的kafka流
EN

Stack Overflow用户
提问于 2019-03-14 14:08:03
回答 1查看 279关注 0票数 1

我对kafka流很陌生,我想用阅读一个主题,并在一个新的主题中编写其中的一部分。我的键是string,value是Avro,有文档/示例可以使用吗?

编辑:

代码语言:javascript
运行
复制
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
    final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
    newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.start();

在SUB_TOPIC中,我有:

键:{ "ID":"145“}时间戳: 2019年3月14日17:52:23.43偏移: 12分区:0

我的输入主题:

{ "ID":“145个”,“时间戳”:1552585938545,“周”:"\u0000",“源”:{“字符串”:"TMP“},”正文“:{”字符串“:”{\“operation_type\:\”插入\“,\”ROW_ID\“:{\”ROW_ID\“:null,\"LAST_UPD\":null,\"SIREN_SIRET\":null},\“ROW_ID\”:{\“ROW_ID\”:\“170309-*LAST_UPD\”:\“2019-03-14T17:52:18\”,\“面额\”:\“1-*\”,\“SIREN_SIRET\”:null} },"TYPE_ACTION":{“字符串”:“插入”}}

如何在新主题中添加Body中的其他字段?例子:

{ "ID":"145",“时间戳”:1552585938545,“周”:"\u0000",“源”:{“字符串”:"TMP“},”正文“:{”字符串“:”{\“operation_type\:\”插入\“,\”ROW_ID\“:{\”ROW_ID\“:null,\"LAST_UPD\":null},\“新”:{\“ROW_ID\”:\“170309-*\”,\“LAST_UPD\”:\“2019-03-14T17:52:18}”},"TYPE_ACTION":{“字符串”:“插入”}

EN

回答 1

Stack Overflow用户

发布于 2019-03-14 14:45:43

您可以使用.map()/.mapValues()函数作为流使用一个主题,并修改值/KeyValue。

示例:假设您想从avro记录中选择一列并发布到新的输出主题。

代码语言:javascript
运行
复制
// If you are using Schema registry, make sure to add the schema registry url 
// in streamConfiguration. Also specify the AvroSerde for VALUE_SERDE

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("inputTopic");
final KStream<String, String> newStream = userProfiles.mapValues(value -> value.get("fieldName").toString());
subStream.to("outputTopic",Produced.with(Serdes.String(),Serdes.String());
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

此外,您还可以查看github上的示例:

https://github.com/confluentinc/kafka-streams-examples/blob/5.1.2-post/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55164623

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档