前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark简介

PySpark简介

作者头像
双愚
发布2018-08-30 10:31:50
6.8K0
发布2018-08-30 10:31:50
举报

什么是PySpark?

Apache Spark是一个大数据处理引擎,与MapReduce相比具有多个优势。通过删除Hadoop中的大部分样板代码,Spark提供了更大的简单性。此外,由于Spark处理内存中的大多数操作,因此它通常比MapReduce更快,在每次操作之后将数据写入磁盘。

PySpark是Spark的Python API。本指南介绍如何在单个Linode上安装PySpark。PySpark API将通过对文本文件的分析来介绍,通过计算得到每个总统就职演说中使用频率最高的五个词。

安装必备软件

安装过程需要安装Scala,它需要Java JDK 8作为依赖项。Miniconda将用于处理PySpark安装以及通过NLTK下载数据。

Miniconda

  1. 下载并安装Miniconda:

curl -OL https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86\_64.sh bash Miniconda3-latest-Linux-x86\_64.sh

  1. 在安装过程中,系统会多次提示您。查看条款和条件,并为每个提示选择“是”。
  2. 重新启动shell会话以使PATH的更改生效。
  3. 检查你的Python版本: python --version

Java JDK 8

本节中的步骤将在Ubuntu 16.04上安装Java 8 JDK。对于其他发行版,请参阅官方文档

  1. 安装software-properties-common以轻松添加新存储库: sudo apt-get install software-properties-common
  2. 添加Java PPA: sudo add-apt-repository ppa:webupd8team/java
  3. 更新源列表: sudo apt-get update
  4. 安装Java JDK 8: sudo apt-get install oracle-java8-installer

Scala

当与Spark一起使用时,Scala会对Spark不支持Python的几个API调用。尽管Scala提供了比Python更好的性能,但Python更容易编写并且具有更多的库。根据用例,Scala可能优于PySpark。

  1. 下载Debian软件包并安装。
wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.deb 
sudo dpkg -i scala-2.12.4.deb

安装PySpark

1. 使用Miniconda,创建一个新的虚拟环境:

wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.deb  sudo dpkg -i scala-2.12.4.deb

2. 安装PySpark和Natural Language Toolkit(NLTK)conda install -c conda-forge pyspark nltk

3. 启动PySpark。会有一些警告,因为没有为群集设置配置。 pyspark

Python 3.6.3 |Anaconda, Inc.| (default, Nov 20 2017, 20:41:42)

GCC 7.2.0 on linux

Type "help", "copyright", "credits" or "license" for more information.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

...

Welcome to

\_\_\_\_ \_\_

/ \_\_/\_\_ \_\_\_ \_\_\_\_\_/ /\_\_

\_\ \/ \_ \/ \_ `/ \_\_/ '\_/

/\_\_ / .\_\_/\_,\_/\_/ /\_/\_\ version 2.2.1

/\_/

Using Python version 3.6.3 (default, Nov 20 2017 20:41:42)

SparkSession available as 'spark'.

> >>

下载样本数据

本指南中使用的数据是1789年至2009年每个总统就职地址的文本文件汇编。该数据集可从NLTK获得。Miniconda和NLTK软件包具有内置功能,可简化从命令行下载的过程。

  1. 导入NLTK并下载文本文件。除语料库外,还要下载停用词列表。 import nltk nltk.download('inaugural') nltk.download('stopwords')
  2. 导入文件对象并显示从NLTK包下载的可用文本文件列表。 from nltk.corpus import inaugural, stopwords inaugural.fileids()

这应该返回从George Washington到Barack Obama的就职演说的文本文件列表。

注意

该文件位于/home/linode/nltk_data/corpora/inaugural/其中linode是用户名。

虽然可以完全用Python完成本指南的大部分目标,但目的是演示PySpark API,它也可以处理分布在集群中的数据。

PySpark API

Spark利用弹性分布式数据集(RDD)的概念。RDD的特点是:

  • 不可变性 - 对数据的更改会返回一个新的RDD,而不是修改现有的RDD
  • 分布式 - 数据可以存在于集群中并且可以并行运行
  • 已分区 - 更多分区允许在群集之间分配工作,但是太多分区会在调度中产生不必要的开销

本指南的这一部分将重点介绍如何将数据作为RDD加载到PySpark中。然后,一些PySpark API通过计数等简单操作进行演示。最后,将使用更复杂的方法,如过滤和聚合等函数来计算就职地址中最常用的单词。

将数据读入PySpark

由于PySpark是从shell运行的,因此SparkContext已经绑定到变量sc。对于在shell外部运行的独立程序,需要导入SparkContext。SparkContext对象表示Spark功能的入口点。

1. 从NLTK的文本文件集中读取,注意指定文本文件的绝对路径。假设通过上述方法下载了语料库,请替换linode为您的Unix用户名:


text_files = sc.textFile("file:///home/linode/nltk_data/corpora/inaugural/*.txt")

2. Spark中有两种类型的操作:转换操作。转换是延迟加载的操作,返回RDD。但是,这意味着在操作需要返回结果之前,Spark实际上不会计算转换。动作的一个示例是count()方法,它计算所有文件中的总行数:

>>> text_files.count()

2873

清理和标记数据

1. 要计算单词,必须对句子进行标记。在此之前,删除所有标点符号并将所有单词转换为小写以简化计数:

import string 
removed_punct = text_files.map(lambda sent: sent.translate({ord(c): None for c in string.punctuation}).lower())

由于map是转换,因此在执行操作之前不会应用该函数。

注意如果步骤不清楚,请尝试.collect()查看中间输出。

2. 对句子进行标记:

tokenize = removed_punct.flatMap(lambda sent: sent.split(" "))

注意:

与Python的map函数类似,PySpark map返回一个具有相同数量元素的RDD(在本例中为2873)。flatMap允许将RDD转换为在对单词进行标记时所需的另一个大小。

过滤和聚合数据

1. 通过方法链接,可以使用多个转换,而不是在每个步骤中创建对RDD的新引用。reduceByKey是通过聚合每个单词值对来计算每个单词的转换。

result = tokenize.map(lambda word: (word, 1))\ .reduceByKey(lambda a, b: a + b)

2. 应删除停用词(例如“a”,“an”,“the”等),因为这些词在英语中经常使用,但在此上下文中没有提供任何价值。在过滤时,通过删除空字符串来清理数据。然后通过takeOrdered返回的前五个最频繁的单词对结果进行排序。

words = stopwords.words('english')  
result.filter(lambda word: word0 not in words and word0 != '')\ .takeOrdered(5, key = lambda x: -x1)
('government', 557), ('people', 553), ('us', 455), ('upon', 369), ('must', 346)

在前五个词中,“政府”是最常用的词,计数为557,其中“人”收尾553.转换和行动可以简明扼要地概括。请记住linode用您的Unix用户名替换。

3. 这些操作可归纳为:

import string
from nltk.corpus import stopwords

words = stopwords.words('english')

sc.textFile("file:///home/linode/nltk_data/corpora/inaugural/*.txt")\
  .map(lambda sent: sent.translate({ord(c): None for c in string.punctuation}).lower())\
  .flatMap(lambda sent: sent.split(" "))\
  .map(lambda word: (word, 1))\
  .reduceByKey(lambda a, b: a + b)\
  .filter(lambda word: word[0] not in words and word[0] != '')\
  .takeOrdered(5, key = lambda x: -x[1])

PySpark还有许多其他功能,包括DataFrames,SQL,流媒体,甚至是机器学习模块。有关完整列表,请参阅PySpark文档

更多信息

有关此主题的其他信息,您可能需要参考以下资源。虽然提供这些是希望它们有用,但请注意,我们无法保证外部材料的准确性或及时性。

想要了解更多关于PySpark等教程,请前往腾讯云+社区学习更多知识。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是PySpark?
  • 安装必备软件
    • Miniconda
      • Java JDK 8
        • Scala
        • 安装PySpark
        • 下载样本数据
        • PySpark API
          • 将数据读入PySpark
            • 清理和标记数据
              • 过滤和聚合数据
              • 更多信息
              相关产品与服务
              大数据
              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档