首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(四十九):Structured Streaming 整合 Kafka


​​​​​​​整合 Kafka

说明

http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Apache Kafka 是目前最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景,Kafka基本是标配。

Structured Streaming很好的集成Kafka,可以从Kafka拉取消息,然后就可以把流数据看做一个DataFrame, 一张无限增长的大表,在这个大表上做查询,Structured Streaming保证了端到端的 exactly-once,用户只需要关心业务即可,不用费心去关心底层是怎么做的StructuredStreaming既可以从Kafka读取数据,又可以向Kafka 写入数据

添加Maven依赖:

代码语言:javascript
复制
dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>

            <version>${spark.version}</version>

</dependency>
  • 注意:

目前仅支持Kafka 0.10.+版本及以上,底层使用Kafka New Consumer API拉取数据    

  • 消费位置

Kafka把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟structured commit log类似,生产者和消费者使用Kafka 进行解耦,消费者不管你生产者发送的速率如何,只要按照一定的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号offset(偏移量),Kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。

Kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了。如果开始消费,就要定一下从什么位置开始。

1.earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;

2.latest:从最末位置开始消费;

3.per-partition assignment:对每个分区都指定一个offset,然后从offset位置开始消费;

当第一次开始消费一个Kafka 流的时候,上述策略任选其一,如果之前已经消费了,而且做了 checkpoint ,这时候就会从上次结束的位置开始继续消费。目前StructuredStreaming和Flink框架从Kafka消费数据时,都支持上述的策略。

​​​​​​​Kafka特定配置

从Kafka消费数据时,相关配置属性可以通过带有kafka.prefix的DataStreamReader.option进行设置,例如前面设置Kafka Brokers地址属性:stream.option("kafka.bootstrap.servers", "host:port"),更多关于Kafka 生产者Producer Config配置属和消费者Consumer Config配置属性,参考文档:

 生产者配置(Producer Configs):

http://kafka.apache.org/20/documentation.html#producerconfigs

 消费者配置(New Consumer Configs):

http://kafka.apache.org/20/documentation.html#newconsumerconfigs

注意以下Kafka参数属性可以不设置,如果设置的话,Kafka source或者sink可能会抛出错误:

1)、group.id:Kafka source将会自动为每次查询创建唯一的分组ID;

2)、auto.offset.reset:在将source选项startingOffsets设置为指定从哪里开始。结构化流管理内部消费的偏移量,而不是依赖Kafka消费者来完成。这将确保在topic/partitons动态订阅时不会遗漏任何数据。注意,只有在启动新的流式查询时才会应用startingOffsets,并且恢复操作始终会从查询停止的位置启动

3)、key.deserializer/value.deserializer:Keys/Values总是被反序列化为ByteArrayDeserializer的字节数组,使用DataFrame操作显式反序列化keys/values;

4)、key.serializer/value.serializer:keys/values总是使用ByteArraySerializer或StringSerializer进行序列化,使用DataFrame操作将keysvalues/显示序列化为字符串或字节数组;

5)、enable.auto.commit:Kafka source不提交任何offset;

6)、interceptor.classes:Kafka source总是以字节数组的形式读取key和value。使用ConsumerInterceptor是不安全的,因为它可能会打断查询;

​​​​​​​KafkaSoure

Structured Streaming消费Kafka数据,采用的是poll方式拉取数据,与Spark Streaming中New Consumer API集成方式一致。从Kafka Topics中读取消息,需要指定数据源(kafka)、Kafka集群的连接地址(kafka.bootstrap.servers)、消费的topic(subscribe或subscribePattern), 指定topic 的时候,可以使用正则来指定,也可以指定一个 topic 的集合。

官方提供三种方式从Kafka topic中消费数据,主要区别在于每次消费Topic名称指定,

1.消费一个Topic数据

2.消费多个Topic数据

3.消费通配符匹配Topic数据

从Kafka 获取数据后Schema字段信息如下,既包含数据信息有包含元数据信息:

在实际开发时,往往需要获取每条数据的消息,存储在value字段中,由于是binary类型,需要转换为字符串String类型;此外了方便数据操作,通常将获取的key和value的DataFrame转换为Dataset强类型,伪代码如下:

从Kafka数据源读取数据时,可以设置相关参数,包含必须参数和可选参数:

  •  必须参数:kafka.bootstrap.servers和subscribe,可以指定开始消费偏移量assign。
  •  可选参数:

​​​​​​​KafkaSink

往Kafka里面写数据类似读取数据,可以在DataFrame上调用writeStream来写入Kafka,设置参数指定value,其中key是可选的,如果不指定就是null。

  • 配置说明

将DataFrame写入Kafka时,Schema信息中所需的字段:

需要写入哪个topic,可以像上述所示在操作DataFrame 的时候在每条record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置

写入数据至Kafka,需要设置Kafka Brokers地址信息及可选配置:

1.kafka.bootstrap.servers,使用逗号隔开【host:port】字符;

2.topic,如果DataFrame中没有topic列,此处指定topic表示写入Kafka Topic。

官方提供示例代码如下:

下一篇
举报
领券