专栏首页SAMshare用PySpark开发时的调优思路(下)

用PySpark开发时的调优思路(下)

上期回顾:用PySpark开发时的调优

2. 资源参数调优

如果要进行资源调优,我们就必须先知道Spark运行的机制与流程。

下面我们就来讲解一些常用的Spark资源配置的参数吧,了解其参数原理便于我们依据实际的数据情况进行配置。

1)num-executors

指的是执行器的数量,数量的多少代表了并行的stage数量(假如executor是单核的话),但也并不是越多越快,受你集群资源的限制,所以一般设置50-100左右吧。

2)executor-memory

这里指的是每一个执行器的内存大小,内存越大当然对于程序运行是很好的了,但是也不是无节制地大下去,同样受我们集群资源的限制。假设我们集群资源为500core,一般1core配置4G内存,所以集群最大的内存资源只有2000G左右。num-executors x executor-memory 是不能超过2000G的,但是也不要太接近这个值,不然的话集群其他同事就没法正常跑数据了,一般我们设置4G-8G。

3)executor-cores

这里设置的是executor的CPU core数量,决定了executor进程并行处理task的能力。

4)driver-memory

设置driver的内存,一般设置2G就好了。但如果想要做一些Python的DataFrame操作可以适当地把这个值设大一些。

5)driver-cores

与executor-cores类似的功能。

6)spark.default.parallelism

设置每个stage的task数量。一般Spark任务我们设置task数量在500-1000左右比较合适,如果不去设置的话,Spark会根据底层HDFS的block数量来自行设置task数量。有的时候会设置得偏少,这样子程序就会跑得很慢,即便你设置了很多的executor,但也没有用。

下面说一个基本的参数设置的shell脚本,一般我们都是通过一个shell脚本来设置资源参数配置,接着就去调用我们的主函数。

#!/bin/bash
basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)

spark-submit \
    --master yarn \
    --queue samshare \
    --deploy-mode client \
    --num-executors 100 \
    --executor-memory 4G \
    --executor-cores 4 \
    --driver-memory 2G \
    --driver-cores 2 \
    --conf spark.default.parallelism=1000 \
    --conf spark.yarn.executor.memoryOverhead=8G \
    --conf spark.sql.shuffle.partitions=1000 \
    --conf spark.network.timeout=1200 \
    --conf spark.python.worker.memory=64m \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.sql.crossJoin.enabled=True \
    --conf spark.dynamicAllocation.enabled=True \
    --conf spark.shuffle.service.enabled=True \
    --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
    --conf spark.pyspark.driver.python=python3 \
    --conf spark.pyspark.python=python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
    --conf spark.sql.pivotMaxValues=500000 \
    --conf spark.hadoop.hive.exec.dynamic.partition=True \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
    --conf spark.hadoop.hive.exec.max.created.files=100000 \
    ${bashPath}/project_name/main.py $v_var1 $v_var2

3. 数据倾斜调优

相信我们对于数据倾斜并不陌生了,很多时间数据跑不出来有很大的概率就是出现了数据倾斜,在Spark开发中无法避免的也会遇到这类问题,而这不是一个崭新的问题,成熟的解决方案也是有蛮多的,今天来简单介绍一些比较常用并且有效的方案。

首先我们要知道,在Spark中比较容易出现倾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以优先看这些操作的前后代码。而为什么使用了这些操作就容易导致数据倾斜呢?大多数情况就是进行操作的key分布不均,然后使得大量的数据集中在同一个处理节点上,从而发生了数据倾斜。

查看Key 分布

# 针对Spark SQL
hc.sql("select key, count(0) nums from table_name group by key")

# 针对RDD
RDD.countByKey()

Plan A: 过滤掉导致倾斜的key

这个方案并不是所有场景都可以使用的,需要结合业务逻辑来分析这个key到底还需要不需要,大多数情况可能就是一些异常值或者空串,这种就直接进行过滤就好了。

Plan B: 提前处理聚合

如果有些Spark应用场景需要频繁聚合数据,而数据key又少的,那么我们可以把这些存量数据先用hive算好(每天算一次),然后落到中间表,后续Spark应用直接用聚合好的表+新的数据进行二度聚合,效率会有很高的提升。

Plan C:调高shuffle并行度

# 针对Spark SQL 
--conf spark.sql.shuffle.partitions=1000  # 在配置信息中设置参数
# 针对RDD
rdd.reduceByKey(1000) # 默认是200

Plan D:分配随机数再聚合

大概的思路就是对一些大量出现的key,人工打散,从而可以利用多个task来增加任务并行度,以达到效率提升的目的,下面是代码demo,分别从RDD 和 SparkSQL来实现。

# Way1: PySpark RDD实现
import pyspark
from pyspark import SparkContext, SparkConf, HiveContext
from random import randint
import pandas as pd

# SparkSQL的许多功能封装在SparkSession的方法接口中, SparkContext则不行的。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("sam_SamShare") \
    .config("master", "local[4]") \
    .enableHiveSupport() \
    .getOrCreate()

conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)
hc = HiveContext(sc)

# 分配随机数再聚合
rdd1 = sc.parallelize([('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1), ('sam', 1)])

# 给key分配随机数后缀
rdd2 = rdd1.map(lambda x: (x[0] + "_" + str(randint(1,5)), x[1]))
print(rdd.take(10))
# [('sam_5', 1), ('sam_5', 1), ('sam_3', 1), ('sam_5', 1), ('sam_5', 1), ('sam_3', 1)]

# 局部聚合
rdd3 = rdd2.reduceByKey(lambda x,y : (x+y))
print(rdd3.take(10))
# [('sam_5', 4), ('sam_3', 2)]

# 去除后缀
rdd4 = rdd3.map(lambda x: (x[0][:-2], x[1]))
print(rdd4.take(10))
# [('sam', 4), ('sam', 2)]

# 全局聚合
rdd5 = rdd4.reduceByKey(lambda x,y : (x+y))
print(rdd5.take(10))
# [('sam', 6)]


# Way2: PySpark SparkSQL实现
df = pd.DataFrame(5*[['Sam', 1],['Flora', 1]],
                  columns=['name', 'nums'])
Spark_df = spark.createDataFrame(df)
print(Spark_df.show(10))

Spark_df.createOrReplaceTempView("tmp_table") # 注册为视图供SparkSQl使用

sql = """
with t1 as (
    select concat(name,"_",int(10*rand())) as new_name, name, nums
    from tmp_table
),
t2 as (
    select new_name, sum(nums) as n
    from t1
    group by new_name
),
t3 as (
    select substr(new_name,0,length(new_name) -2) as name, sum(n) as nums_sum 
    from t2
    group by substr(new_name,0,length(new_name) -2)
)
select *
from t3
"""
tt = hc.sql(sql).toPandas()
tt

下面是原理图。

All Done!

?学习资源推荐:

1)《Spark性能优化指南——基础篇》

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

2)《Spark性能优化指南——高级篇》

https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

本文分享自微信公众号 - SAMshare(gh_8528ce7b7e80),作者:SamShare

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-07-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 用PySpark开发时的调优思路(上)

    这一小节的内容算是对pyspark入门的一个ending了,全文主要是参考学习了美团Spark性能优化指南的基础篇和高级篇内容,主体脉络和这两篇文章是一样的,只...

    Sam Gor
  • PySpark入门级学习教程,框架思维(上)

    为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python、Scala还是Java,都会...

    Sam Gor
  • [源码解析] 深度学习分布式训练框架 horovod (8) --- on spark

    Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。

    罗西的思考
  • Eat pyspark 1st day | 快速搭建你的Spark开发环境

    下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-213...

    超哥的杂货铺
  • Pyspark学习笔记(四)弹性分布式数据集 RDD(上)

    RDD(弹性分布式数据集) 是 PySpark 的基本构建块,它是容错、不可变的 分布式对象集合。

    TeeyoHuang
  • 【原】Spark之机器学习(Python版)(二)——分类

      写这个系列是因为最近公司在搞技术分享,学习Spark,我的任务是讲PySpark的应用,因为我主要用Python,结合Spark,就讲PySpark了。然而...

    Charlotte77
  • Jupyter在美团民宿的应用实践

    做算法的同学对于Kaggle应该都不陌生,除了举办算法挑战赛以外,它还提供了一个学习、练习数据分析和算法开发的平台。Kaggle提供了Kaggle Kernel...

    美团技术团队
  • PySpark简介

    Apache Spark是一个大数据处理引擎,与MapReduce相比具有多个优势。通过删除Hadoop中的大部分样板代码,Spark提供了更大的简单性。此外,...

    双愚
  • 金色传说,开源教程!属于算法的大数据工具-pyspark

    spark是目前大数据领域的核心技术栈,许多从事数据相关工作的小伙伴都想驯服它,变成"驯龙高手",以便能够驾驭成百上千台机器组成的集群之龙来驰骋于大数据之海。

    Sam Gor
  • PySpark——开启大数据分析师之路

    近日由于工作需要,突击学了一下PySpark的简单应用。现分享其安装搭建过程和简单功能介绍。

    luanhz
  • Spark vs Dask Python生态下的计算引擎

    对于 Python 环境下开发的数据科学团队,Dask 为分布式分析指出了非常明确的道路,但是事实上大家都选择了 Spark 来达成相同的目的。Dask 是一个...

    Ewdager
  • 3万字长文,PySpark入门级学习教程,框架思维

    关于PySpark,我们知道它是Python调用Spark的接口,我们可以通过调用Python API的方式来编写Spark程序,它支持了大多数的Spark功能...

    Sam Gor
  • 想学习Spark?先带你了解一些基础的知识

    之前也学习过一阵子的Spark了,是时候先输出一些知识内容了,一来加深印象,二来也可以分享知识,一举多得,今天这篇主要是在学习实验楼的一门课程中自己记下来的笔记...

    Sam Gor
  • PySpark SQL 相关知识介绍

    大数据是这个时代最热门的话题之一。但是什么是大数据呢?它描述了一个庞大的数据集,并且正在以惊人的速度增长。大数据除了体积(Volume)和速度(velocity...

    foochane
  • 使用Python写spark 示例

    个人GitHub地址: https://github.com/LinMingQiang

    py3study
  • ETL工程师必看!超实用的任务优化与断点执行方案

    随着大数据时代的快速发展,企业每天需要存储、计算、分析数以万亿的数据,同时还要确保分析的数据具备及时性、准确性和完整性。面对如此庞大的数据体系,ETL工程师(数...

    个推
  • 总要到最后关头才肯重构代码,强如spark也不例外

    用过Python做过机器学习的同学对Python当中pandas当中的DataFrame应该不陌生,如果没做过也没有关系,我们简单来介绍一下。DataFrame...

    TechFlow-承志
  • PySpark源码解析,教你用Python调用高效Scala接口,搞定大规模数据分析

    众所周知,Spark 框架主要是由 Scala 语言实现,同时也包含少量 Java 代码。Spark 面向用户的编程接口,也是 Scala。然而,在数据科学领域...

    机器之心
  • 一起揭开 PySpark 编程的神秘面纱

    Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。最大的优化是让计算任务的中间结果可以存储...

    Sam Gor

扫码关注云+社区

领取腾讯云代金券