Getting Started with Spark, Part 1: RDD Transformations and Actions

ipython

Spark UI: http://ipaddress:4040

spark core concepts

driver program: every Spark application contains a driver program. The driver program holds the application's main function, defines the distributed datasets on the cluster, and then performs operations on them. spark-shell and pyspark are themselves driver programs.

SparkContext: the driver program connects to Spark (that is, to the cluster) through a SparkContext. In the shell, a SparkContext is created automatically.

RDD: the abstraction for a distributed dataset, on which you can run transformations, actions, and other operations. Once the SparkContext is created (i.e., the driver program is connected to Spark), you can create RDDs, e.g. sc.textFile('1.txt').

pythonLines = lines.filter(lambda line: "Python" in line)

With a line like this you write code in a single driver program and automatically have parts of it run on multiple nodes.

When deploying a standalone application, the only difference from working in the shell is that you must initialize the SparkContext yourself to connect to Spark. spark-submit includes Spark's dependencies for the Python script and sets up the environment for Spark's Python API.

spark-submit my_script.py

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
f = sc.textFile("/usr/local/spark/README.md")
print f.count()

#shut down Spark
sc.stop()
sys.exit()

RDD

In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

RDD is immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.

2 ways to create RDDs: 1. load an external dataset. 2. distribute a collection of objects (e.g. a list or set).

The benefit of lazy evaluation: take the first() action, for example. Without laziness Spark might have to scan the entire file; lazily, it can stop at the first matching line (see the sketch below).
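A minimal sketch of this (path hypothetical): filter() only records the transformation, and first() then reads just far enough to find one match.

lines = sc.textFile("README.md")
pythonLines = lines.filter(lambda line: "Python" in line)
pythonLines.first()  # Spark stops reading the file at the first matching line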

If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly.

pythonLines.persist()
pythonLines.count()
pythonLines.first()

Workflow (every Spark program and shell session follows these steps; a sketch follows the list):

  1. create some input RDDs from external data or a collection of objects.
  2. transform them to define new RDDs using transformations like filter().
  3. ask Spark to persist() or cache() any intermediate RDDs that will need to be reused.
  4. launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
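A minimal sketch of the four steps (assuming a local log.txt):

lines = sc.textFile("log.txt")                 # 1. create an input RDD
errors = lines.filter(lambda x: "error" in x)  # 2. transform it
errors.persist()                               # 3. persist it for reuse
print errors.count()                           # 4. actions trigger the actual computation
print errors.first()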

creating rdds

#from a collection of objects
#outside of prototyping and testing, this is not widely used since
#it requires that you have your entire dataset in memory on one machine
lines = sc.parallelize(["pandas", "i like pandas"])

#load data from external storage
lines = sc.textFile("/path/to/README.md")

rdd operations

transformation

lazy evaluation

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

#rdd is immutable, so filter command creates a new rdd
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

badLinesRDD = inputRDD.filter(lambda x: "warning" in x or "error" in x)

action

return a final value to the driver program or write data to an external storage system.

print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line

rdd.saveAsTextFile()

Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

It is important to note that each time we call a new action, the entire RDD must be computed “from scratch.” To avoid this inefficiency, users can persist intermediate results.

lazy evaluation

Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations.

Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can occur multiple times.
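One way to inspect the lineage an RDD has built up is toDebugString(), which prints the chain of transformations (a sketch; the exact output format varies by Spark version):

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
print errorsRDD.toDebugString()  # shows the filter layered on top of the text file scan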

In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.

passing functions

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

One thing to watch out for: if the function you pass is the method of an object, or references fields of an object, Spark serializes and ships the whole containing object to the workers, which can be much larger than the piece of information you actually need (and can fail if the object isn't serializable).
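A sketch of the pitfall and the fix, using a hypothetical SearchFunctions class; copying the field into a local variable means only that value is shipped:

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def get_matches_bad(self, rdd):
        # problem: referencing self.query drags the whole SearchFunctions object along
        return rdd.filter(lambda s: self.query in s)

    def get_matches_good(self, rdd):
        # fix: extract the field into a local variable; only the string is shipped
        query = self.query
        return rdd.filter(lambda s: query in s)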

common transformations and actions

transformation

map()
filter()

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"

distinct() and intersection() are expensive, since both must shuffle all the data over the network to identify duplicates.
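A sketch of the set-like transformations on toy data:

a = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
b = sc.parallelize(["coffee", "monkey", "kitty"])
a.distinct().collect()       # removes duplicates; needs a shuffle
a.union(b).collect()         # keeps duplicates; no shuffle needed
a.intersection(b).collect()  # ['coffee', 'monkey']; removes duplicates, shuffles
a.subtract(b).collect()      # elements of a not in b; also shuffles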

action

sum = rdd.reduce(lambda x, y: x + y)

sumCount = nums.aggregate((0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
avg = sumCount[0] / float(sumCount[1])  # the average of the elements

foreach() action lets us perform computations on each element in the RDD without bringing it back locally.
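For example (a sketch, reusing nums from above): the function runs on the workers, so its output goes to the executors' stdout, not the driver's.

def handle(x):
    print x  # appears in the executor logs, not on the driver

nums.foreach(handle)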

caching

As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times.

// Scala: without persist(), the map is recomputed for each of the two actions
val result = input.map(x => x * x)
println(result.count())
println(result.collect().mkString(","))

// Scala: persist() caches the result once the first action computes it
import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

#remove from the cache
rdd.unpersist()
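The same persist pattern in Python (the two blocks above are Scala); StorageLevel comes from the pyspark package. A sketch, reusing nums from earlier:

from pyspark import StorageLevel

result = nums.map(lambda x: x * x)
result.persist(StorageLevel.DISK_ONLY)  # only marks the RDD; nothing is computed yet
print result.count()    # the first action computes and caches the RDD
print result.collect()  # reuses the cached copy instead of recomputing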

key-value pair

creating paired rdds

pairs = lines.map(lambda x: (x.split(" ")[0], x))

sc.parallelize([(1, 'm'), (1, 'f'), (2, 'm')])

transformation

general

aggregation
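A typical per-key aggregation is reduceByKey(); a minimal word-count sketch (path hypothetical):

lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split(" "))
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
counts.take(5)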

tuning the level of parallelism

Every RDD has a fixed number of partitions that determines the degree of parallelism to use when executing operations on the RDD. When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions.
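Most of these operators take an optional parameter for the number of partitions; for example reduceByKey():

data = sc.parallelize([("a", 3), ("b", 4), ("a", 1)])
data.reduceByKey(lambda x, y: x + y)      # default parallelism
data.reduceByKey(lambda x, y: x + y, 10)  # custom parallelism: 10 partitions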

#sometimes we want to change the partitioning of an RDD outside the
#context of grouping and aggregation operations
rdd.repartition(n)      # shuffles data across the network, which is very expensive
rdd.coalesce(n)         # avoids data movement, but only if decreasing the number of partitions
rdd.partitions.size     # Scala: show the number of partitions
rdd.getNumPartitions()  # Python: show the number of partitions

grouping

joins
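A sketch of join() on pair RDDs with toy data; the result pairs each key with a tuple of the values from both sides (order of results may vary):

store_address = sc.parallelize([("ritual", "1026 Valencia St"), ("philz", "748 Van Ness Ave")])
store_rating = sc.parallelize([("ritual", 4.9), ("philz", 4.8)])
store_address.join(store_rating).collect()
# [('philz', ('748 Van Ness Ave', 4.8)), ('ritual', ('1026 Valencia St', 4.9))]
# leftOuterJoin()/rightOuterJoin() also keep keys missing from one side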

sorting data
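sortByKey() sorts a pair RDD by key (a sketch):

rdd = sc.parallelize([(3, "c"), (1, "a"), (2, "b")])
rdd.sortByKey().collect()                 # [(1, 'a'), (2, 'b'), (3, 'c')]
rdd.sortByKey(ascending=False).collect()  # descending order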

action

general

partitioning

motivation

Just as a single-node program needs good data structures to store data for efficient lookup, in a distributed system communication is very expensive, so placing data to minimize network traffic can greatly improve performance. This is where partitioning shows its power, especially when a dataset is reused multiple times.

example

As a rule of thumb, more partitions means more parallelism, and partitioning pays off most on large, reused datasets.

Some operations, such as sortByKey() and groupByKey(), automatically produce RDDs with a known partitioning.
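A sketch of explicit partitioning with partitionBy() on toy data: partition a large, frequently reused pair RDD once, then persist it so the partitioning itself is not recomputed; later joins against it ship only the other, smaller RDD across the network.

pairs = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
partitioned = pairs.partitionBy(100).persist()  # persist, or each action repartitions again
partitioned.partitioner  # PySpark exposes the partitioner, if one is set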

determining an RDD's partitioner

operations that benefit from partitioning

operations that affect partitioning

pagerank
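PageRank is the classic beneficiary of partitioning: links is joined with ranks on every iteration, so hash-partitioning links once and persisting it avoids re-shuffling it each time. A minimal sketch on a hypothetical three-page graph (Python 2, like the rest of these notes):

# links: (pageID, list of pages it links to)
links = sc.parallelize([("a", ["b", "c"]), ("b", ["a"]), ("c", ["a", "b"])]) \
          .partitionBy(4).persist()
ranks = links.mapValues(lambda _: 1.0)  # mapValues keeps the partitioning of links

for i in range(10):
    contribs = links.join(ranks).flatMap(
        lambda (page, (neighbors, rank)): [(dest, rank / len(neighbors)) for dest in neighbors])
    ranks = contribs.reduceByKey(lambda x, y: x + y) \
                    .mapValues(lambda v: 0.15 + 0.85 * v)

print ranks.collect()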

custom partitioners
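In PySpark a custom partitioner is just a function passed to partitionBy(); for example, partitioning URLs by domain so pages from the same site land in the same partition (a sketch):

import urlparse  # Python 2 module; urllib.parse in Python 3

def domain_hash(url):
    return hash(urlparse.urlparse(url).netloc)

# pairs keyed by URL end up grouped by domain
pairs.partitionBy(20, domain_hash)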
