前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark on hpc 续: 合理分区处理及合并输出单一文件

PySpark on hpc 续: 合理分区处理及合并输出单一文件

原创
作者头像
flavorfan
发布2022-01-12 16:12:28
1.4K0
发布2022-01-12 16:12:28
举报
文章被收录于专栏:范传康的专栏范传康的专栏

在HPC上启动任务以local模式运行自定义spark,可以自由选择spark、python版本组合来处理数据;起多个任务并行处理独立分区数据,只要处理资源足够,限制速度的只是磁盘io。本地集群处理需要2周的数据,2个小时就处理好了。HPC通常没有数据库,进一步BI展示或者处理需要拉回本地集群,这时候需要把数据块(比如一天)的数据保存为tsv.gz拉回本地集群。pyspark dataframe 提供write的save方法,可以写tsv.gz,spark默认是并行写,所以在提供outpath目录下写多个文件。这个时候,需要顺序拼接多个tsv文件并压缩为gz格式。

1. process_to_tsv_path

代码语言:python
复制
  from pyspark.sql import SparkSession
  def process_to_tsv_path(spark, in_file, out_csv_path,tasks=8):
    result = (
        spark.read.csv(in_file, sep="\t", quote=None, header=True)
        .repartition(tasks)
        .where(...)
        .select(...)
        .write.format("com.databricks.spark.csv").save(out_csv_path)
    )
    return result

repartition的需要在读取输入文件后,并根据文件大小和申请cpu、MEM数适当设定;这样就会在out_csv_path生成对应tasks个csv文件。如果把repartition放在处理之后输出write之前,那么前面处理就只有一个分区,只能调用一个cpu核(和输入文件数对应),浪费算力。做个对比试验,笔者的处理数据情况大概差距5倍。

2. tsv_path_to_gz

代码语言:python
复制
  import glob, gzip
  def tsv_path_to_gz(out_csv_path, tar_file):
    interesting_files = sorted(glob.glob(f'{out_csv_path}/*.csv'))
    with gzip.open(tar_file, 'wb') as f_out:
        for file_name in interesting_files:
            with open(file_name, 'rb') as f_in:
                f_out.writelines(f_in)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. process_to_tsv_path
  • 2. tsv_path_to_gz
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档