我有两个脚本和一个用例,在其中我需要在一个脚本中创建一个dataframe,然后在另一个循环中使用它。如下所示:
脚本1:
def generate_data(spark, logger, conf):
processed_data_final = None
path_1 = conf["raw_data_path_1"]
path_2 = conf["raw_data_path_2"]
df_path1 = spark.read.parquet(path_1)
df_path1.cache()
df_path1.take(1) //calling action item as spark does lazy evaluation
df_path2 = spark.read.parquet(path_2)
df_path2.cache()
df_path2.take(1)
for dt in date_list:
processed_data = process_data(spark, logger, conf, dt, df_path1, df_path2)
if processed_data is None:
processed_data_final = processed_data
else:
processed_data_final = processed_data_final.union(processed_data)
return processed_data_final
if __name__ == "__main__":
# generate global variables: spark, logger
if 5 == len(sys.argv):
env = sys.argv[1]
job_id = sys.argv[2]
else:
print("parameter {env} or {job_id}")
exit(1)
app_name = "past_viewership_" + job_id
spark = SparkSession \
.builder \
.appName(app_name) \
.config("spark.storage.memoryFraction", 0) \
.config("spark.driver.maxResultSize", "-1") \
.getOrCreate()
sc = spark.sparkContext
generate_data(spark, logger, conf)
在脚本2中,我在script1中重用script2中的dateframe,如下所示:
def process_data(spark, conf, df_path1, df_path2):
path3= conf['path3']
df3 = spark.read.parquet(path3)
res_df = df3.join(df_path1, ["id"],"inner").join(df_path2,["id"], "inner")
return res_df
这是解释流程的粗略代码,在这个流中,我在日志中看到它再次在循环中加载df_path1和df_path2。我以为它会使用缓存的数据帧。如何避免在循环中再次读取df_path1和df_path2?
发布于 2022-06-22 04:34:13
调用dataframe.take(1)
并不能实现整个数据格式。Spark的催化剂优化器将修改物理计划,只读取数据格式的第一个分区,因为只需要第一个记录。因此,只有第一个分区被缓存,直到其余的记录被读取。
https://stackoverflow.com/questions/72707547
复制相似问题