前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 编程指南 (一) [Spa

Spark 编程指南 (一) [Spa

作者头像
py3study
发布2020-01-03 10:20:08
2.1K0
发布2020-01-03 10:20:08
举报
文章被收录于专栏:python3python3python3

Python Programming Guide - Spark(Python)

Spark应用基本概念

每一个运行在cluster上的spark应用程序,是由一个运行main函数的driver program和运行多种并行操作的executes组成

其中spark的核心是弹性分布式数据集(Resilient Distributed Dataset—RDD)

  • Resilient(弹性):易变化、易计算
  • Distributed(分布式):可横跨多台机器,集群分布
  • Dataset(数据集):大批量数据的集合

<!-- more -->

RDD基本概念

RDD是逻辑集中的实体,代表一个分区的只读数据集,不可发生改变

【RDD的重要内部属性】

  • 分区列表(partitions) 对于一个RDD而言,分区的多少涉及对这个RDD并行计算的粒度,每一个RDD分区的计算都会在一个单独的任务中执行,每一个分区对应一个Task,分区后的数据存放在内存当中
  • 计算每个分区的函数(compute) 对于Spark中每个RDD都是以分区进行计算的,并且每个分区的compute函数是在对迭代器进行复合操作,不需要每次计算,直到提交动作触发才会将之前所有的迭代操作进行计算,lineage在容错中有重要作用
  • 对父级RDD的依赖(dependencies) 由于RDD存在转换关系,所以新生成的RDD对上一个RDD有依赖关系,RDD之间通过lineage产生依赖关系

【窄依赖】 每一个父RDD的分区最多只被子RDD的一个分区所使用,可以类似于流水线一样,计算所有父RDD的分区;在节点计算失败的恢复上也更有效,可以直接计算其父RDD的分区,还可以进行并行计算 子RDD的每个分区依赖于常数个父分区(即与数据规模无关) 输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatmap 输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce 从输入中选择部分元素的算子,如filter、distinct、subtract、sample 【宽依赖】 多个子RDD的分区会依赖于同一个父RDD的分区,需要取得其父RDD的所有分区数据进行计算,而一个节点的计算失败,将会导致其父RDD上多个分区重新计算 子RDD的每个分区依赖于所有父RDD分区 对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey 对两个RDD基于key进行jion和重组,如jion

  • 对key-value数据类型RDD的分区器,控制分区策略和分区数(partitioner) partitioner就是RDD的分区函数,即HashPartitioner(哈希分区)和RangePartitioner(区域分区),分区函数决定了每个RDD的分区策略和分区数,并且这个函数只在(k-v)类型的RDD中存在,在非(k-v)结构的RDD中是None
  • 每个数据分区的地址列表(preferredLocations) 与Spark中的调度相关,返回的是此RDD的每个partition所出储存的位置,按照“移动数据不如移动计算”的理念,在spark进行任务调度的时候,尽可能将任务分配到数据块所存储的位置
  • 控制操作(control operation) spark中对RDD的持久化操作是很重要的,可以将RDD存放在不同的存储介质中,方便后续的操作可以重复使用。

主要有cache、persist、checkpoint,checkpoint接口是将RDD持久化到HDFS中,与persist的区别是checkpoint会切断此RDD之前的依赖关系,而persist会保留依赖关系。checkpoint的两大作用:一是spark程序长期驻留,过长的依赖会占用很多的系统资源,定期checkpoint可以有效的节省资源;二是维护过长的依赖关系可能会出现问题,一旦spark程序运行失败,RDD的容错成本会很高

Python连接Spark

Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用标准的CPython解释器, 所以像NumPy这样的C语言类库也可以使用,同样也支持PyPy 2.3+

可以用spark目录里的bin/spark-submit脚本在python中运行spark应用程序,这个脚本可以加载Java/Scala类库,让你提交应用程序到集群当中。你也可以使用bin/pyspark脚本去启动python交互界面

如果你希望访问HDFS上的数据集,你需要建立对应HDFS版本的PySpark连接。

最后,你的程序需要import一些spark类库:

from pyspark import SparkContext, SparkConf

PySpark 要求driver和workers需要相同的python版本,它通常引用环境变量PATH默认的python版本;你也可以自己指定PYSPARK_PYTHON所用的python版本,例如:

PYSPARK_PYTHON=python3.4 bin/pyspark
PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

初始化Spark

一个Spark应用程序的第一件事就是去创建SparkContext对象,它的作用是告诉Spark如何建立一个集群。创建SparkContext之前,先要创建SparkConf对象,SparkConf包含了应用程序的相关信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
  • appName:应用的名称,用户显示在集群UI上
  • master:Spark、Mesos或者YARN集群的URL,如果是本地运行,则应该是特殊的'local'字符串

在实际运行时,你不会讲master参数写死在程序代码里,而是通过spark-submit来获取这个参数;在本地测试和单元测试中,你仍然需要'local'去运行Spark应用程序

使用Shell

在PySpark Shell中,一个特殊SparkContext已经帮你创建好了,变量名是:sc,然而在Shell中创建你自己的SparkContext是不起作用的。

你可以通过--master参数设置master所连接的上下文主机;你也可以通过--py-files参数传递一个用逗号作为分割的列表,将Python中的.zip、.egg、.py等文件添加到运行路径当中;你同样可以通过--packages参数,传递一个用逗号分割的maven列表,来个这个Shell会话添加依赖(例如Spark的包)

任何额外的包含依赖的仓库(如SonaType),都可以通过--repositories参数添加进来。 Spark中所有的Python依赖(requirements.txt的依赖包列表),在必要时都必须通过pip手动安装

例如用4个核来运行bin/pyspark:

./bin/pyspark --master local[4]

或者,将code.py添加到搜索路径中(为了后面可以import):

./bin/pyspark --master local[4] --py-files code.py

通过运行pyspark --help来查看完整的操作帮助信息,在这种情况下,pyspark会调用一个通用的spark-submit脚本

在IPython这样增强Python解释器中,也可以运行PySpark Shell;支持IPython 1.0.0+;在利用IPython运行bin/pyspark时,必须将PYSPARK_DRIVER_PYTHON变量设置成ipython:

PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

你可以通过PYSPARK_DRIVER_PYTHON_OPTS参数来自己定制ipython命令,比如在IPython Notebook中开启PyLab图形支持:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark

参考:Spark Programming Guide 官方文档

原博链接,请注明出处。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark应用基本概念
    • RDD基本概念
    • Python连接Spark
    • 初始化Spark
    • 使用Shell
    相关产品与服务
    GPU 云服务器
    GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档