本文来解析一下kafka streams的KStreamBuilder以及举例如何自定义kafka streams的processor
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("demo-topic");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
KStreamBuilder里头隐藏着Topology
kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/KStreamBuilder.java
public class KStreamBuilder extends TopologyBuilder {
public <K, V> KStream<K, V> stream(final String... topics) {
return stream(null, null, null, topics);
}
public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
final Serde<K> keySerde,
final Serde<V> valSerde,
final String... topics) {
final String name = newName(KStreamImpl.SOURCE_NAME);
addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
}
这里的addSource就是调用TopologyBuilder的方法
kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/TopologyBuilder.java
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
if (topics.length == 0) {
throw new TopologyBuilderException("You must provide at least one topic");
}
Objects.requireNonNull(name, "name must not be null");
if (nodeFactories.containsKey(name))
throw new TopologyBuilderException("Processor " + name + " is already added.");
for (String topic : topics) {
Objects.requireNonNull(topic, "topic names cannot be null");
validateTopicNotAlreadyRegistered(topic);
maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
sourceTopicNames.add(topic);
}
nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
nodeToSourceTopics.put(name, Arrays.asList(topics));
nodeGrouper.add(name);
return this;
}
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有