Spark Basics 1: RDD Transformations and Actions

ipython

Spark UI: ipaddress:4040

spark core concept

driver program: every Spark application contains a driver program. The driver program holds the application's main function, defines the distributed datasets on the cluster, and then applies operations to them. spark-shell and pyspark are themselves driver programs.

SparkContext: the driver program connects to Spark (i.e. to the cluster) through a SparkContext. In the shell the SparkContext is created automatically (the sc variable).

RDD: represents a distributed dataset, on which you can run transformations, actions, and other operations. Once the SparkContext is created (i.e. the driver program is connected to Spark), you can create RDDs, e.g. sc.textFile('1.txt').

pythonLines = lines.filter(lambda line: "Python" in line)

You write code in a single driver program and automatically have parts of it run on multiple nodes.

When deploying a standalone application, the only difference from the shell is that you need to initialize the SparkContext yourself to connect to Spark. spark-submit adds the Spark dependencies for the Python script and sets up the environment for Spark's Python API.

spark-submit my_script.py

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
f = sc.textFile("/usr/local/spark/README.md")
print f.count()

# shut down Spark
sc.stop()
sys.exit()

RDD

In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

RDD is immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.

2 ways to create RDDs: 1. load an external dataset. 2. distribute a collection of objects (e.g. a list or set).

The benefit of lazy evaluation: for an action like first(), Spark only needs to read the file until it finds the first matching line; without laziness it might have to scan the entire file.

If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly.

pythonLines.persist()
pythonLines.count()
pythonLines.first()

Typical workflow for every Spark program and shell session (a sketch follows the list):

  1. create some input RDDs from external data or a collection of objects.
  2. transform them to define new RDDs using transformations like filter().
  3. ask Spark to persist() or cache() any intermediate RDDs that will need to be reused.
  4. launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
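
A minimal sketch that walks through all four steps, assuming a local README.md as input:

lines = sc.textFile("README.md")                            # 1. create an input RDD from external data
pythonLines = lines.filter(lambda line: "Python" in line)   # 2. transform it to define a new RDD
pythonLines.persist()                                       # 3. persist the intermediate RDD for reuse
print pythonLines.count()                                   # 4. actions trigger the actual computation
print pythonLines.first()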

creating rdds

# from a collection of objects
# outside of prototyping and testing, this is not widely used since
# it requires that you have your entire dataset in memory on one machine
lines = sc.parallelize(["pandas", "i like pandas"])

#load data from external storage
lines = sc.textFile("/path/to/README.md")

rdd operations

transformation

lazy evaluation

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

#rdd is immutable, so filter command creates a new rdd
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

badLinesRDD = inputRDD.filter(lambda x: "warning" in x or "error" in x)

action

return a final value to the driver program or write data to an external storage system.

print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line

rdd.saveAsTextFile("/path/to/output")  # write an RDD out to an external storage system

Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

It is important to note that each time we call a new action, the entire RDD must be computed “from scratch.” To avoid this inefficiency, users can persist intermediate results.

lazy evaluation

Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations.

Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can occur multiple times.

In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.
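
A small sketch reusing the log.txt input from above: chained simple transformations are pipelined into a single pass over the data, and nothing runs until an action is called.

errorLengths = (sc.textFile("log.txt")
                  .filter(lambda line: "error" in line)
                  .map(lambda line: len(line)))
print errorLengths.count()  # only this action triggers reading and computing the chain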

passing functions

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

One thing to watch out for: if the function you pass is a method of an object, or references an object's fields, Spark serializes and ships the entire containing object to the workers, which can be much larger than the data actually needed.
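
A sketch of the pitfall and the fix, using a hypothetical SearchFunctions class (not from the original notes):

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def getMatchesBad(self, rdd):
        # references self.query inside the lambda, so the whole object gets serialized
        return rdd.filter(lambda x: self.query in x)

    def getMatchesGood(self, rdd):
        # copy the field into a local variable; only the string is shipped to the workers
        query = self.query
        return rdd.filter(lambda x: query in x)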

common transformations and actions

transformation

map()
filter()

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"

distinct() and intersection() are expensive: both remove duplicates, which requires shuffling all the data over the network.
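
A small sketch of the pseudo-set operations (rdd1 and rdd2 are made-up sample inputs):

rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])
rdd1.distinct().collect()          # deduplicates; requires a shuffle
rdd1.union(rdd2).collect()         # keeps duplicates; no shuffle needed
rdd1.intersection(rdd2).collect()  # deduplicates; requires a shuffle
rdd1.subtract(rdd2).collect()      # elements in rdd1 but not in rdd2; requires a shuffle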

action

sum = rdd.reduce(lambda x, y: x + y)

sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
avg = sumCount[0] / float(sumCount[1])

foreach() action lets us perform computations on each element in the RDD without bringing it back locally.
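
For instance, a minimal sketch (the report function is made up here; its output goes to the executors' logs, not back to the driver):

def report(x):
    # runs on the worker that holds this element; nothing is returned to the driver
    print x

nums.foreach(report)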

caching

As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times.

// Scala: without persist(), each action recomputes result from scratch
val result = input.map(x => x * x)
println(result.count())
println(result.collect().mkString(","))

// Scala: persisting lets the second action reuse the already-computed data
import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
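
The same idea in Python, as a small sketch (nums reuses the RDD defined earlier; storage levels live in the pyspark module):

from pyspark import StorageLevel

result = nums.map(lambda x: x * x)
result.persist(StorageLevel.DISK_ONLY)   # kept on disk after the first computation instead of recomputed
print result.count()
print ",".join(str(x) for x in result.collect())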

# remove from cache
rdd.unpersist()

key-value pair

creating paired rdds

pairs = lines.map(lambda x: (x.split(" ")[0], x))

sc.parallelize([(1, 'm'), (1, 'f'), (2, 'm')])  # a pair RDD from an in-memory collection of tuples
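
A short sketch of common pair-RDD transformations on such data (the per-key counting is illustrative, not from the original notes):

pairs = sc.parallelize([(1, 'm'), (1, 'f'), (2, 'm')])
counts = pairs.map(lambda kv: (kv[0], 1)).reduceByKey(lambda x, y: x + y)
print counts.collect()                              # e.g. [(1, 2), (2, 1)]: records per key
print pairs.groupByKey().mapValues(list).collect()  # group all values for each key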

transformation

general

aggregation

tuning the level of parallelism

Every RDD has a fixed number of partitions that determines the degree of parallelism to use when executing operations on the RDD. When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions, as in the sketch below.
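
For example (a sketch; most *ByKey operations accept an optional numPartitions argument):

data = sc.parallelize([("a", 3), ("b", 4), ("a", 1)])
data.reduceByKey(lambda x, y: x + y)      # default parallelism
data.reduceByKey(lambda x, y: x + y, 10)  # explicitly ask for 10 partitions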

# Sometimes we want to change the partitioning of an RDD outside the context
# of grouping and aggregation operations.
rdd.repartition(numPartitions)   # shuffles data across the network, which is very expensive
rdd.coalesce(numPartitions)      # avoids data movement, but only if you are decreasing the number of partitions
rdd.getNumPartitions()           # number of partitions (Python); rdd.partitions.size in Scala

grouping

joins

sorting data

action

general

partitioning

motivation

By analogy: a single-node program needs good data structures so that lookups are efficient. In a distributed system, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. This is where partitioning pays off, especially when a dataset is reused multiple times.

example

Generally, the more partitions the better, since more partitions allow more parallelism.

Some operations, such as sortByKey and groupByKey, automatically produce RDDs with a known partitioner (range-partitioned and hash-partitioned, respectively).
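
A small sketch of the classic pattern, with made-up pair RDDs: hash-partition the large, reused RDD once, persist it, and later joins against it avoid reshuffling it.

users = sc.parallelize([(1, "alice"), (2, "bob"), (3, "carol")])
events = sc.parallelize([(1, "click"), (2, "view")])

partitionedUsers = users.partitionBy(4).persist()  # hash-partition into 4 partitions and keep the result around
joined = partitionedUsers.join(events)             # the partitioned side is not shuffled again
print joined.collect()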

determining how an RDD is partitioned

operations that benefit from partitioning

operations that affect partitioning

pagerank

custom partitioners
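
In Python, a custom partitioner is simply a hash function passed to partitionBy. A sketch (the links RDD and domain-based hashing are illustrative, e.g. for grouping URLs by site):

import urlparse  # Python 2; use urllib.parse in Python 3

def hash_domain(url):
    # keys (URLs) from the same domain land in the same partition
    return hash(urlparse.urlparse(url).netloc)

links = sc.parallelize([("http://a.com/x", 1), ("http://a.com/y", 2), ("http://b.org/z", 3)])
partitioned = links.partitionBy(20, hash_domain)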
