我正在尝试使用delta lake oss实现合并,我的历史数据大约是70亿条记录,delta大约是500万条记录。
合并基于组合键(5列)。
我正在启动一个10节点集群r5d.12xlarge(~3TB内存/ ~480个内核)。
该作业第一次花费了35分钟,后续运行将花费更多时间。
我尝试过使用优化技术,但都不起作用,并且我在运行3次后开始得到堆内存问题,我看到数据洗牌时磁盘上的大量溢出,尝试使用合并键上的order by重写历史,在20分钟内完成了性能改进和合并,溢出大约为2TB,但是问题是作为合并过程的一部分写入的数据的顺序不同,因为我无法控制写入数据的顺序,因此后续运行花费的时间更长。
我无法在德尔塔湖操作系统中使用Zorder,因为它只提供订阅.I尝试压缩,但这也没有帮助。如果有更好的方法来优化合并过程,请告诉我。
发布于 2021-09-28 19:59:40
如果你真的想通过代码来优化它,你可以启动并行任务。这是我们用来并行化S3编写的示例代码。您也可以对adls位置使用相同的逻辑。
with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
print(f"{raw_bucket}/{db}/{table}/")
for single_date in daterange(start_date, end_date):
curr_date = single_date.strftime("%Y-%m-%d")
jobs.append(e.submit(writeS3, curr_date))
for job in futures.as_completed(jobs):
result_done = job.result()
print(f"Job Completed - {result_done}")
print("Task complete")参考:https://docs.python.org/3/library/concurrent.futures.html
https://stackoverflow.com/questions/63126467
复制相似问题