前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >最简单流处理引擎——Kafka Streams简介

最简单流处理引擎——Kafka Streams简介

作者头像
用户6070864
发布于 2019-09-05 09:33:52
发布于 2019-09-05 09:33:52
1.6K00
代码可运行
举报
文章被收录于专栏:实时流式计算实时流式计算
运行总次数:0
代码可运行

Kafka在0.10.0.0版本以前的定位是分布式,分区化的,带备份机制的日志提交服务。而kafka在这之前也没有提供数据处理的顾服务。大家的流处理计算主要是还是依赖于StormSpark Streaming,Flink等流式处理框架。

Storm,Spark Streaming,Flink流处理的三驾马车各有各的优势.

Storm低延迟,并且在市场中占有一定的地位,目前很多公司仍在使用。

Spark Streaming借助Spark的体系优势,活跃的社区,也占有一定的份额。

而Flink在设计上更贴近流处理,并且有便捷的API,未来一定很有发展。但是他们都离不开Kafka的消息中转,所以Kafka于0.10.0.0版本推出了自己的流处理框架,Kafka Streams。

Kafka的定位也正式成为Apache Kafka® is a distributed streaming platform,分布式流处理平台。

实时流式计算

近几年来实时流式计算发展迅速,主要原因是实时数据的价值和对于数据处理架构体系的影响。实时流式计算包含了 无界数据 近实时 一致性 可重复结果 等等特征。a type of data processing engine that is designed with infinite data sets in mind 一种考虑了无线数据集的数据处理引擎。

1、无限数据:一种不断增长的,基本上无限的数据集。这些通常被称为“流式数据”。无限的流式数据集可以称为无界数据,相对而言有限的批量数据就是有界数据。

2、无界数据处理:一种持续的数据处理模式,应用于上面的无界数据。批量处理数据(离线计算)也可以重复运行来处理数据,但是会有性能的瓶颈。

3、低延迟,近实时的结果:相对于离线计算而言,离线计算并没有考虑延迟的问题。

解决了两个问题,流处理可以提代批处理系统:

1、正确性:有了这个,就和批量计算等价了。

Streaming需要能随着时间的推移依然能计算一定时间窗口的数据。Spark Streaming通过微批的思想解决了这个问题,实时与离线系统进行了一致性的存储,这一点在未来的实时计算系统中都应该满足。

2、推理时间的工具:这可以让我们超越批量计算。

好的时间推理工具对于处理不同事件的无界无序数据至关重要。

而时间又分为事件时间和处理时间。

还有很多实时流式计算的相关概念,这里不做赘述。

Kafka Streams简介

Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java和scala代码就可以实现流式处理。

优势:
  • 弹性,高度可扩展,容错
  • 部署到容器,VM,裸机,云
  • 同样适用于小型,中型和大型用例
  • 与Kafka安全性完全集成
  • 编写标准Java和Scala应用程序
  • 在Mac,LinuxWindows上开发
  • Exactly-once 语义
用例:

纽约时报使用Apache Kafka和Kafka Streams将发布的内容实时存储和分发到各种应用程序和系统,以供读者使用。

Pinterest大规模使用Apache Kafka和Kafka Streams来支持其广告基础架构的实时预测预算系统。使用Kafka Streams,预测比以往更准确。

作为欧洲领先的在线时尚零售商,Zalando使用Kafka作为ESB(企业服务总线),帮助我们从单一服务架构转变为微服务架构。使用Kafka处理 事件流使我们的技术团队能够实现近乎实时的商业智能

荷兰合作银行是荷兰三大银行之一。它的数字神经系统Business Event Bus由Apache Kafka提供支持。它被越来越多的财务流程和服务所使用,其中之一就是Rabo Alerts。此服务会在财务事件时实时向客户发出警报,并使用Kafka Streams构建。

LINE使用Apache Kafka作为我们服务的中央数据库,以便彼此通信。每天产生数亿亿条消息,用于执行各种业务逻辑,威胁检测,搜索索引和数据分析。LINE利用Kafka Streams可靠地转换和过滤主题,使消费者可以有效消费的子主题,同时由于其复杂而简单的代码库,保持易于维护性。

Topology

Kafka Streams通过一个或多个拓扑定义其计算逻辑,其中拓扑是通过流(边缘)和流处理器(节点)构成的图。

拓扑中有两种特殊的处理器

  • 源处理器:源处理器是一种特殊类型的流处理器,没有任何上游处理器。它通过使用来自这些主题的记录并将它们转发到其下游处理器,从一个或多个Kafka主题为其拓扑生成输入流。
  • 接收器处理器:接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收的任何记录发送到指定的Kafka主题。

在正常处理器节点中,还可以把数据发给远程系统。因此,处理后的结果可以流式传输回Kafka或写入外部系统。

Kafka在这当中提供了最常用的数据转换操作,例如mapfilterjoinaggregations等,简单易用。

当然还有一些关于时间,窗口,聚合,乱序处理等。未来再一一做详细介绍,下面我们进行简单的入门案例开发。

快速入门

首先提供WordCount的java版和scala版本。

java8+:

代码语言:javascript
代码运行次数:0
运行
复制
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}

scala:

代码语言:javascript
代码运行次数:0
运行
复制
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountApplication extends App {
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}

如果kafka已经启动了,可以跳过前两步。

1、下载

下载 2.3.0版本并解压缩它。请注意,有多个可下载的Scala版本,我们选择使用推荐的版本(2.12):

代码语言:javascript
代码运行次数:0
运行
复制
> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2、启动

Kafka使用ZooKeeper,因此如果您还没有ZooKeeper服务器,则需要先启动它。

代码语言:javascript
代码运行次数:0
运行
复制
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动Kafka服务器:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3、创建topic 启动生产者

我们创建名为streams-plaintext-input的输入主题和名为streams-wordcount-output的输出主题:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

查看:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4、启动WordCount

以下命令启动WordCount演示应用程序:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示应用程序将从输入主题stream-plaintext-input读取,对每个读取消息执行WordCount算法的计算,并连续将其当前结果写入输出主题streams-wordcount-output。因此,除了日志条目之外不会有任何STDOUT输出,因为结果会写回Kafka。

现在我们可以在一个单独的终端中启动控制台生成器,为这个主题写一些输入数据:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

并通过在单独的终端中使用控制台使用者读取其输出主题来检查WordCount演示应用程序的输出:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

5、处理数据

我们在生产者端输入一些数据。

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

输出端:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1

继续输入:

代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
代码语言:javascript
代码运行次数:0
运行
复制
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

我们看到随着数据实时输入,wordcount的结果实时的输出了。

6、停止程序

您现在可以通过Ctrl-C按顺序停止控制台使用者,控制台生产者,Wordcount应用程序,Kafka代理和ZooKeeper服务器。

什么是Kafka? Kafka监控工具汇总 Kafka快速入门 Kafka核心之Consumer Kafka核心之Producer

替代Flume——Kafka Connect简介

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 实时流式计算 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
Kafka核心API——Stream API
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。
端碗吹水
2020/09/23
3.7K0
Kafka核心API——Stream API
最简单流处理引擎——Kafka Streams简介
Kafka在0.10.0.0版本以前的定位是分布式,分区化的,带备份机制的日志提交服务。而kafka在这之前也没有提供数据处理的顾服务。大家的流处理计算主要是还是依赖于Storm,Spark Streaming,Flink等流式处理框架。
用户6070864
2019/09/05
1.6K0
最简单流处理引擎——Kafka Streams简介
Kafka Streams之WordCount
(1)Stream 从topic中取出每一条数据记录 (<key, value>格式): <null, “Spark and spark”>
全栈程序员站长
2022/11/16
6140
kafka stream word count实例
kafka呢其实正道不是消息队列,本质是日志存储系统,而stream processing是其最近大力推广的特性,本文简单介绍下word count的实例。
code4it
2018/09/17
1K0
如何保证Kafka顺序消费
在分布式消息系统中,消息的顺序性是一个重要的问题。Apache Kafka 提供了多种机制来确保消息的顺序消费,但需要根据具体的使用场景进行配置和设计。以下是一些确保 Kafka 顺序消费的关键点和方法:
小马哥学JAVA
2024/07/03
1.4K0
Kafka Stream(KStream) vs Apache Flink
腾讯云流计算 Oceanus 是大数据实时化分析利器,兼容 Apache Flink 应用程序。新用户可以 1 元购买流计算 Oceanus(Flink) 集群,欢迎读者们体验使用。
吴云涛
2021/11/28
4.9K0
Kafka Stream(KStream) vs Apache Flink
kafka streams的join实例
这里使用的是inner join,也有left join,也有outer join。如果要记录在时间窗口没有匹配上的记录,可以使用outer join,额外存储下来,然后再根据已经匹配的记录再过滤一次。
code4it
2018/09/17
1.6K0
快速学习-Kafka Streams
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
cwl_java
2020/02/20
8530
学习kafka教程(二)
Kafka Streams编写关键任务实时应用程序和微服务的最简单方法,是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在Kafka集群中。它结合了在客户端编写和部署标准Java和Scala应用程序的简单性和Kafka服务器端集群技术的优点。
用户3467126
2019/07/03
9330
学习kafka教程(二)
学习kafka教程(三)
Kafka流通过构建Kafka生产者和消费者库,并利用Kafka的本地功能来提供数据并行性、分布式协调、容错和操作简单性,从而简化了应用程序开发。 下图展示了一个使用Kafka Streams库的应用程序的结构。
用户3467126
2019/07/03
9860
腾讯面试:Kafka如何处理百万级消息队列?
在今天的大数据时代,处理海量数据已成为各行各业的标配。特别是在消息队列领域,Apache Kafka 作为一个分布式流处理平台,因其高吞吐量、可扩展性、容错性以及低延迟的特性而广受欢迎。但当面对真正的百万级甚至更高量级的消息处理时,如何有效地利用 Kafka,确保数据的快速、准确传输,成为了许多开发者和架构师思考的问题。本文将深入探讨 Kafka 的高级应用,通过10个实用技巧,帮助你掌握处理百万级消息队列的艺术。
程序员江小北
2024/02/21
2890
腾讯面试:Kafka如何处理百万级消息队列?
Kafka学习(一)-------- Quickstart
截至2019年7月8日 最新版本为 2.3.0 2.12为编译的scala版本 2.3.0为kafka版本
大数据流动
2019/08/08
5770
kafka的JavaAPI操作
一、创建maven工程并添加jar包 创建maven工程并添加以下依赖jar包的坐标到pom.xml
程序狗
2021/12/28
4880
Kafka扩展内容
Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。
matt
2022/10/25
3400
Kafka Streams 核心讲解
•Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外,无任何外部依赖•充分利用 Kafka 分区机制实现水平扩展和顺序性保证•通过可容错的 state store 实现高效的状态操作(如 windowed join 和aggregation)•支持正好一次处理语义•提供记录级的处理能力,从而实现毫秒级的低延迟•支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)•同时提供底层的处理原语 Processor(类似于 Storm 的 spout 和 bolt),以及高层抽象的DSL(类似于 Spark 的 map/group/reduce)
java达人
2021/06/21
2.7K0
Kafka Streams 核心讲解
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
在这个博客系列的第1部分之后,Apache Kafka的Spring——第1部分:错误处理、消息转换和事务支持,在这里的第2部分中,我们将关注另一个增强开发者在Kafka上构建流应用程序时体验的项目:Spring Cloud Stream。
架构师研究会
2019/10/23
2.6K0
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
快速入门Kafka系列(6)——Kafka的JavaAPI操作
作为快速入门Kafka系列的第六篇博客,本篇为大家带来的是Kafka的JavaAPI操作~
大数据梦想家
2021/01/27
5570
快速入门Kafka系列(6)——Kafka的JavaAPI操作
第二天:Kafka API操作
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
sowhat1412
2020/11/05
8290
第二天:Kafka  API操作
大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams
1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)   点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
黑泽君
2019/03/15
1.2K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
Spring Kafka 是 Spring Framework 提供的一个集成 Apache Kafka 的库,用于构建基于 Kafka 的实时数据流处理应用程序。Apache Kafka 是一个高性能、分布式的流数据平台,广泛用于构建可扩展的、实时的数据处理管道。
苏泽
2024/03/10
1.1K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
相关推荐
Kafka核心API——Stream API
更多 >
目录
  • 实时流式计算
  • Kafka Streams简介
    • 优势:
    • 用例:
    • Topology
  • 快速入门
    • 1、下载
    • 2、启动
    • 3、创建topic 启动生产者
    • 4、启动WordCount
    • 5、处理数据
    • 6、停止程序
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档