专栏首页SAMshare一起揭开 PySpark 编程的神秘面纱

一起揭开 PySpark 编程的神秘面纱

在开始讲解PySpark程序启动原理之前,我们先来了解一下Spark的一些概念和特性。

1. Spark的核心概念

Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。最大的优化是让计算任务的中间结果可以存储在内存中,不需要每次都写入 HDFS,更适用于需要迭代的 MapReduce 算法场景中,可以获得更好的性能提升。例如一次排序测试中,对 100TB 数据进行排序,Spark 比 Hadoop 快三倍,并且只需要十分之一的机器。Spark 集群目前最大的可以达到 8000 节点,处理的数据达到 PB 级别,在互联网企业中应用非常广泛。

2. Spark 的特性

Hadoop 的核心是分布式文件系统 HDFS 和计算框架 MapReduces。Spark 可以替代 MapReduce,并且兼容 HDFS、Hive 等分布式存储层,良好的融入 Hadoop 的生态系统。

Spark 执行的特点

  • 中间结果输出:Spark 将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多 Stage 的任务串联或者并行执行。
  • 数据格式和内存布局:Spark 抽象出分布式内存存储结构弹性分布式数据集 RDD,能够控制数据在不同节点的分区,用户可以自定义分区策略。
  • 任务调度的开销:Spark 采用了事件驱动的类库 AKKA 来启动任务,通过线程池的复用线程来避免系统启动和切换开销。

Spark 的优势

  • 速度快,运行工作负载快 100 倍。Apache Spark 使用最先进的 DAG 调度器、查询优化器和物理执行引擎,实现了批处理和流数据的高性能。
  • 易于使用,支持用 Java、Scala、Python、R 和 SQL 快速编写应用程序。Spark 提供了超过 80 个算子,可以轻松构建并行应用程序。您可以从 Scala、Python、R 和 SQL shell 中交互式地使用它。
  • 普遍性,结合 SQL、流处理和复杂分析。Spark 提供了大量的库,包括 SQL 和 DataFrames、用于机器学习的 MLlib、GraphX 和 Spark 流。您可以在同一个应用程序中无缝地组合这些库。
  • 各种环境都可以运行,Spark 在 Hadoop、Apache Mesos、Kubernetes、单机或云主机中运行。它可以访问不同的数据源。您可以使用它的独立集群模式在 EC2、Hadoop YARN、Mesos 或 Kubernetes 上运行 Spark。访问 HDFS、Apache Cassandra、Apache HBase、Apache Hive 和数百个其他数据源中的数据。

3. Spark分布式运行架构

Spark程序简单来说它的分布式运行架构,大致上是把任务发布到Driver端,然后Spark解析调度并封装成一个个的小Task,分发到每一个Executor上面去run,Task包含计算逻辑、数据等等,基础架构以及执行顺序如下两图:

图来自:https://www.cnblogs.com/xia520pi/p/8695141.html

4. Spark任务调度分析

Spark拿到我们的一个任务,是会先发布到Driver端,Driver端拆分任务逻辑放入不同的Task,若干个Task组成一个Task Set,根据Executor资源情况分配任务。大概的逻辑是上面这样子的,不过,我们在看YARN日志的时候,经常性地会看到Job、Stage、Task,它们的关系是怎么样的呢?可以看下图:

简单来说,Job包含Stage,Stage包含Task,其中Job的划分看Action,Stage划分看Shuffle。我们来复习下常用RDD算子,如下图:

这里敲黑板划重点:

1、Action算子:collect、collectAsMap、reduce、countByKey、take、first等。

2、Shuffle操作:Shuffle指的是数据从Map Task输出到Reduce Task的过程,作为连接Map和Reduce两端的桥梁。它需要把Map端不同Task的数据都拉取到一个Reduce Task,十分消耗IO和内存。Shuffle操作可以分为Map端的数据准备和Reduce端的数据拷贝,称之为Shuffle Write和Shuffle read。而rdd宽依赖操作就会引起Shuffle过程。

3、常见的宽依赖(Wide Dependencies)操作有:reduceByKey、groupBykey、join、

4、常见的窄依赖(Narrow Dependencies)操作有:map、filter、flatMap、union等

5、哪里并行?我们常说的并行指的是同一个Stage内并行,Stage之间是存在依赖关系的,属于串行操作。

5. Spark 生态系统 —— BDAS

目前,Spark 已经发展成为包含众多子项目的大数据计算平台。BDAS 是伯克利大学提出的基于 Spark 的数据分析栈(BDAS)。其核心框架是 Spark,同时涵盖支持结构化数据 SQL 查询与分析的查询引擎 Spark SQL,提供机器学习功能的系统 MLBase 及底层的分布式机器学习库 MLlib,并行图计算框架 GraphX,流计算框架 Spark Streaming,近似查询引擎 BlinkDB,内存分布式文件系统 Tachyon,资源管理框架 Mesos 等子项目。这些子项目在 Spark 上层提供了更高层、更丰富的计算范式。

6. PySpark与Spark的关系

Spark支持很多语言的调用,包括了Java、Scala、Python等,其中用Python语言编写的Spark API就是PySpark。它在Spark最外层封装了一层Python API,借助了Py4j来实现Spark底层API的调用,从而可以实现实现我们直接编写Python脚本即可调用Spark强大的分布式计算能力。

7. PySpark分布式运行架构

与Spark分布式运行架构一致,不过就是外围多了一层Python API。用户通过实例化Python的SparkContext对象,接着Py4j会把Python脚本映射到JVM中,同样地实例化一个Scala的SparkContext对象,然后Driver端发送Task任务到Executor端去执行,因为Task任务中可能会包含一些Python的函数,所以每一个Task都是需要开启一个Python进程,通过Socket通信方式将相关的Python函数部分发送到Python进程去执行。

综上所述,PySpark是借助于Py4j实现了Python调用Java从而来驱动Spark程序的运行,这样子可以保证了Spark核心代码的独立性,但是在大数据场景下,如果代码中存在频繁进行数据通信的操作,这样子JVM和Python进程就会频繁交互,可能会导致我们的任务失败。所以,如果面对大规模数据还是需要我们使用原生的API来编写程序(Java或者Scala)。但是对于中小规模的,比如TB数据量以下的,直接使用PySpark来开发还是很爽的。

8. 程序启动步骤实操

一般我们在生产中提交PySpark程序,都是通过spark-submit的方式提供脚本的,也就是一个shell脚本,配置各种Spark的资源参数和运行脚本信息,和py脚本一并提交到调度平台进行任务运行,其中shell脚本模版如下:

#!/bin/bash
basePath=$(cd "$(dirname )"$(cd "$(dirname "$0"): pwd)")": pwd)

spark-submit \
    --master yarn \
    --queue samshare \
    --deploy-mode client \
    --num-executors 100 \
    --executor-memory 4G \
    --executor-cores 4 \
    --driver-memory 2G \
    --driver-cores 2 \
    --conf spark.default.parallelism=1000 \
    --conf spark.yarn.executor.memoryOverhead=8G \
    --conf spark.sql.shuffle.partitions=1000 \
    --conf spark.network.timeout=1200 \
    --conf spark.python.worker.memory=64m \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.sql.crossJoin.enabled=True \
    --conf spark.dynamicAllocation.enabled=True \
    --conf spark.shuffle.service.enabled=True \
    --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
    --conf spark.pyspark.driver.python=python3 \
    --conf spark.pyspark.python=python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
    --conf spark.sql.pivotMaxValues=500000 \
    --conf spark.hadoop.hive.exec.dynamic.partition=True \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000 \
    --conf spark.hadoop.hive.exec.max.dynamic.partitions=100000 \
    --conf spark.hadoop.hive.exec.max.created.files=100000 \
    ${bashPath}/project_name/main.py $v_var1 $v_var2

有的时候,我们想直接通过PySpark调用SQL脚本,那么可以通过spark-sql命令启动,shell脚本就可以这样子写:

#!/bin/bash
echo "依次打印:v_yesterday,v_2days_ago"
echo $v_yesterday
echo $v_2days_ago

V_SCRIPT_PATH=$(dirname $0);
V_PROC_NAME=main.sql;

spark-sql \
    --name yourappname \
   --master yarn \
   --queue samshare \
   --deploy-mode client  \
   --num-executors 50 \
   --executor-memory 8G  \
   --executor-cores 2  \
   --driver-cores 1  \
   --driver-memory 16G \
   --conf spark.default.parallelism=400 \
    --conf spark.rpc.message.maxSize=1024 \
    --conf spark.sql.shuffle.partitions=400\
    --conf spark.sql.autoBroadcastJoinThreshold=314572800\
    --conf spark.hadoop.hive.exec.dynamic.partition=true \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --hiveconf v_yesterday=${v_yesterday} \
    --hiveconf v_2days_ago=${v_2days_ago} \
    -f ${V_SCRIPT_PATH}/${V_PROC_NAME}

我们在main函数里,也就主程序中要怎么启动PySpark呢?可以通过下面的方式:

# -*- coding: utf-8 -*-
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType, MapType
import pandas as pd

conf = SparkConf() \
       .setAppName("your_appname") \
       .set("hive.exec.dynamic.partition.mode", "nonstrict")
sc = SparkContext(conf=conf)
hc = HiveContext(sc)

"""
your code
"""

最后,如果数据结果需要保存下来,我们假设是保存到Hive,那么可以参考下面两种方式:

# 方式1: 结果为Python DataFrame
result_df = pd.DataFrame([1,2,3], columns=['a'])
save_table = "tmp.samshare_pyspark_savedata"

# 获取DataFrame的schema
c1 = list(result_df.columns)
# 转为SparkDataFrame
result = hc.createDataFrame(result_df.astype(str), c1)
result.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "测试数据写入到表" + save_table)



# 方式2: 结果为SparkDataFrame
list_values = [['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]]
Spark_df = spark.createDataFrame(list_values, ['name', 'age', 'score'])
print(Spark_df.show())
save_table = "tmp.samshare_pyspark_savedata"

# 方式2.1: 直接写入到Hive
Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table) # 或者改成append模式
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "测试数据写入到表" + save_table)

# 方式2.2: 注册为临时表,使用SparkSQL来写入分区表
Spark_df.createOrReplaceTempView("tmp_table")
write_sql = """
insert overwrite table {0} partitions (pt_date='{1}')
select * from tmp_table
""".format(save_table, "20210520")
hc.sql(write_sql)
print(datetime.now().strftime("%y/%m/%d %H:%M:%S"), "测试数据写入到表" + save_table)

Reference

  1. PySpark 的背后原理 https://www.cnblogs.com/xia520pi/p/8695652.html
  2. Spark Scheduler内部原理剖析 https://www.cnblogs.com/xia520pi/p/8695141.html
  3. Spark大数据实战课程——实验楼

本文分享自微信公众号 - SAMshare(gh_8528ce7b7e80),作者:Samshare

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-09-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 一起揭开 PySpark 编程的神秘面纱

    在开始讲解PySpark程序启动原理之前,我们先来了解一下Spark的一些概念和特性。

    Python数据科学
  • 一起揭开 Hive 编程的神秘面纱

    Hadoop实现了一个特别的计算模型,就是MapReduce,可以将我们的计算任务分拆成多个小的计算单元,然后分配到家用或者服务器级别的硬件机器上,从而达到降低...

    Sam Gor
  • 一起揭开 YARN 的神秘面纱

    我们知道MapReduce1.0,是把计算框架和资源调度框架都弄在一起了,所以Master端的JobTracker会大包大揽去执行任务,存在很多问题,比如资源分...

    Sam Gor
  • 一起揭开 Kafka 的神秘面纱

    Kafka是由Servers(服务器)和Clients(客户端)组成的高吞吐量分布式发布订阅消息系统,可以部署在裸机、虚拟机以及云环境上。在我们接触的很多业务场...

    Sam Gor
  • 揭开神秘的面纱

    打开f12检查页面后,刷新一下页面,点击Network,再点击下面的XHR,查看动态数据,会发现如下图所示,有两行数据。

    公众号guangcity
  • 让我们一起揭开算法的神秘面纱

    其实我一直想写一篇揭秘算法的文章,因为,据我所见,大多数写算法的软件工程师,其实,就是普通程序员,并非高人一等。

    Kiba518
  • 揭开NFV神秘的面纱

    网络功能虚拟化(NFV)的正确实现,还需一段时间的研究与发展,因为NFV不仅要实现自动化、融合于现有的管理系统中,还需要更加的轻便。 ? NFV能够采用简单易用...

    SDNLAB
  • 揭开GANs的神秘面纱

    这篇文章中,作者通过将生成对抗网络(GANs,以下均简称GANs)运用到图片生成任务中向我们解释了其的原理。GANs是无监督学习中少有的成功的技术,一经提出,将...

    昱良
  • 揭开 LVS 神秘的面纱

    耕耘实录
  • 揭开HTTPS的神秘面纱

    在说HTTP前,一定要先介绍一下HTTP,这家伙应该不用过多说明了,大家每天都在用,每一次HTTP请求,都是一次TCP连接。遗憾的是,请求的内容在TCP报文中是...

    烟草的香味
  • 揭开 Kubernetes 的神秘面纱

    “容器”已成为最新的流行语之一。但是,这个词到底意味着什么呢?说起“容器”,人们通常会把它和 Docker 联系起来,Docker 是一个被定义为软件的标准化单...

    用户8639654
  • 揭开 Monad 的神秘面纱

    我们知道 Swift 语言支持函数式编程范式,所以函数式编程的一些概念近来比较火。有一些相对于OOP来说不太一样的概念,比如 Applicative, Func...

    JoeyBlue
  • 揭开MySQL“锁”的神秘面纱

    在实验环境MySQL5.6、存储引擎:InnoDB中,揭开“锁”的神秘面纱,捋一捋我对这几个概念的想法

    孙玄@奈学教育
  • 一篇文章揭开Kotlin协程的神秘面纱

    Kotlin协程提供了一种新的异步执行方式,但直接查看库函数可能会有点混乱,本文中尝试揭开协程的神秘面纱。

    砸漏
  • 揭开神经网络的神秘面纱

    未经训练的神经网络模型很像新生儿: 他们被创造出来的时候对世界一无所知(如果考虑到认识论理论的话),而且只有通过接触这个世界,也就是后天的知识,才会慢慢提高它们...

    商业新知
  • 一文揭开AI芯片的神秘面纱

    今天一朋友咨询我AI芯片怎么样?我是搞软件的,历来计算机系专业学生里搞软件的不懂硬件,但是要聊到AI芯片,它真的很简单,哈哈。

    用户1594945
  • 带你揭开WebSocket的神秘面纱!

    在揭开webSocket的神秘面纱之前,有言在先,我在写文章之前对webSocket一无所知,由于公司业务用到,故此研究记录一下,班门弄斧之处,请大佬批评指正...

    用户7413032
  • 前端工程师,揭开HTTP的神秘面纱

    TCP/IP协议将应用层、表示层、会话层合并为应用层,物理层和数据链路层合并为网络接口层

    FinGet
  • 揭开微信小程序 Kbone 的神秘面纱

    微信小程序诞生至今,渗透到用户生活的方方面面,包括餐厅点餐,网上购物,乘车出行,挂号就医……

    binnie

扫码关注云+社区

领取腾讯云代金券