首页
学习
活动
专区
工具
TVP
发布

Kafka streams概览

先回答题图的3个问题:

什么是流处理(stream processing)?

与批处理对应的一个名词

数据源是持续不断产生数据的,而不是定期产生数据

对持续不断产生的数据持续处理即为流处理

Why kafka streams?

storm,spark等常用流处理工具倾向于基于kafka队列实现数据中转

kafka streams与kafka队列集成度最高,新特性最新被集成,比如不丢不重的特性

kafka streams作为商业公司confluent的主要卖点,有持续产品完善的预期及商业支持的可能

相对其他流处理框架易于运用

kafka streams 有什么特点?

streams是kafka队列的一个客户端应用,无runtime支持,多个streams需要应用自己管理

streams应用随队列的partition策略而rebalance

提供相对易用的DSL和更灵活的Processor两套API,Confluent平台还提供KSQL构建streams

一些概念:

Topology:定义stream中的各个组件及协同关系,分为不同的node,包括:

source, 从kafka topics中获取数据并传给porocessor

processor, 从上一个node中接收数据并处理数据,可以继续传给下一个processor,也可以传给sink

sink,从processor中获取数据并写入topics,

构建好了Topology之后,传给KafkaStreams,启动后就能按照这个Topology运行了。

StreamsBuilder,high-level DSL strem构建器,stream()返回KStream,table()返回KTable,

build() 返回一个Topology,用于传递给KafkaStreams, 可以通过Consumed参数指定一个自定义的参数,比如Serde等。Produced用于一个stream.to()或者throught()到另外一个stream或者table时指定参数。

KStream, 接口,一个key对应多个带有历史记录value的kv结构, 比如K人买了I1,I2两个东西,就是,两条记录进入stream中。

A KStream can be transformed record by record, joined with another KStream, KTable, GlobalKTable, or can be aggregated into a KTable

flatMapValues(ValuMapper(v, vr)) , 将value从v转换为vr,vr是一个Iterable接口的类, 底层实现是KStreamImpl ,调用Processor API进行处理。

count(), groupBy()之后可以count,支持 Materialized.as() 指定一个store,用于存储count返回的值 。

KTable,一个key只有一个最新状态的value的kv结构

KafkaStreams, kafka streams处理的客户端,内部有 KafkaProducer and KafkaConsumer ,通过KafkaStreams.start()启动一个流处理,可以多线程,通过shut down handler处理ctrl+c,调用close(),通过CountDownLatch控制。KafkaStreams构建时需要用到Topology和Properties

Processor API(PAPI):process(),transform()等,streams处理的底层接口,自定义程度更高,所写的代码更多。

时间概念:分为

event time:数据源产生数据的时间

processing time:数据被流处理程序处理的时间

ingestion time:数据存往队列的时间

用哪个时间通过kafka broker的配置来指定,可以分不同topic设置,

状态:中间状态通过store保存,通过interactive query查询。stateful operators操作之后一般需要保存状态,比如join(),aggregate()等

一致性保障:通过processing.guarantee配置,默认是at_least_once ,可以改为exactly_once

容错机制:partition本身有高可用和多份备份,state store也可以配置多备份,同时有topic changelog,如果没有多备份的statestore,将需要从changelog中恢复state store,耗时较长,可以考虑在streams中配置num.standby.replicas

StreamsConfig,通过properties配置。

application.id,同一个应用所有实例共用一个id,id名可以标上版本号,升级版本时如果不复用之前的数据,可以用不同的版本号以示区分。application.id默认会作为consumer和producer的client.id的前缀,同时作为consumer的group.id,作为state.dir的名字,kafka内部的topic名字前缀

stateless DSL:

branch() ,将一个stream按不同条件拆分多个子stream, 按条件顺序进行匹配,匹配上了就进入,条件可以不互斥

filter(),根据条件过滤

filterNot(),根据条件删除一些值

flatMap(),从一个key,value产生0到多个k,v对

flatMapValues(),从一个k,v产生多个k,v对, key保持不变

forEach(),Terminal operation,不返回stream,对stream每个元素做print等操作。

groupByKey(),按key分组,返回KGroupedStream,key不改变

groupBy(),指定分组条件,不按key分组用这个操作

map(),一对一的映射操作,k,v都可以改变

mapValues(),仅改变value

peek(),功能与forEach()相同,在stream中使用的话,处理结果没法通过kafka保存

stateful DSL:

aggregate(),

需要指定初始化累加值() ->0L

累加的方法: (key,value,aggregate) -> aggregate+value

如果涉及类型转换,通过Materialize.as().withValueSerde()指定相应的类型

windowedBy(),指定时间窗口,key变成了Windowed,stateStore使用WindowStore

reduce(),组合(combine) groupbykey之后的values,和aggregate不同的是,结果的类型不能改变,所以如果value是Long型,stream需要通过mapValues()转为Long型,然后进行reduce运算。reduce不需要设置初始化值。

join

使用KTable做join要求被join的两个对象满足co-partition,即有相同的partition数量,分区策略也要求相同,即相同的key必须在同一个partition编号中。如果不满足co-partition的要求,则可以对分区较少的一边重新写到一个新的stream或者table中去。

使用GlobalKTable对co-partition没有要求

stream之间的join需要指定time window

join的逻辑是基于key相等,然后对两个value进行操作。

Record cache

windowing

tumbling window,滚动窗口,窗口大小与前进的窗口大小正好相同的hopping window,窗口不重叠。通过TimeWindows.of()构造

hopping window,窗口大小固定,时间步进大小与窗口大小可以不一致。通过TimeWindows.of(windowSizeMs).advanceBy(advanceMs) 构造

sliding window,用于join()操作,通过JoinWindows.of()构造。与数据的timdestatmp有关系。

session window,通过SessionWindows.with(mills)构造,mills指定的是空闲时间,超过空闲时间的数据会放到一个新的session window中,在session活动期间内的数据都放在同一个session中,所以session window的窗口大小不是固定的。

interactive query

基于state store的查询接口,默认的state store是rocksdb

apache 上面的文档不完整,example代码得到confluentinc的github去找。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180403G1LZMK00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券