前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Spark大数据处理 - 性能分析(实例)

Apache Spark大数据处理 - 性能分析(实例)

作者头像
程序你好
发布2018-07-23 09:32:11
1.6K0
发布2018-07-23 09:32:11
举报
文章被收录于专栏:程序你好程序你好

介绍

今天的任务是将伦敦自行车租赁数据分为两组,周末和工作日。将数据分组到更小的子集进行进一步处理是一种常见的业务需求,我们将看到Spark如何帮助我们完成这项任务。

数据由167个CSV文件组成,总共6.5GB,我们将使用两个节点集群来处理它,每个节点集群有4GB的RAM和3个cpu。

在我们开始处理真实数据之前,了解Spark如何在集群中移动我们的数据,以及这与性能之间的关系是很有用的。Spark无法同时在内存中保存整个数据集,因此必须将数据写入驱动器或通过网络传递。这比内存中处理要慢得多,而且在这里经常出现性能瓶颈。

在理论上

分区

为了跨集群分配工作并减少每个节点的内存需求,Spark将数据分割为称为分区的更小的部分。然后,将其中的每一个发送给一个执行程序以进行处理。每个执行线程一次只计算一个分区,因此传递给执行程序的分区的大小和数量与完成所需的时间成正比。

数据偏斜(Data Skew)

通常,数据会根据一个键被分割成多个分区,例如一个名称的第一个字母。如果值在整个键中分布不均匀,那么将会有更多的数据被放置在一个分区中。一个例子是:

{Adam, Alex, Anja, Beth, Claire}-> A: {Adam, Alex, Anja}-> B: {Beth}-> C: {Clair}

这里的A分区比另外两个大3倍,因此需要大约3倍的时间来计算。由于下一阶段的处理必须在对所有三个分区进行评估之后才能开始,因此该阶段的总体结果将被延迟。

调度

在分割为多个分区时可能出现的另一个问题是,有太多的分区无法正确地覆盖可用执行程序的数量。下图给出了一个示例,其中有2个执行程序和3个分区。

Executor 1有一个额外的分区来计算,因此需要执行2次。这导致Executor 2有一半的工作时间是空闲的和未使用的。

解决方案

上述两个问题的最简单解决方案是增加用于计算的分区数量。这将减少向单个分区倾斜的影响,并允许更好地匹配cpu的调度。

一个常见的建议是每个CPU有4个分区,但是与Spark性能相关的设置非常依赖于具体情况,因此这个值应该与给定的场景进行微调。

洗牌

当在分区之间重新排列数据时,就会发生洗牌。当转换需要来自其他分区的信息时,比如将列中的所有值相加,就需要这样做。Spark将从每个分区收集所需的数据,并将其合并到一个新的分区中,可能是在不同的执行程序上。

在洗牌过程中,数据被写到磁盘上并通过网络传输,中断了Spark在内存中进行处理的能力,并导致性能瓶颈。因此,我们希望尝试减少正在进行的洗牌数量或减少正在洗牌的数据量。

Map-Side减少

在洗牌过程中聚合数据时,与其传递所有数据,不如合并当前分区中的值,只传递洗牌中的结果。这个过程称为map-side减少,通过减少在洗牌过程中传输的数据量来提高性能。

Spark开发人员在改进Spark提供的自动优化方面做了大量工作,特别是Dataset groupBy函数将在可能的情况下自动执行map-side减少。然而,仍有必要检查执行图和统计数据,以减少未发生的大洗牌。

在实践中

为了分割数据,我们将添加一个列,该列将开始日期转换为一周中的一天、工作日,然后添加一个布尔列,以确定这一天是周末还是周末。数据也需要一些清理,以消除错误的开始日期和持续时间。

Dataset<Row> data = getCleanedDataset(spark);

data = data.withColumn("Weekday", date_format(data.col("Start_Date"), "EEEE"));

data = data.withColumn("isWeekend", data.col("Weekday").equalTo("Saturday") .or(data.col("Weekday").equalTo("Sunday")));

最后,我们将基于isWeekend列重新划分数据,然后将其保存为Parquet格式。

data.repartition(data.col("isWeekend")).write() .parquet("cycle-data-results" + Time.now());

第一轮

当作业运行时,我们看到repartition命令执行洗牌并生成200个分区(spark缺省值),这应该提供极好的并行性;让我们看一下执行时间表。

200个分区执行时间线和度量

时间线看起来不平衡。在许多非常小的分区中,只有两个分区占用任何重要的执行时间,即使在两个较大的分区之间,处理也不是平均分割的,如果有什么区别的话,它们的比率大约是5比2。这表明数据倾斜,因为分区需要不同的时间来处理,并且还演示了前面提到的调度问题,第二个执行程序在最后60秒内处于空闲状态。

这种不平等的处理分割在Spark作业中很常见,提高性能的关键是找到这些问题,理解它们发生的原因,并在整个集群中正确地重新平衡它们。

为什么?

在这种情况下,之所以会发生这种情况,是因为调用repartition将同一键的所有值移动到同一执行程序上的同一个分区中。这里的关键是isWeekend是一个布尔值,这意味着只有两个分区将使用数据填充。Spark不能在其内部优化中考虑到这一点,因此提供了198个没有数据的其他分区。如果我们有超过两个可用的执行程序,它们将只接收空分区,并且在整个过程中都是空闲的,这将极大地减少集群的总吞吐量。

以这种方式进行分组也是内存异常的一个常见来源,因为对于大型数据集,单个分区可以很容易地获得多个GBs数据,并迅速超过分配的RAM。因此,我们必须考虑我们所选择的每个键的数据的可能比例,以及这些数据如何与我们的集群相关联。

第二轮

为了改进上述问题,我们需要对查询进行更改,以便更均匀地将数据分布到我们的分区和执行器中。

另一种编写查询的方法是将重分区委托给write方法。

data.write().partitionBy("isWeekend") .parquet("cycle-data-results" + Time.now());

在之前的案例中,Spark将CSV文件加载到69个分区中,将这些文件拆分为isWeekend,并将结果合并为200个新的分区。在新的解决方案中,Spark仍然将CSVs加载到69个分区中,但是它可以跳过shuffle阶段,认识到它可以基于密钥分割现有的分区,然后直接将数据写入到parquet文件中。查看执行时间轴,我们可以看到分区和节点之间更健康的扩展,并且不会发生任何洗牌。

改进执行时间和度量

结论

在这种情况下,写入时间从1.4分钟减少到0.3分钟,减少了79%,如果我们有一个节点较多的集群,这种差异会变得更加明显。此外,我们避免了3.4GB的洗牌读写,大大减少了集群上的网络和磁盘使用。

希望这篇文章对优化Spark作业提供了一些见解,并展示了如何从集群中获得最大的好处。

在Github可查看示例的源代码:/MatdeB-SL/Spark-Performance---Cycle-Hire-Data

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-06-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序你好 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 在理论上
  • 在实践中
  • 结论
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档