我们有一个用例,在这个用例中,我们必须将一些消息读入KStreams,然后将消息转换为另一个有条件的主题。
在我们的用例中,对于对象的转换,我们进行了下游API调用。如果API调用是成功的,那么生成到newTopic1,否则生成到newTopic2。怎样才能达到同样的目标?
到目前为止,我们正在使用以下方式,使用Streams提供的到方法为新的Kafka主题生成丰富的(即转换的对象)。
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to('newTopic1', Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));感谢你对此的回应。
发布于 2020-10-02 11:48:40
使用DSL,您可以使用KStream::filter或KStream:to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced)。
如果两种格式相同,则示例代码将类似于:
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to((key, value, recordContext) -> topicNameCalculation(key, value), Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));topicNameCalculation(...)将基于键和值选择正确的主题。
One注意到,通常在卡夫卡流中进行外部调用并不是很好的方法。
https://stackoverflow.com/questions/64170421
复制相似问题