前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一起揭开 PySpark 编程的神秘面纱

一起揭开 PySpark 编程的神秘面纱

作者头像
Sam Gor
发布2021-09-29 15:02:14
2K0
发布2021-09-29 15:02:14
举报
文章被收录于专栏:SAMshareSAMshare
在开始讲解PySpark程序启动原理之前,我们先来了解一下Spark的一些概念和特性。

1. Spark的核心概念

Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。最大的优化是让计算任务的中间结果可以存储在内存中,不需要每次都写入 HDFS,更适用于需要迭代的 MapReduce 算法场景中,可以获得更好的性能提升。例如一次排序测试中,对 100TB 数据进行排序,Spark 比 Hadoop 快三倍,并且只需要十分之一的机器。Spark 集群目前最大的可以达到 8000 节点,处理的数据达到 PB 级别,在互联网企业中应用非常广泛。

2. Spark 的特性

Hadoop 的核心是分布式文件系统 HDFS 和计算框架 MapReduces。Spark 可以替代 MapReduce,并且兼容 HDFS、Hive 等分布式存储层,良好的融入 Hadoop 的生态系统。

Spark 执行的特点

  • 中间结果输出:Spark 将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多 Stage 的任务串联或者并行执行。
  • 数据格式和内存布局:Spark 抽象出分布式内存存储结构弹性分布式数据集 RDD,能够控制数据在不同节点的分区,用户可以自定义分区策略。
  • 任务调度的开销:Spark 采用了事件驱动的类库 AKKA 来启动任务,通过线程池的复用线程来避免系统启动和切换开销。

Spark 的优势

  • 速度快,运行工作负载快 100 倍。Apache Spark 使用最先进的 DAG 调度器、查询优化器和物理执行引擎,实现了批处理和流数据的高性能。
  • 易于使用,支持用 Java、Scala、Python、R 和 SQL 快速编写应用程序。Spark 提供了超过 80 个算子,可以轻松构建并行应用程序。您可以从 Scala、Python、R 和 SQL shell 中交互式地使用它。
  • 普遍性,结合 SQL、流处理和复杂分析。Spark 提供了大量的库,包括 SQL 和 DataFrames、用于机器学习的 MLlib、GraphX 和 Spark 流。您可以在同一个应用程序中无缝地组合这些库。
  • 各种环境都可以运行,Spark 在 Hadoop、Apache Mesos、Kubernetes、单机或云主机中运行。它可以访问不同的数据源。您可以使用它的独立集群模式在 EC2、Hadoop YARN、Mesos 或 Kubernetes 上运行 Spark。访问 HDFS、Apache Cassandra、Apache HBase、Apache Hive 和数百个其他数据源中的数据。

3. Spark分布式运行架构

Spark程序简单来说它的分布式运行架构,大致上是把任务发布到Driver端,然后Spark解析调度并封装成一个个的小Task,分发到每一个Executor上面去run,Task包含计算逻辑、数据等等,基础架构以及执行顺序如下两图:

图来自:https://www.cnblogs.com/xia520pi/p/8695141.html

4. Spark任务调度分析

Spark拿到我们的一个任务,是会先发布到Driver端,Driver端拆分任务逻辑放入不同的Task,若干个Task组成一个Task Set,根据Executor资源情况分配任务。大概的逻辑是上面这样子的,不过,我们在看YARN日志的时候,经常性地会看到Job、Stage、Task,它们的关系是怎么样的呢?可以看下图:

简单来说,Job包含Stage,Stage包含Task,其中Job的划分看Action,Stage划分看Shuffle。我们来复习下常用RDD算子,如下图:

这里敲黑板划重点:

1、Action算子:collect、collectAsMap、reduce、countByKey、take、first等。

2、Shuffle操作:Shuffle指的是数据从Map Task输出到Reduce Task的过程,作为连接Map和Reduce两端的桥梁。它需要把Map端不同Task的数据都拉取到一个Reduce Task,十分消耗IO和内存。Shuffle操作可以分为Map端的数据准备和Reduce端的数据拷贝,称之为Shuffle Write和Shuffle read。而rdd宽依赖操作就会引起Shuffle过程。

3、常见的宽依赖(Wide Dependencies)操作有:reduceByKey、groupBykey、join、

4、常见的窄依赖(Narrow Dependencies)操作有:map、filter、flatMap、union等

5、哪里并行?我们常说的并行指的是同一个Stage内并行,Stage之间是存在依赖关系的,属于串行操作。

5. Spark 生态系统 —— BDAS

目前,Spark 已经发展成为包含众多子项目的大数据计算平台。BDAS 是伯克利大学提出的基于 Spark 的数据分析栈(BDAS)。其核心框架是 Spark,同时涵盖支持结构化数据 SQL 查询与分析的查询引擎 Spark SQL,提供机器学习功能的系统 MLBase 及底层的分布式机器学习库 MLlib,并行图计算框架 GraphX,流计算框架 Spark Streaming,近似查询引擎 BlinkDB,内存分布式文件系统 Tachyon,资源管理框架 Mesos 等子项目。这些子项目在 Spark 上层提供了更高层、更丰富的计算范式。

6. PySpark与Spark的关系

Spark支持很多语言的调用,包括了Java、Scala、Python等,其中用Python语言编写的Spark API就是PySpark。它在Spark最外层封装了一层Python API,借助了Py4j来实现Spark底层API的调用,从而可以实现实现我们直接编写Python脚本即可调用Spark强大的分布式计算能力。

7. PySpark分布式运行架构

与Spark分布式运行架构一致,不过就是外围多了一层Python API。用户通过实例化Python的SparkContext对象,接着Py4j会把Python脚本映射到JVM中,同样地实例化一个Scala的SparkContext对象,然后Driver端发送Task任务到Executor端去执行,因为Task任务中可能会包含一些Python的函数,所以每一个Task都是需要开启一个Python进程,通过Socket通信方式将相关的Python函数部分发送到Python进程去执行。

综上所述,PySpark是借助于Py4j实现了Python调用Java从而来驱动Spark程序的运行,这样子可以保证了Spark核心代码的独立性,但是在大数据场景下,如果代码中存在频繁进行数据通信的操作,这样子JVM和Python进程就会频繁交互,可能会导致我们的任务失败。所以,如果面对大规模数据还是需要我们使用原生的API来编写程序(Java或者Scala)。但是对于中小规模的,比如TB数据量以下的,直接使用PySpark来开发还是很爽的。

8. 程序启动步骤实操

一般我们在生产中提交PySpark程序,都是通过spark-submit的方式提供脚本的,也就是一个shell脚本,配置各种Spark的资源参数和运行脚本信息,和py脚本一并提交到调度平台进行任务运行,其中shell脚本模版如下:

代码语言:javascript
复制
#!/bin/bash
basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)

spark-submit \
    --master yarn \
    --queue samshare \
    --deploy-mode client \
    --num-executors 100 \
    --executor-memory 4G \
    --executor-cores 4 \
    --driver-memory 2G \
    --driver-cores 2 \
    --conf spark.default.parallelism=1000 \
    --conf spark.yarn.executor.memoryOverhead=8G \
    --conf spark.sql.shuffle.partitions=1000 \
    --conf spark.network.timeout=1200 \
    --conf spark.python.worker.memory=64m \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.sql.crossJoin.enabled=True \
    --conf spark.dynamicAllocation.enabled=True \
    --conf spark.shuffle.service.enabled=True \
    --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
    --conf spark.pyspark.driver.python=python3 \
    --conf spark.pyspark.python=python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
    --conf spark.sql.pivotMaxValues=500000 \
    --conf spark.hadoop.hive.exec.dynamic.partition=True \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
    --conf spark.hadoop.hive.exec.max.created.files=100000 \
    ${bashPath}/project_name/main.py $v_var1 $v_var2

有的时候,我们想直接通过PySpark调用SQL脚本,那么可以通过spark-sql命令启动,shell脚本就可以这样子写:

代码语言:javascript
复制
#!/bin/bash
echo "依次打印:v_yesterday,v_2days_ago"
echo $v_yesterday
echo $v_2days_ago

V_SCRIPT_PATH=$(dirname $0);
V_PROC_NAME=main.sql;

spark-sql \
    --name yourappname \
   --master yarn \
   --queue samshare \
   --deploy-mode client  \
   --num-executors 50 \
   --executor-memory 8G  \
   --executor-cores 2  \
   --driver-cores 1  \
   --driver-memory 16G \
   --conf spark.default.parallelism=400 \
    --conf spark.rpc.message.maxSize=1024 \
    --conf spark.sql.shuffle.partitions=400\
    --conf spark.sql.autoBroadcastJoinThreshold=314572800\
    --conf spark.hadoop.hive.exec.dynamic.partition=true \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --hiveconf v_yesterday=${v_yesterday} \
    --hiveconf v_2days_ago=${v_2days_ago} \
    -f ${V_SCRIPT_PATH}/${V_PROC_NAME}

我们在main函数里,也就主程序中要怎么启动PySpark呢?可以通过下面的方式:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType, MapType
import pandas as pd

conf = SparkConf() \
       .setAppName("your_appname") \
       .set("hive.exec.dynamic.partition.mode", "nonstrict")
sc = SparkContext(conf=conf)
hc = HiveContext(sc)

"""
your code
"""

最后,如果数据结果需要保存下来,我们假设是保存到Hive,那么可以参考下面两种方式:

代码语言:javascript
复制
# 方式1: 结果为Python DataFrame
result_df = pd.DataFrame([1,2,3], columns=['a'])
save_table = "tmp.samshare_pyspark_savedata"

# 获取DataFrame的schema
c1 = list(result_df.columns)
# 转为SparkDataFrame
result = hc.createDataFrame(result_df.astype(str), c1)
result.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "测试数据写入到表" + save_table)



# 方式2: 结果为SparkDataFrame
list_values = [['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]]
Spark_df = spark.createDataFrame(list_values, ['name', 'age', 'score'])
print(Spark_df.show())
save_table = "tmp.samshare_pyspark_savedata"

# 方式2.1: 直接写入到Hive
Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "测试数据写入到表" + save_table)

# 方式2.2: 注册为临时表,使用SparkSQL来写入分区表
Spark_df.createOrReplaceTempView("tmp_table")
write_sql = """
insert overwrite table {0} partitions (pt_date='{1}')
select * from tmp_table
""".format(save_table, "20210520")
hc.sql(write_sql)
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "测试数据写入到表" + save_table)

Reference

  1. PySpark 的背后原理 https://www.cnblogs.com/xia520pi/p/8695652.html
  2. Spark Scheduler内部原理剖析 https://www.cnblogs.com/xia520pi/p/8695141.html
  3. Spark大数据实战课程——实验楼
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 SAMshare 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Spark的核心概念
  • 2. Spark 的特性
  • 3. Spark分布式运行架构
  • 4. Spark任务调度分析
  • 5. Spark 生态系统 —— BDAS
  • 6. PySpark与Spark的关系
  • 7. PySpark分布式运行架构
  • 8. 程序启动步骤实操
  • Reference
相关产品与服务
云 HDFS
云 HDFS(Cloud HDFS,CHDFS)为您提供标准 HDFS 访问协议,您无需更改现有代码,即可使用高可用、高可靠、多维度安全、分层命名空间的分布式文件系统。 只需几分钟,您就可以在云端创建和挂载 CHDFS,来实现您大数据存储需求。随着业务需求的变化,您可以实时扩展或缩减存储资源,CHDFS 存储空间无上限,满足您海量大数据存储与分析业务需求。此外,通过 CHDFS,您可以实现计算与存储分离,极大发挥计算资源灵活性,同时实现存储数据永久保存,降低您大数据分析资源成本。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档