Spark Basics 1: RDD Transformations and Actions

ipython

Spark UI: ipaddress:4040

spark core concepts

driver program: every Spark application contains a driver program. The driver program holds the application's main function, defines the distributed datasets on the cluster, and then applies operations to them. spark-shell and pyspark are themselves driver programs.

SparkContext: the driver program connects to Spark (i.e. to the cluster) through a SparkContext. In the shell, the SparkContext is created for you automatically (as sc).

RDD: the abstraction for a distributed dataset, on which transformations, actions, and other operations can be run. Once the SparkContext exists (i.e. the driver program is connected to Spark), you can create RDDs, e.g. sc.textFile('1.txt')

pythonLines = lines.filter(lambda line: "Python" in line)

This lets you write code in a single driver program and automatically have parts of it run on multiple nodes.

When deploying a standalone application, the only difference from the shell workflow is that you have to initialize the SparkContext yourself to connect to Spark. spark-submit adds the Spark dependencies to the Python script and sets up the environment for Spark's Python API.

spark-submit my_script.py

import sys

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
f = sc.textFile("/usr/local/spark/README.md")
print f.count()

# shut down Spark
sc.stop()
sys.exit()

RDD

In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

An RDD is an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.

Two ways to create RDDs: 1. load an external dataset; 2. distribute a collection of objects (e.g. a list or set).

The benefit of the lazy fashion: take the first() action, for example; without lazy evaluation, Spark might have to read through the entire file instead of stopping as soon as it finds the first matching line.

If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly.

pythonLines.persist()
pythonLines.count()
pythonLines.first()

Typical steps in every Spark program and shell session (see the sketch after the list):

  1. create some input RDDs from external data or a collection of objects.
  2. transform them to define new RDDs using transformations like filter().
  3. ask Spark to persist() or cache() any intermediate RDDs that will need to be reused.
  4. launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
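A minimal sketch of these four steps in the pyspark shell, assuming a log.txt file exists in the working directory and using the sc SparkContext the shell provides:

lines = sc.textFile("log.txt")                 # 1. create an input RDD from external data
errors = lines.filter(lambda x: "error" in x)  # 2. transform it into a new RDD
errors.persist()                               # 3. persist the intermediate RDD for reuse
print errors.count()                           # 4. actions kick off the parallel computation
print errors.first()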

creating rdds

# from a collection of objects
# outside of prototyping and testing, this is not widely used since
# it requires that you have your entire dataset in memory on one machine
lines = sc.parallelize(["pandas", "i like pandas"])

#load data from external storage
lines = sc.textFile("/path/to/README.md")

rdd operations

transformation

lazy evaluation

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

# RDDs are immutable, so filter() creates a new RDD rather than modifying inputRDD
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

badLinesRDD = inputRDD.filter(lambda x: "warning" in x or "error" in x)

action

return a final value to the driver program or write data to an external storage system.

print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line

rdd.saveAsTextFile() 

Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

It is important to note that each time we call a new action, the entire RDD must be computed “from scratch.” To avoid this inefficiency, users can persist intermediate results.

lazy evaluation

Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations.

Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can occur multiple times.

In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.
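As a small illustration of chaining simple operations (log.txt here is just a placeholder file name); thanks to lazy evaluation, Spark pipelines these steps into a single pass over the data:

lengths = (sc.textFile("log.txt")
             .filter(lambda line: "error" in line)
             .map(lambda line: line.lower())
             .map(lambda line: len(line)))
print lengths.take(5)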

passing functions

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

One thing to watch out for when passing functions: if the function is a member method or references a field of an object (e.g. self.query), Spark serializes and ships the entire containing object to the workers, which can be far larger than intended, or fail outright if the object is not serializable. Copy the needed field into a local variable first.
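A sketch of the pitfall and its fix; the SearchFunctions class below is purely illustrative:

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def getMatchesMemberReference(self, rdd):
        # problem: referencing self.query forces Spark to serialize the whole object
        return rdd.filter(lambda x: self.query in x)

    def getMatchesNoReference(self, rdd):
        # safe: copy the field into a local variable before using it in the closure
        query = self.query
        return rdd.filter(lambda x: query in x)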

common transformations and actions

transformation

map()
filter()

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"

distinct() and intersection() are expensive because they shuffle data across the network to identify and remove duplicates.
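For example (element order in the results may vary):

rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])
rdd1.distinct().collect()          # ['coffee', 'panda', 'monkey', 'tea']
rdd1.intersection(rdd2).collect()  # ['coffee', 'monkey']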

action

sum = rdd.reduce(lambda x, y: x + y)

sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
print sumCount[0] / float(sumCount[1])  # the average of nums

foreach() action lets us perform computations on each element in the RDD without bringing it back locally.
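A tiny sketch: in local mode the output shows up in the same console, while on a cluster it goes to the executors' logs:

def printElement(x):
    # runs on the executors, not in the driver
    print x

nums = sc.parallelize([1, 2, 3, 4])
nums.foreach(printElement)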

caching

As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times.

// without persist(): collect() recomputes the RDD that count() already computed
val result = input.map(x => x * x)
println(result.count())
println(result.collect().mkString(","))

// with persist(): the squared values are computed once and kept on disk
import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

// remove an RDD from the cache
rdd.unpersist()
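For reference, the same pattern in the Python API (assuming the nums RDD from earlier); pyspark exposes the storage levels on pyspark.StorageLevel:

from pyspark import StorageLevel

result = nums.map(lambda x: x * x)
result.persist(StorageLevel.DISK_ONLY)
print result.count()
print ",".join(str(x) for x in result.collect())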

key-value pair

creating paired rdds

pairs = lines.map(lambda x: (x.split(" ")[0], x))  # key each line by its first word

sc.parallelize([(1, 'm'), (1, 'f'), (2, 'm')])  # a pair RDD from an in-memory list of tuples

transformation

general

aggregation

tuning the level of parallelism

Every RDD has a fixed number of partitions that determine the degree of parallelism to use when executing operations on the RDD. When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions.

# Sometimes we want to change the partitioning of an RDD outside the context of grouping and aggregation operations.
rdd.repartition(numPartitions)   # shuffles data across the network, which is very expensive
rdd.coalesce(numPartitions)      # avoids data movement, but only when decreasing the number of partitions
rdd.partitions.size              # Scala: the number of partitions
rdd.getNumPartitions()           # Python: the number of partitions
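For example, most of the pair-RDD aggregation operations take an optional number of partitions as their last argument (the value 10 below is arbitrary):

data = sc.parallelize([("a", 3), ("b", 4), ("a", 1)])
data.reduceByKey(lambda x, y: x + y)      # default parallelism
data.reduceByKey(lambda x, y: x + y, 10)  # explicitly use 10 partitions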

grouping

joins

sorting data

action

general

partitioning

motivation

By analogy: a single-node program needs good data structures to store data so that lookups are efficient. In a distributed system, communication is very expensive, so laying data out to minimize network traffic greatly improves performance. This is where partitioning pays off, especially when a dataset is reused multiple times.

example

In general, the more partitions, the better.

Some operations, such as sortByKey and groupByKey, automatically produce RDDs with a partitioner set (range-partitioned and hash-partitioned, respectively).
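A sketch of the classic case where partitioning pays off: hash-partition a large pair RDD that is joined over and over, so only the smaller RDD is shuffled on each join (the paths and the parseUserLine/parseEventLine helpers are hypothetical):

# a large RDD of (UserID, UserInfo) pairs, reused across many joins
userData = sc.textFile("hdfs:///path/to/userData.txt") \
             .map(parseUserLine) \
             .partitionBy(100) \
             .persist()

# each batch of (UserID, LinkInfo) events is joined against the partitioned userData;
# only the events RDD is shuffled across the network
events = sc.textFile("hdfs:///path/to/events.txt").map(parseEventLine)
joined = userData.join(events)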

determining an RDD's partitioner

operations that benefit from partitioning

operations that affect partitioning

pagerank

custom
