我有一个脚本片段,我运行在不同的集群设置上,在pyspark 2.4上
v1 = spark.read.parquet(os.path.join(v1_prefix, 'df1.parquet'))
v2 = spark.read.parquet(os.path.join(v2_prefix, 'df2.parquet'))
out = v1.join(v2, [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])
for x in v1.columns:
tmp = out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new')).filter('{}_old != {}_new'.format(x,x ))
if tmp.count() > 0:
tmp.show()
这两种都是带有200+列和150万条记录的数据格式,因此out dataframe有400+列,这些列可以相互比较以确定是否存在差异。
我假设在2节点集群中,数据被划分在不同的执行器上并被洗牌,这降低了性能。
如何改进out数据same,使其分布均匀,并至少具有与使用SMAK2.4在单个节点上运行的性能相同的性能?
发布于 2020-07-30 11:45:39
一个广播连接应该会有帮助。
import pyspark.sql.functions as F
out = v1.join(F.broadcast(v2), [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])
只有150万行的DataFrame应该足够小,可以广播。应该广播较小的DataFrame (在您的示例中,两者的大小似乎大致相同)。
尝试使用概述的out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new'))
和这里的设计模式重构这里。您通常不希望遍历DataFrame中的所有列。
欢迎来到PySpark世界;)
https://stackoverflow.com/questions/63172079
复制相似问题