Spark Streaming 应用与实战全攻略

作者:小小默

一、背景与架构改造

1.1 问题描述

有一块业务主要是做爬虫抓取与数据输出,通过大数据这边提供的 SOA 服务入库到 HBase,架构大致如下:

架构改造之前

以对于以上的架构存在一些问题,我们可以看见数据在 Dubbox 服务阶段处理后直接通过 HBase API 入库了 HBase,中间并没做任何缓冲,要是 HBase 出现了问题整个集群都完蛋,没法写入数据,数据还丢失,HBase 这边压力也相当大,针对这一点,对入库 HBase 这个阶段做了一些改造。

1.2 架构改造

改造后的架构,爬虫通过接口服务,入库到 Kafka,Spark streaming 去消费 kafka 的数据,入库到 HBase. 核心组件如下图所示:

架构改造图

为什么不直接入库到 HBase,这样做有什么好处?

缓解了 HBase 这边峰值的压力,并且流量可控;

HBase 集群出现问题或者挂掉,都不会照成数据丢失的问题;

增加了吞吐量。

1.3 为什么选择 Kafka 和 Spark streaming

由于 Kafka 它简单的架构以及出色的吞吐量;

Kafka 与 Spark streaming 也有专门的集成模块;

Spark 的容错,以及现在技术相当的成熟。

二、通过代码实现具体细节,并运行项目

然后就开始写代码了,总体思路就是:

put 数据构造 json数据,写入 Kafka;

Spark Streaming 任务启动后首先去 Zookeeper 中去读取 offset,组装成 fromOffsets;

Spark Streaming 获取到 fromOffsets 后通过 KafkaUtils.createDirectStream 去消费 Kafka 的数据;

读取 Kafka 数据返回一个 InputDStream 的信息,foreachRDD 遍历,同时记录读取到的 offset 到 zk 中;

写入数据到 HBase。

详细一点的架构图

2.1 初始化与配置加载

下面是一些接收参数,加载配置,获取配置中的 topic,还有初始化配置,代码如下:

只是需要注意一下,这里的 KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy 过来后改为 public 即可。

2.2 链接 ZK

注意:这里的 ZKStringSerializer,需要把源码拷贝过来,修改一下。

2.3 组装 fromOffsets

组装 fromOffsets,createDirectStream 接收的是一个map的结构,所以可以支持多个 topic的消费。

2.4 通过 createDirectStream 接受数据

使用 KafkaUtils 里面的 createDirectStream 方法去消费 kafka 数据,createDirectStream 使用的是 kafka 简单的 Consumer API,所以需要自己去管理 offset,我们把 offset 写入到 zk 中,这样也方便了一些监控软件读取记录。

2.5 入库

入库 HBase:

插入数据到具体 HBase 数据库:

2.6 运行并查看结果

运行命令:

运行后可以去 spark UI 中去查看相关运行情况,UI 中具体细节见下文。

Streaming Statistics数据统计图

Completed Batches

三、对 Streaming 监控的介绍以及解决实际问题

这部分主要在代码运行起来的情况下来看一下任务的运行情况主要是 streaming 的监控界面,以及我们怎么去通过监控界面发现问题和解决问题。

3.1 监控

官网中指出,spark 中专门为 SparkStreaming 程序的监控设置了额外的途径,当使用StreamingContext 时,在 WEB UI 中会出现一个”Streaming”的选项卡:

WEB UI中的“Streaming”选项卡

在此选项卡内,统计的内容展示如下:

Streaming 状态图

Spark streaming 处理速度为3s一次,每次1000条。

Kafka product 每秒 1000 条数据,与上面 spark consumer 消费者恰好相等。结果:数据量大导致积压,这个过程中 active Batches 会越变越大。

因为忽略了实际的 Processing time:

Active Batches

Completed Batches

Streaming Batches对应的趋势图

这其中包括接受的记录数量,每一个 batch 内处理的记录数,处理时间,以及总共消耗的时间。在上述参数之中最重要的两个参数分别是 Porcessing Time 以及 Scheduling Delay:

Porcessing Time 用来统计每个 batch 内处理数据所消费的时间

Scheduling Delay 用来统计在等待被处理所消费的时间

如果 PT 比 SD 大,或者 SD 持续上升,这就表明此系统不能对产生的数据实时响应,换句话来说就是,出现了处理时延,每个 batch time 内的处理速度小于数据的产生速度。

在这种情况下,读者需要想法减少数据的处理速度,即需要提升处理效率。

3.2 问题发现

在我做压测的时候, Spark streaming 处理速度为 3s 一次,每次 1000 条。

Kafka product 每秒 1000 条数据, 与上面 spark consumer 消费者恰好相等。于是就会数据量大导致积压,这个过程中 active Batches 会越变越大。最后发现了一个问题:

Streaming Batches对应的趋势图

当压测峰值过后 Input Size=0 events,时间仍然不减,奇怪!

Streaming Batches一些异常情况图

查看摸个具体 stage:

Streaming具体的stage信息

从图中, 我们可以看到 Spark 总共调度分发了两批次 task set, 每个 task set 的处理(含序列化和压缩之类的工作)都不超过 100 毫秒,那么该 Stage 何来消耗 4s 呢?慢着,貌似这两批次的 task set 分发的时间相隔得有点长啊,隔了 4 秒左右。为什么会隔这么就才调度一次呢?

Streaming 源码

调了之后的处理时间为 0.7 s:

Streaming Completed Batches正常

具体耗时如下:

Streaming 具体耗时信息图

四、对项目做压测与相关的优化

对项目做压测与相关的优化,主要从内存(executor-memory和driver-memory)、num-executors、executor-cores,以及代码层面做一些测试和改造。

4.1 压测

Spark streaming 处理速度为 3s 一次,每次 1000 条。

Kafka product 每秒 1000 条数据, 与上面 spark consumer 消费者恰好相等。结果:数据量大导致积压,这个过程中 active Batches 会越变越大。

调整 Kafka product 每秒 600 条数据,存在积压,但已经不严重:

Kafka product 每秒600条数据,存在积压

调整 Kafka product 每秒 500 条数据,为消费者 50%,测试结果显示正常,等待时间很稳定:

Kafka product 每秒500条数据,正常

但是。此时每秒吞吐量为 500 显然不够,通过调整间歇实际等,发现并没有变化:

Kafka product 每秒 500 条数据,可以看见没有在指定时间内消费完数据,照成数据积压,并发下降了。

Kafka product 每秒500条数据,没有在指定时间内消费完

4.2 分析原因

分析原因,发现大部分耗时都在处理数据这样一阶段,如下图所示:

Streaming 时间分析图

4.3 调整参数

调整 executor-cores:

executor-cores 2 并发上升至 700/s

executor-cores 3 并发上升至 750/s

调整 executor-cores 后

调整 executor 内存,并发没有增长,无效:

调整 am 内存,并发没有增长,无效:

4.4 代码调整

发现现在主要还是在处理数据的时候消耗时间一直没有减少,而处理数据查看后发现是一条一条的往 HBase 里面插入的,修改为批量插入,重新构建了 json. 性能猛增!!修改前的代码:

修改后的代码:

插入数据到 HBase:

4.5 运行

刚测试时给它相对很小的内存跑一跑:

五六万的插入没什么压力,但是到 10 万的时候,就有些卡顿了!!

yarn 容器、cpu、内存大小

五六万的插入没什么压力

当然是需要增大内存的,修改配置,都增加一倍:

yarn 容器、cpu、内存大小

90000的插入没什么压力

查看插入数据量,能看到修改后插入数据10万是没有什么压力的:

查看插入数据量,能看到修改后插入数据10万是没有什么压力的

当我们再继续加大压力测试的时候,性能下降:

当我们再继续加大压力测试的时候,性能下降

查看统计信息:

查看统计信息

今日好课推荐

纯洁的微笑《精通 Spring Boot 42 讲》

认真学完课程,将更了解 Spring Boot 技术栈的使用场景和实践方式,具有上手开发的基本能力。

▼长按识别一起学 Spring Boot

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181008B1G8L700?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券