我对kafka流很陌生,我想用阅读一个主题,并在一个新的主题中编写其中的一部分。我的键是string,value是Avro,有文档/示例可以使用吗?
编辑:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
在SUB_TOPIC中,我有:
键:{ "ID":"145“}时间戳: 2019年3月14日17:52:23.43偏移: 12分区:0
我的输入主题:
{ "ID":“145个”,“时间戳”:1552585938545,“周”:"\u0000",“源”:{“字符串”:"TMP“},”正文“:{”字符串“:”{\“operation_type\:\”插入\“,\”ROW_ID\“:{\”ROW_ID\“:null,\"LAST_UPD\":null,\"SIREN_SIRET\":null},\“ROW_ID\”:{\“ROW_ID\”:\“170309-*LAST_UPD\”:\“2019-03-14T17:52:18\”,\“面额\”:\“1-*\”,\“SIREN_SIRET\”:null} },"TYPE_ACTION":{“字符串”:“插入”}}
如何在新主题中添加Body中的其他字段?例子:
{ "ID":"145",“时间戳”:1552585938545,“周”:"\u0000",“源”:{“字符串”:"TMP“},”正文“:{”字符串“:”{\“operation_type\:\”插入\“,\”ROW_ID\“:{\”ROW_ID\“:null,\"LAST_UPD\":null},\“新”:{\“ROW_ID\”:\“170309-*\”,\“LAST_UPD\”:\“2019-03-14T17:52:18}”},"TYPE_ACTION":{“字符串”:“插入”}
发布于 2019-03-14 14:45:43
您可以使用.map()/.mapValues()函数作为流使用一个主题,并修改值/KeyValue。
示例:假设您想从avro记录中选择一列并发布到新的输出主题。
// If you are using Schema registry, make sure to add the schema registry url
// in streamConfiguration. Also specify the AvroSerde for VALUE_SERDE
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("inputTopic");
final KStream<String, String> newStream = userProfiles.mapValues(value -> value.get("fieldName").toString());
subStream.to("outputTopic",Produced.with(Serdes.String(),Serdes.String());
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
此外,您还可以查看github上的示例:
https://stackoverflow.com/questions/55164623
复制相似问题