前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >自定义kafka streams的processor

自定义kafka streams的processor

作者头像
code4it
发布于 2018-09-17 07:10:14
发布于 2018-09-17 07:10:14
89200
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文来解析一下kafka streams的KStreamBuilder以及举例如何自定义kafka streams的processor

实例

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("demo-topic");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

KStreamBuilder里头隐藏着Topology

KStreamBuilder

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/KStreamBuilder.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class KStreamBuilder extends TopologyBuilder {

    public <K, V> KStream<K, V> stream(final String... topics) {
        return stream(null, null, null, topics);
    }
    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
                                       final Serde<K> keySerde,
                                       final Serde<V> valSerde,
                                       final String... topics) {
        final String name = newName(KStreamImpl.SOURCE_NAME);

        addSource(offsetReset, name,  keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);

        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
    }
}

这里的addSource就是调用TopologyBuilder的方法

TopologyBuilder

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/TopologyBuilder.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
        if (topics.length == 0) {
            throw new TopologyBuilderException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (nodeFactories.containsKey(name))
            throw new TopologyBuilderException("Processor " + name + " is already added.");

        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            validateTopicNotAlreadyRegistered(topic);
            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
            sourceTopicNames.add(topic);
        }
        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
        nodeToSourceTopics.put(name, Arrays.asList(topics));
        nodeGrouper.add(name);

        return this;
    }

doc

  • Kafka设计解析(七)- 流式计算的新贵 Kafka Stream
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-10-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Kafka核心API——Stream API
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。
端碗吹水
2020/09/23
3.7K0
Kafka核心API——Stream API
最简单流处理引擎——Kafka Streams简介
Kafka在0.10.0.0版本以前的定位是分布式,分区化的,带备份机制的日志提交服务。而kafka在这之前也没有提供数据处理的顾服务。大家的流处理计算主要是还是依赖于Storm,Spark Streaming,Flink等流式处理框架。
用户6070864
2019/09/05
1.6K0
最简单流处理引擎——Kafka Streams简介
Kafka Streams之WordCount
(1)Stream 从topic中取出每一条数据记录 (<key, value>格式): <null, “Spark and spark”>
全栈程序员站长
2022/11/16
6140
kafka stream word count实例
kafka呢其实正道不是消息队列,本质是日志存储系统,而stream processing是其最近大力推广的特性,本文简单介绍下word count的实例。
code4it
2018/09/17
1K0
如何保证Kafka顺序消费
在分布式消息系统中,消息的顺序性是一个重要的问题。Apache Kafka 提供了多种机制来确保消息的顺序消费,但需要根据具体的使用场景进行配置和设计。以下是一些确保 Kafka 顺序消费的关键点和方法:
小马哥学JAVA
2024/07/03
1.4K0
Kafka Stream(KStream) vs Apache Flink
腾讯云流计算 Oceanus 是大数据实时化分析利器,兼容 Apache Flink 应用程序。新用户可以 1 元购买流计算 Oceanus(Flink) 集群,欢迎读者们体验使用。
吴云涛
2021/11/28
4.9K0
Kafka Stream(KStream) vs Apache Flink
kafka streams的join实例
这里使用的是inner join,也有left join,也有outer join。如果要记录在时间窗口没有匹配上的记录,可以使用outer join,额外存储下来,然后再根据已经匹配的记录再过滤一次。
code4it
2018/09/17
1.6K0
快速学习-Kafka Streams
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
cwl_java
2020/02/20
8530
学习kafka教程(二)
Kafka Streams编写关键任务实时应用程序和微服务的最简单方法,是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在Kafka集群中。它结合了在客户端编写和部署标准Java和Scala应用程序的简单性和Kafka服务器端集群技术的优点。
用户3467126
2019/07/03
9330
学习kafka教程(二)
学习kafka教程(三)
Kafka流通过构建Kafka生产者和消费者库,并利用Kafka的本地功能来提供数据并行性、分布式协调、容错和操作简单性,从而简化了应用程序开发。 下图展示了一个使用Kafka Streams库的应用程序的结构。
用户3467126
2019/07/03
9860
腾讯面试:Kafka如何处理百万级消息队列?
在今天的大数据时代,处理海量数据已成为各行各业的标配。特别是在消息队列领域,Apache Kafka 作为一个分布式流处理平台,因其高吞吐量、可扩展性、容错性以及低延迟的特性而广受欢迎。但当面对真正的百万级甚至更高量级的消息处理时,如何有效地利用 Kafka,确保数据的快速、准确传输,成为了许多开发者和架构师思考的问题。本文将深入探讨 Kafka 的高级应用,通过10个实用技巧,帮助你掌握处理百万级消息队列的艺术。
程序员江小北
2024/02/21
2890
腾讯面试:Kafka如何处理百万级消息队列?
Kafka学习(一)-------- Quickstart
截至2019年7月8日 最新版本为 2.3.0 2.12为编译的scala版本 2.3.0为kafka版本
大数据流动
2019/08/08
5770
kafka的JavaAPI操作
一、创建maven工程并添加jar包 创建maven工程并添加以下依赖jar包的坐标到pom.xml
程序狗
2021/12/28
4880
Kafka扩展内容
Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。
matt
2022/10/25
3400
Kafka Streams 核心讲解
•Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外,无任何外部依赖•充分利用 Kafka 分区机制实现水平扩展和顺序性保证•通过可容错的 state store 实现高效的状态操作(如 windowed join 和aggregation)•支持正好一次处理语义•提供记录级的处理能力,从而实现毫秒级的低延迟•支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)•同时提供底层的处理原语 Processor(类似于 Storm 的 spout 和 bolt),以及高层抽象的DSL(类似于 Spark 的 map/group/reduce)
java达人
2021/06/21
2.7K0
Kafka Streams 核心讲解
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
在这个博客系列的第1部分之后,Apache Kafka的Spring——第1部分:错误处理、消息转换和事务支持,在这里的第2部分中,我们将关注另一个增强开发者在Kafka上构建流应用程序时体验的项目:Spring Cloud Stream。
架构师研究会
2019/10/23
2.6K0
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
快速入门Kafka系列(6)——Kafka的JavaAPI操作
作为快速入门Kafka系列的第六篇博客,本篇为大家带来的是Kafka的JavaAPI操作~
大数据梦想家
2021/01/27
5570
快速入门Kafka系列(6)——Kafka的JavaAPI操作
第二天:Kafka API操作
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
sowhat1412
2020/11/05
8290
第二天:Kafka  API操作
大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams
1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)   点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
黑泽君
2019/03/15
1.2K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
Spring Kafka 是 Spring Framework 提供的一个集成 Apache Kafka 的库,用于构建基于 Kafka 的实时数据流处理应用程序。Apache Kafka 是一个高性能、分布式的流数据平台,广泛用于构建可扩展的、实时的数据处理管道。
苏泽
2024/03/10
1.1K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
相关推荐
Kafka核心API——Stream API
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验