
Writing a custom kafka streams processor

code4it
Published 2018-09-17 15:10:14
From the column: 码匠的流水账

This post digs into kafka streams' KStreamBuilder and then shows, by example, how to write a custom kafka streams processor.

Example

// assumed minimal configuration; the application id and broker address are placeholders
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("demo-topic");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

KStreamBuilder has a Topology hidden inside it: it extends TopologyBuilder.

KStreamBuilder

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/KStreamBuilder.java

public class KStreamBuilder extends TopologyBuilder {

    public <K, V> KStream<K, V> stream(final String... topics) {
        return stream(null, null, null, topics);
    }
    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
                                       final Serde<K> keySerde,
                                       final Serde<V> valSerde,
                                       final String... topics) {
        final String name = newName(KStreamImpl.SOURCE_NAME);

        addSource(offsetReset, name,  keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);

        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
    }
}
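The overload above can also be called directly when you need explicit serdes or an offset-reset policy instead of the configured defaults. A minimal sketch against the 0.10.2 API (the class and topic names are illustrative assumptions):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class ExplicitSerdesExample {
    public static KStream<String, String> build(KStreamBuilder builder) {
        // pass explicit String serdes and start from the earliest offset;
        // "demo-topic" is a placeholder topic name
        return builder.stream(
                TopologyBuilder.AutoOffsetReset.EARLIEST,
                Serdes.String(), Serdes.String(),
                "demo-topic");
    }
}
```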

The addSource here invokes the method inherited from TopologyBuilder.

TopologyBuilder

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/TopologyBuilder.java

public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
        if (topics.length == 0) {
            throw new TopologyBuilderException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (nodeFactories.containsKey(name))
            throw new TopologyBuilderException("Processor " + name + " is already added.");

        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            validateTopicNotAlreadyRegistered(topic);
            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
            sourceTopicNames.add(topic);
        }
        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
        nodeToSourceTopics.put(name, Arrays.asList(topics));
        nodeGrouper.add(name);

        return this;
    }
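With addSource, addProcessor and addSink on TopologyBuilder, a custom processor can be wired into the topology directly, which is the customization the intro refers to. A minimal sketch against the 0.10.2 Processor API (the class, node and topic names are assumptions, and the upper-casing logic is just an illustration):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopologyBuilder;

// hypothetical custom processor that forwards each value upper-cased
public class UpperCaseProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // forward the transformed record to downstream nodes ("SINK" below)
        context.forward(key, transform(value));
    }

    // the actual transformation, kept separate so it is easy to test
    static String transform(String value) {
        return value == null ? null : value.toUpperCase();
    }

    @Override
    public void punctuate(long timestamp) {
        // no scheduled work in this example
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    // wire source -> processor -> sink; topic names are placeholders
    public static TopologyBuilder buildTopology() {
        return new TopologyBuilder()
                .addSource("SOURCE", "demo-topic")
                .addProcessor("UPPER", UpperCaseProcessor::new, "SOURCE")
                .addSink("SINK", "out-topic", "UPPER");
    }
}
```

The resulting TopologyBuilder can be handed to `new KafkaStreams(builder, props)` just like the KStreamBuilder in the first example.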

doc

  • Kafka设计解析(七)- 流式计算的新贵 Kafka Stream
Originally published 2017-10-16, shared from the WeChat public account 码匠的流水账.

