前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka Streams 核心讲解

Kafka Streams 核心讲解

作者头像
java达人
发布2021-06-21 19:45:42
2.4K0
发布2021-06-21 19:45:42
举报
文章被收录于专栏:java达人java达人

Kafka Stream 的特点如下:

•Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外,无任何外部依赖•充分利用 Kafka 分区机制实现水平扩展和顺序性保证•通过可容错的 state store 实现高效的状态操作(如 windowed join 和aggregation)•支持正好一次处理语义•提供记录级的处理能力,从而实现毫秒级的低延迟•支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)•同时提供底层的处理原语 Processor(类似于 Storm 的 spout 和 bolt),以及高层抽象的DSL(类似于 Spark 的 map/group/reduce)

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算

Kafka Stream 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。

流处理拓扑结构

流(Stream)是 Kafka Stream 的一个非常重要的抽象概念,代表一个无界的、持续更新的数据集。Stream 是一个有序、可重演、容错并且不可变的数据集,它的数据是以 key-value 的方式定义的。

流处理程序(stream processing application)是指所有应用了 Kafka Streams library 的程序。流处理程序通过一个以上的处理器拓扑结构(processor topology)定义计算逻辑,其中处理器拓扑结构是一个连接到流(边界)的流处理器(节点)。

流处理器(stream processor)是处理器拓扑结构的一个节点;它代表一个处理步骤:从拓扑结构中的前置流处理器接收输入数据并按逻辑转换数据,随后向拓扑结构的后续流处理器提供一个或者多个结果数据。

拓扑结构中有两种特殊的处理器:Source Processor:Source Processor 是一种没有前置节点的特殊流处理器。它从一个或者多个 Kafka Topic 消费数据并产出一个输入流给到拓扑结构的后续处理节点。

Sink Processor:sink processor 是一种特殊的流处理器,没有处理器需要依赖于它。它从前置流处理器接收数据并传输给指定的 Kafka Topic 。

注意:一个正常的处理器节点在处理记录的同时是可以访问其他远程系统。因此,它的处理结果既可以写入到其他远程系统,也可以回流到 Kafka 系统中。

Kafka Streams 提供两种定义流处理拓扑结构的方式:Kafka Streams DSL提供 了一些常用的、开箱即用的数据转换操作,比如:map, filter, join 和 aggregations ;而底层的 Processor API 则允许 开发者定义和连接自定义的处理器,并且可以与 state stores 交互。处理器拓扑结构仅仅是对流处理代码的抽象。在程序运行时,逻辑拓扑结构会实例化并在应用程序中复制以进行并行处理。(详细信息可参考 Stream Partitions and Tasks )。

Time

流处理中很关键的一点是 时间(time) 的概念,以及它的模型设计、如何被整合到系统中。比如有些操作(如 窗口(windowing) ) 就是基于时间边界进行定义的。

流处理中关于时间的一些常见概念:

Event time : 事件或者数据记录产生的时间点,即事件在“源头”发生时的原始时间点。举个例子:如果是汽车GPS传感器产生的地理位置变化的事件,则 Event time 就是GPS传感器捕获到位置发生变更的时间。

Processing time :数据被流处理程序加工的时间,也就是数据被消费的时间。处理事件的时间会比时间产生的原始时间晚几毫秒、几个小时甚至是几天。举个例子:假设一个分析应用程序从汽车传感器读取和处理地理位置数据,并将结果呈现给车队管理仪表板。在这种情况下,分析应用程序的 processing-time 可能比 event time 晚几毫秒或几秒(例如,基于 Apache Kafka 和 Kafka Stream 的实时管道)或者晚几个小时(例如,基于 Apache Hadoop 或 Apache Spark 的批处理管道)。

Ingestion time :事件或者数据记录被 Kafka Broker 保存到 topic partition 的时间点。与 Event time 的不同之处在于 Ingestion time 的时间戳是在记录被 Kafka Broker 添加到目标 Topic 的时候产生的,而不是在记录的源头产生的。与 Processing time 的区别在于处理时间是流处理应用程序开始处理记录的时间。例如:如果记录没有被处理,就没有 Processing time 的概念,但是 ingestion time 是存在的。

选用 event-time 还是 ingestion-time 是通过 Kafka (不是 Kafka Streams)来配置的。从 Kafka 0.10.x 开始,时间戳是自动嵌入到 Kafka 的消息中。至于这些时间戳是 event-time 还是 ingestion-time 取决于 Kafka 的配置。这些配置在 Broker 层面 和 Topic 层面都可以进行设置。Kafka Streams 中默认的时间戳抽取器会原样获取这些嵌入的时间戳。因此,应用程序中时间的语义取决于生效的嵌入时间戳相关的 Kafka 配置。

Kafka Streams 通过 TimestampExtractor 接口来给每条记录分配时间戳。每条记录的时间戳描述了关于流处理与 time 相关的信息,并且被诸如 window 之类的 time-dependent 的操作所使用。因此,这些 time 仅在新纪录到达 processor 的时候才有用。我们将应用程序中的以数据驱动的 time 称为 stream time 以区别于程序运行时的 wall-clock time 。不同的 TimestampExtractor 的具体实现将为 stream time 定义提供不同的语义。例如,基于数据的实际内容来检索或计算时间戳,比如嵌入时间戳字段以提供 event time 语义,以及返回当前的 wall-clock time 以便为 stream time 提供 processing time 语义。因此开发者可以基于自己的业务需要来实施不同的 time 概念。

最后,当 Kafka Streams 应用程序向 Kafka 写记录时,程序也会给这些新记录分配时间戳。时间戳的分配方式取决于上下文:

当通过处理一些输入记录来生成新的输出记录时,例如,在 process() 函数调用中触发的 context.forward() ,输出记录的时间戳是直接从输入记录的时间戳中继承而来的。当新的输出记录是通过 Punctuator#punctuate() 之类的周期性函数产生的,输出记录时间戳被定义为当前流任务的内部时间(通过context.timestamp() 函数生成)。对于聚合操作,聚合结果的时间戳将是触发聚合更新的最新到达的输入记录的时间戳。

聚合

聚合操作采用一个输入流或表,并通过将多个输入记录合并为一个输出记录来产生一个新表。聚合的示例是计算数量或总和。在 Kafka Streams DSL中,聚合的输入流可以是 KStream 或 KTable,但是输出流始终是KTable。这使得Kafka Streams在值产生和发出之后,如果记录无序到达,则可以更新汇总值。当这种无序记录到达时,聚合的 KStream 或 KTable 会发出新的聚合值。由于输出是一个KTable,因此在后续处理步骤中,新值将使用相同的键覆盖旧值。

流表对偶性

实际上,在实现流处理用例时,通常既需要流又需要数据库。在实践中非常常见的示例用例是电子商务应用程序,该应用程序使用来自数据库表的最新客户信息来富化客户交易的传入流。换句话说,流无处不在,但数据库也无处不在。

因此,任何流处理技术都必须为流和表提供优先的支持。Kafka的Streams API通过其对流和表的核心抽象提供了此类功能,我们将在稍后讨论。现在,有趣的发现是流与表之间实际上存在着紧密的关系,即所谓的流表对偶性。Kafka通过多种方式利用这种对偶性:例如,使您的应用程序具有弹性,支持容错的有状态处理或针对应用程序的最新处理结果运行交互式查询。而且,除了内部使用之外,Kafka Streams API 还允许开发人员在自己的应用程序中利用这种对偶性。

在讨论诸如 Kafka Streams 中的聚合之类的概念之前,我们必须首先更详细地介绍表,然后讨论上述流表对偶。本质上,这种对偶性意味着流可以看作是一个表,而表可以看作是一个流。例如,Kafka 的日志压缩功能利用了这种对偶性。

表的一种简单形式是键-值对的集合,也称为映射或关联数组。这样的表可能如下所示:

流表对偶描述了流和表之间的紧密关系。

流作为表:流可以视为表的更改日志,其中流中的每个数据记录都捕获表的状态更改。因此,流是变相的表,并且通过从头到尾重播更改日志重建表,可以很容易地将其变成“真实”表。类似地,在一个更一般的类比中,在流中聚合数据记录(例如,根据页面浏览事件流计算用户的页面浏览总数)将返回一个表(此处的键和值为用户及其对应的网页浏览量)。

表作为流:表在某个时间点可以视为流中每个键的最新值的快照(流的数据记录是键值对)。因此,表是变相的流,并且可以通过迭代表中的每个键值条目将其轻松转换为“真实”流。让我们用一个例子来说明这一点。想象一下,某表跟踪用户的总浏览量(下图的第一列)。随着时间的流逝,无论何时处理了新的综合浏览量事件,表的状态都会相应地更新。在这里,状态在不同时间点之间的变化以及表的不同版本可以表示为变更日志流(第二列)。

有趣的是,由于流表的对偶性,相同的流可用于重建原始表(第三列):

例如,使用相同的机制,通过更改数据捕获(CDC)复制数据库,并在 Kafka Streams 中使用跨机器复制其所谓的状态存储以实现容错。流表对偶是一个非常重要的概念,Kafka Streams通过KStream,KTable和 GlobalKTable 接口对其进行显式建模。

KStream是一个数据流,可以认为所有记录都通过Insert only的方式插入进这个数据流里。而KTable代表一个完整的数据集,可以理解为数据库中的表。由于每条记录都是Key-Value对,这里可以将Key理解为数据库中的 Primary Key,而Value可以理解为一行记录。可以认为KTable中的数据都是通过Update only的方式进入的。也就意味着,如果KTable对应的Topic中新进入的数据的Key已经存在,那么从KTable只会取出同一Key对应的最后一条数据,相当于新的数据更新了旧的数据。以下图为例,假设有一个KStream和KTable,基于同一个Topic创建,并且该Topic中包含如下图所示5条数据。此时遍历KStream将得到与Topic内数据完全一样的所有5条数据,且顺序不变。而此时遍历KTable时,因为这5条记录中有3个不同的Key,所以将得到3条记录,每个Key对应最新的值,并且这三条数据之间的顺序与原来在Topic中的顺序保持一致。这一点与Kafka的日志compact相同。

此时如果对该KStream和KTable分别基于key做Group,对Value进行Sum,得到的结果将会不同。对KStream的计算结果是<Jack,4>,<Lily,7>,<Mike,4>。而对Ktable的计算结果是<Mike,4>,<Jack,3>,<Lily,5>。

PROCESSING GUARANTEES

在流处理领域,最常被问到的问题是:“即使在处理过程中遇到了一些故障,流处理系统是否保证每个记录只处理一次?” 不能保证 "exactly-once" 处理方式对于许多不能容忍任何数据丢失或数据重复的应用程序来说是一种破坏,在这种情况下,除了流处理管道之外,通常还会使用面向批处理的框架,也就是所谓的 Lambda 架构。在0.11.0.0之前, Kafka 仅提供 "at-least-once" 的传递保证,因此任何利用它作为后端存储的流处理系统都不能保证端到端 "exactly-once" 语义。实际上,即使对于那些声称支持 "exactly-once" 语义的流处理系统,只要他们将 Kafka 系统作为读/写 的源/目标,他们的应用程序实际上并不能保证在整个流水线中不会产生重复。自从0.11.0.0版本发布以来,Kafka 允许 Producer 以一种事务性的和幂等的方式向不同的 topic partition 发送消息提供强有力的支持,而 Kafka Streams 则通过利用这些特性来增加了端到端的 "exactly-once" 处理语义。更具体地说,它保证对于从 Kafka topics 读取的任何记录的处理结果将在 Kafka topic 输出结果中反映一次,在 state stores 中也仅进行一次状态操作。需要注意的是,Kafka Streams 的端到端一次性语义与其他流处理框架的主要区别在于,Kafka Streams 与底层的 Kafka 存储系统紧密集成,并确保输入 topics offset 的提交,state stores 的更新和写入输出 topics 的原子性,而不是将 Kafka 视为可能有副作用的外部系统。要详细了解如何在 Kafka Streams 内完成此操作,建议读者阅读 KIP-129 。在运行 Kafka 流应用程序时,为了实现 exactly-once 语义,用户需要设置 processing.guarantee 参数的值为 exactly_once (默认值为 at_least_once )。更多细节请参考 Kafka Streams Configs 部分.

乱序处理

除了保证每条记录将被完全处理一次之外,许多流处理应用程序还将面临的另一个问题是如何处理可能影响其业务逻辑的乱序数据。在 Kafka Streams 中,有两种原因可能会导致相对于时间戳的无序数据到达。在主题分区中,记录的时间戳及其偏移可能不会单调增加。由于 Kafka Streams 始终会尝试按照偏移顺序处理主题分区中的记录,因此它可能导致在相同主题中具有较大时间戳(但偏移量较小)的记录比具有较小时间戳(但偏移量较大)的记录要早处理。在可能正在处理多个主题分区的流任务中,如果用户将应用程序配置为不等待所有分区都包含一些缓冲的数据,并从时间戳最小的分区中选取来处理下一条记录,则稍后再处理从其他主题分区获取的记录时,则它们的时间戳可能小于从另一主题分区获取的已处理记录的时间戳。对于无状态操作,无序数据不会影响处理逻辑,因为一次只考虑一条记录,而无需查看过去已处理记录的历史;但是对于有状态操作(例如聚合和join),乱序数据可能会导致处理逻辑不正确。如果用户要处理此类乱序数据,通常需要允许其应用程序等待更长的时间,同时在等待时间内记录其状态,即在延迟,成本和正确性之间权衡。在Kafka Streams中,具体而言,用户可以为窗口聚合配置其窗口运算,以实现这种权衡(详细信息可以在《开发人员指南》中找到)。对于join,用户必须意识到,某些乱序数据无法通过增加Streams的延迟和成本来处理。对于Stream-Stream连接,所有三种类型(inner,outer,left)都可以正确处理乱序记录,但是对于左连接,结果流可能包含不必要的leftRecord-null;对于outer连接,结果流可能包含leftRecord-null或null-rightRecord 。

对于Stream-Table连接,不处理乱序记录(即Streams应用程序不检查乱序记录,而仅以偏移顺序处理所有记录),因此可能会产生不可预知的结果。

对于Table-Table连接,不处理乱序记录(即Streams应用程序不检查乱序记录,而仅以偏移顺序处理所有记录)。但是,join结果是变更日志流,因此最终将会一致。

架构

Stream Partitions and Tasks

Kafka 的消息层对数据进行分区存储并传输,而 Kafka Streams 对数据分区并处理。在这两种情形下,分区是为了实现数据本地化,弹性,可扩展性,高性能和容错性。Kafka Streams 使用 partitions 和 tasks 的概念作为并行模型的逻辑单元,它的并行模型是基于 Kafka topic partition 。Kafka Streams 和 Kafka 之间有着紧密的联系:

•每个 stream partition 都是完全有序的数据记录序列,并可以映射到 Kafka 的 topic partition 。•stream 中的一个数据记录可以映射到该主题的对应的Kafka 消息。•数据记录的 key值 决定了该记录在 Kafka 和 Kafka Stream 中如何被分区,即数据如何路由到 topic 的特定分区。

应用程序的处理器拓扑结构通过将其分解为多个任务来实现可拓展性。更具体地说,Kafka Streams 根据应用程序的 input stream partitions 创建固定数量的任务,每个任务都分配了来自 input stream (即 Kafka topic )的一些 partitions。任务与 partitions 的对应关系是永远不会改变,因此每个任务都是应用程序的固定并行单元。任务可以基于所分配的分区实例化它们自己的处理器拓扑结构;它们还为每个分配的分区保留一个缓冲区,并从这些记录缓冲区中按照 one-at-a-time 的方式处理消息。故流任务可以独立并行处理,无需人工干预。

我们需要明确一个很重要的观点:Kafka Streams 不是一个资源管理器,而是一个库,这个库“运行”在其流处理应用程序所需要的任何位置。应用程序的多个实例可以在同一台机器上执行,也可以分布在多台机器上,任务可以由库自动分配给正在运行的应用程序实例。任务与 partitions 的对应关系是不会改变的;如果应用程序实例失败,则其所有分配给它的任务将在其他实例上自动重新启动,并继续从相同的流分区中消费数据。

下图显示了两个任务,每个任务分配 input stream 的 一个 partition。

Threading Model

Kafka Streams 允许用户配置应用程序实例中可并行的线程数量。每个线程都可以按照处理器拓扑结构独立执行一个或多个任务。例如,下图显示了一个运行两个流任务的流线程。

启动更多流线程或更多的应用程序实例仅仅意味着可以复制更多的拓扑结构来处理不同的Kafka分区子集,从而有效地并行处理。值得注意的是,线程之间没有共享状态,所以不需要线程间协调。这使得跨应用程序实例和线程并行运行拓扑结构变得非常简单。Kafka Streams 通过利用 Kafka 的协作机制(Kafka's coordination)在各个流线程之间分配 Kafka topic partition,这对于用户来说是透明的。

如上所述,使用 Kafka Streams 扩展流处理应用程序非常简单:你只需要为程序启动额外的实例,然后 Kafka Streams 负责在应用程序实例中的任务之间分配分区。您可以启动与 input Kafka topic partitions 一样多的应用程序线程,以便在应用程序的所有正在运行的实例中,每个线程(或者说它运行的任务)至少有一个要处理的 input partition 。

本地状态存储(Local State Stores)

Kafka Streams 提供了所谓的 state stores ,它可以被流处理应用程序用来存储和查询数据,这是实现有状态操作时的一项重要功能。例如, Kafka Streams DSL 会在您调用诸如 join()或 aggregate()等有状态运算符时,或者在窗口化一个流时自动创建和管理 state stores 。

Kafka Streams 应用程序中的每个流任务都可以嵌入一个或多个可通过API访问的 local state stores ,以存储和查询处理过程所需的数据。Kafka Streams 为这些 local state stores 提供容错和自动恢复功能。

下图中的两个流任务都具有专用的 local state stores 。

Fault Tolerance

Kafka Streams 是基于 Kafka 原生的容错功能。Kafka partitions 是高可用和可复制的;因此当流数据持久化到 Kafka 之后,即使应用程序失败,数据也仍然可用并可重新处理。Kafka Streams 利用 Kafka consumer client 提供的容错机制来处理失败的情况。如果某台服务器上运行的某个任务失败了,则 Kafka Streams 会自动在应用程序剩余的某个运行实例中重新启动该任务。

此外,Kafka Streams 也确保 local state stores 的健壮性。对于每个 state store ,它都会维护一个可复制的 changelog Kafka topic 以便跟踪任何状态更新。这些 changelog topics 也进行了分区,以便每个 local state store 实例以及访问这些 store 的任务都有其自己专用的 changelog topic partition 。在 changelog topics 上会启用 日志压缩(Log compaction),以便可以安全地清除旧数据以防止 topic 无限增长。如果任务在一台故障的服务器上运行,并在另一台服务器上重新启动,则 Kafka Streams 保证在另一台服务器启动需要恢复的任务之前,会回滚相应的 changelog topics ,将其关联的 state stores 恢复成失败前的内容。因此,故障处理对最终用户来说是完全透明的。

请注意,任务(重新)初始化的时间通常取决于恢复 state 的时间(主要是回滚 state stores 相关联的 changelog topics 的时间)。为了尽可能缩短恢复时间,用户可以将应用程序配置为具有备用副本(standby replicas)的local states(即完全可复制的 state 副本)。当发生任务迁移时,Kafka Streams 会尝试将任务分配给已存在备用副本的应用程序实例,以最大程度地缩短任务(重新)初始化时间。请在 Kafka Streams Configs 部分查看 num.standby.replicas 配置项。

java达人

ID:drjava

(长按或扫码识别)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 流处理拓扑结构
  • Time
  • 聚合
  • 流表对偶性
  • PROCESSING GUARANTEES
  • 乱序处理
  • 架构
  • Stream Partitions and Tasks
  • Threading Model
  • 本地状态存储(Local State Stores)
  • Fault Tolerance
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档