首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spark正在对已排序的分区进行排序,导致性能损失。

Spark正在对已排序的分区进行排序,导致性能损失。
EN

Stack Overflow用户
提问于 2019-07-11 09:37:18
回答 1查看 1.5K关注 0票数 0

对于在分区中缓存、分区和排序的数据,使用where子句查询密钥时性能很好,而在同一键上使用小表执行联接时性能较差。

参见下面10Kx44K = 438M行的示例dataset dftest

代码语言:javascript
运行
复制
sqlContext.sql(f'set spark.sql.shuffle.partitions={32}')
sqlContext.clearCache()
sc.setCheckpointDir('/checkpoint/temp')
import datetime
from pyspark.sql.functions import *
from pyspark.sql import Row

start_date = datetime.date(1900, 1, 1)
end_date   = datetime.date(2020, 1, 1)

dates = [ start_date + datetime.timedelta(n) for n in range(int ((end_date - start_date).days))]

dfdates=spark.createDataFrame(list(map(lambda x: Row(date=x), dates))) # some dates
dfrange=spark.createDataFrame(list(map(lambda x: Row(number=x), range(10000)))) # some number range

dfjoin = dfrange.crossJoin(dfdates)
dftest = dfjoin.withColumn("random1", round(rand()*(10-5)+5,0)).withColumn("random2", round(rand()*(10-5)+5,0)).withColumn("random3", round(rand()*(10-5)+5,0)).withColumn("random4", round(rand()*(10-5)+5,0)).withColumn("random5", round(rand()*(10-5)+5,0)).checkpoint()
dftest = dftest.repartition("number").sortWithinPartitions("number", "date").cache()
dftest.count() # 438,290,000 rows

下面的查询现在大约需要1秒钟(在一个包含2个工作人员的小型集群上):

代码语言:javascript
运行
复制
dftest.where("number = 1000 and date = \"2001-04-04\"").count()

但是,当我编写类似于联接的条件时,需要2分钟:

代码语言:javascript
运行
复制
dfsub = spark.createDataFrame([(10,"1900-01-02",1),
  (1000,"2001-04-04",2),
  (4000,"2002-05-05",3),
  (5000,"1950-06-06",4),
  (9875,"1980-07-07",5)],
["number","date", "dummy"]).repartition("number").sortWithinPartitions("number", "date").cache()
df_result = dftest.join(dfsub, ( dftest.number == dfsub.number ) & ( dftest.date == dfsub.date ), 'inner').cache()
df_result.count() # takes 2 minutes (result = 5)

我原以为这个速度差不多也一样快。尤其是我希望更大的数据文件已经被集群和缓存了。看一下这个计划:

代码语言:javascript
运行
复制
== Physical Plan ==
InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L]
   +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) SortMergeJoin [number#771L, cast(date#769 as string)], [number#945L, date#946], Inner
            :- *(1) Sort [number#771L ASC NULLS FIRST, cast(date#769 as string) ASC NULLS FIRST], false, 0
            :  +- *(1) Filter (isnotnull(number#771L) && isnotnull(date#769))
            :     +- InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], [isnotnull(number#771L), isnotnull(date#769)]
            :           +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                 +- Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0
            :                    +- Exchange hashpartitioning(number#771L, 32)
            :                       +- *(1) Scan ExistingRDD[number#771L,date#769,random1#775,random2#779,random3#784,random4#790,random5#797]
            +- *(2) Filter (isnotnull(number#945L) && isnotnull(date#946))
               +- InMemoryTableScan [number#945L, date#946, dummy#947L], [isnotnull(number#945L), isnotnull(date#946)]
                     +- InMemoryRelation [number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- Sort [number#945L ASC NULLS FIRST, date#946 ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(number#945L, 32)
                                 +- *(1) Scan ExistingRDD[number#945L,date#946,dummy#947L]

似乎花了很多时间按数字和日期对较大的数据进行排序(这一行:Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0)。这给我留下了以下问题:

  • 在分区中,左侧和右侧的排序顺序完全相同,JOIN子句的最优排序顺序也是一样的,为什么Spark仍然会再次对分区进行排序?
  • 当5个连接记录匹配(最多)5个分区时,为什么要评估所有分区?
  • 催化剂似乎没有使用缓存数据的repartitionsortWithinPartitions的信息。在这种情况下使用sortWithinPartitions是否有意义?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-16 06:03:54

我试着回答你的三个问题:

在分区中,左侧和右侧的排序顺序完全相同,JOIN子句的最优排序顺序也是一样的,为什么Spark仍然会再次对分区进行排序?

两个DataFrames中的排序顺序是不一样的,因为排序列date中的数据类型不同,在dfsub中是StringType,在dftest中是DateType,因此在join中看到两个分支中的排序是不同的,因此强制执行Sort

当5个连接记录匹配(最多)5个分区时,为什么要评估所有分区?

在查询计划处理过程中,Spark不知道在小型DataFrame中有多少分区是非空的,因此必须处理所有这些分区。

催化剂似乎没有使用缓存数据的重新分区和sortWithinPartitions的信息。在这种情况下使用sortWithinPartitions是否有意义?

火花优化器正在使用来自repartitionsortWithinPartitions的信息,但是对于它的工作方式有一些警告。要修复您的查询,同样重要的是使用在联接中使用的相同列(而不仅仅是一个列)重新分区。原则上,这不应该是必要的,并且有一个相关的jira正在努力解决这个问题。

下面是我对您的查询的建议更改:

  1. date列的类型更改为dftest中的StringType (或在dfsub中类似地更改为DateType ): Dftest.withColumn(“日期”,col(“日期”).cast(‘string’))
  2. 在两个DataFrames变化中 .repartition(数字) 至 .repartition(“号码”,“日期”)

在这些更改之后,您应该得到这样的计划:

代码语言:javascript
运行
复制
*(3) SortMergeJoin [number#1410L, date#1653], [number#1661L, date#1662], Inner
:- Sort [number#1410L ASC NULLS FIRST, date#1653 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(number#1410L, date#1653, 200)
:     +- *(1) Project [number#1410L, cast(date#1408 as string) AS date#1653, random1#1540, random2#1544, random3#1549, random4#1555, random5#1562]
:        +- *(1) Filter (isnotnull(number#1410L) && isnotnull(cast(date#1408 as string)))
:           +- *(1) Scan ExistingRDD[number#1410L,date#1408,random1#1540,random2#1544,random3#1549,random4#1555,random5#1562]
+- Sort [number#1661L ASC NULLS FIRST, date#1662 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(number#1661L, date#1662, 200)
      +- *(2) Filter (isnotnull(number#1661L) && isnotnull(date#1662))
         +- *(2) Scan ExistingRDD[number#1661L,date#1662,dummy#1663L]

因此,计划的每个分支中只有一个Exchange和一个Sort,这两个分支都来自您在转换中调用的repartitionsortWithinPartition,并且连接不会导致更多的排序或改组。还请注意,在我的计划中没有InMemoryTableScan,因为我没有使用缓存。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56986185

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档