前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark on HPC 续:批量处理的框架的工程实现

PySpark on HPC 续:批量处理的框架的工程实现

原创
作者头像
flavorfan
修改2022-01-21 19:09:17
1.3K0
修改2022-01-21 19:09:17
举报
文章被收录于专栏:范传康的专栏范传康的专栏

PySpark on HPC系列记录了我独自探索在HPC利用PySpark处理大数据业务数据的过程,由于这方面资料少或者搜索能力不足,没有找到需求匹配的框架,不得不手搓一个工具链,容我虚荣点,叫“框架”。框架的实现功能如下:

  1. generate job file(生成批量任务描述文件):读取raw data folder,生成带读取raw file list,根据输入job参数(batch size)等输出系列job file(描述输入raw文件路径,生成文件路径);
  2. job script -- single job file(任务脚本:输入一个job file,执行单批次的任务);
  3. job script- array job file(任务脚本:输入array job,执行系列化任务):根据job file folder和array id并行处理多批次raw data file。

1 Framework overview

framework
framework

如上图所示,另外有几个注意点:

  • PySpark Env详见 pyspark on hpc
  • HPC处理,处理环境(singularity镜像,或者conda环境)和输入输出数据、任务描述(job file)需要存放于HPC各个节点都可以访问的存储上;

2 Process script & job file generate

具体任务处理脚本有几点注意事项:

  1. 初始化HPC PySpark环境;
  2. 入口函数接受一个job file路径,该文件是一个表格文件(如csv),有3列,in_file,out_file,tmp_folder(用于Spark输出,后面gzip压缩成单个文件后删除);
  3. 日志文件要每个job(task)一个,典型的是日期加一个随机值或者job_id;
代码语言:python
复制
...
os.environ["PYSPARK_PYTHON"] = "<env_path>/python"
os.environ["SPARK_HOME"] = "<env_path>/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-10.9-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")
sys.path.insert(0, '<work_path>')
...

def process_raw(spark, in_file, file_output, out_csv_path):
    raw_to_csv(spark, in_file, out_csv_path)
    csv_to_zip(out_csv_path, file_output)
    shutil.rmtree(out_csv_path)

def process_job_file(in_file,spark):
    df = pd.read_csv(in_file)
    for index, row in df.iterrows():
        in_file, out_file, tmp_path = row['in_file'],row['out_file'],row['tmp_path']
        process_raw(spark, in_file, out_file, tmp_path)

def get_parser():
    parser = argparse.ArgumentParser(description='...')
    parser.add_argument("-j", help="job type", dest="job_type",default='process_raw') 
    # process_raw:
    parser.add_argument("-i", help="input job file", dest="input_file")
    # generate job file
    parser.add_argument("-b", help="one job batch size", dest="batch_size",default=8)
    parser.add_argument("-g", help="generate job file root", dest="gen_file_root", default='./jobs')
    parser.add_argument("-r", help="raw data root", dest="raw_data_root")
    parser.add_argument("-t", help="target data root", dest="tar_data_root")

if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args() 
    if args.job_type == 'process_raw' and args.input_file is not None:
       spark = get_spark()
       process_job_file(args.input_file,spark)
    elif args.job_type == 'gen_job_file':
        generate_jobfile_from_folder(args.raw_data_root, args.tar_data_root, batch_size=args.batch_size, job_file_folder=args.gen_file_root)
    else:
        parser.print_help()

3 Jobsript

1) 处理单个任务文件: spark-hpc-batch.sh

代码语言:shell
复制
#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err

<path_to_env>/python <process_file_path>.py -i $1

调用方法

代码语言:shell
复制
sbatch spark-hpc-batch.sh <job_file_path>

2) 处理队列任务文件: spark-hpc-batch-array.sh

代码语言:shell
复制
#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err

JOB_FILE_ROOT=$1
<path_to_env>/python <process_file_path>.py -i $1 "$JOB_FILE_ROOT/$SLURM_ARRAY_TASK_ID.csv"

调用方法

代码语言:shell
复制
sbatch --array=0-29 spark-hpc-batch-array.sh <job_file_root>

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Framework overview
  • 2 Process script & job file generate
  • 3 Jobsript
    • 1) 处理单个任务文件: spark-hpc-batch.sh
      • 2) 处理队列任务文件: spark-hpc-batch-array.sh
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档