Copyright notice: this is an original article by the blogger and may not be reproduced without permission. For questions, add WeChat: lp9628 (mention CSDN). https://cloud.tencent.com/developer/article/1435822
Spark study notes can be followed here: https://github.com/MachineLP/Spark-
Counting the number of occurrences of words in a text is a popular first exercise using map-reduce.
Input: A text file consisting of words separated by spaces.
Output: A list of words and their counts, sorted from the most to the least common.
We will use the book "Moby Dick" as our input.
#start the SparkContext
from pyspark import SparkContext
sc=SparkContext(master="local[4]")
# set import path
import sys
sys.path.append('../../Utils/')
#from plan2table import plan2table
def pretty_print_plan(rdd):
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)
## If this cell fails, download the file from https://mas-dse-open.s3.amazonaws.com/Moby-Dick.txt in your browser
# and put it in the '../../Data/' directory
import requests
data_dir='../../Data'
filename='Moby-Dick.txt'
url = "https://mas-dse-open.s3.amazonaws.com/"+filename
local_path = data_dir+'/'+filename
!mkdir -p {data_dir}
# Copy URL content to local_path
r = requests.get(url, allow_redirects=True)
open(local_path, 'wb').write(r.content)
# check that the text file is where we expect it to be
!ls -l $local_path
text_file = sc.textFile(data_dir+'/'+filename)
type(text_file)
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.6 s
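As a quick sanity check, one could peek at the first lines in a hypothetical extra cell (take() is an action, so unlike the transformations below it actually reads from the file):
text_file.take(2)   # returns the first two lines of the file as a list of strings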
Map each word to (word,1), then reduce by key:
words = text_file.flatMap(lambda line: line.split(" "))  # split each line into words
not_empty = words.filter(lambda x: x!='')                # drop the empty strings
key_values= not_empty.map(lambda word: (word, 1))        # map each word to (word,1)
counts= key_values.reduceByKey(lambda a, b: a + b)       # sum the 1s for each distinct word
CPU times: user 20 ms, sys: 10 ms, total: 30 ms
Wall time: 318 ms
Note the line:
words = text_file.flatMap(lambda line: line.split(" "))
Why are we using flatMap, rather than map?
The reason is that the operation line.split(" ") generates a list of strings, so had we used map the result would be an RDD of lists of words, not an RDD of words.
The difference between map and flatMap is that flatMap expects each element to map to a list, and it concatenates those lists to form the resulting RDD.
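A minimal illustration of the difference, using a hypothetical two-line RDD (assuming the sc created above):
demo = sc.parallelize(["a b", "c"])
demo.map(lambda l: l.split(" ")).collect()      # [['a', 'b'], ['c']]  -- an RDD of lists
demo.flatMap(lambda l: l.split(" ")).collect()  # ['a', 'b', 'c']      -- an RDD of words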
In the last cell we defined the execution plan, but we have not started to execute it.
To see which step in the plan corresponds to which RDD, we print out the execution plan for each of the RDDs.
Note that the execution plans for words, not_empty and key_values are all the same.
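The plan-printing cell is not reproduced here; a sketch of how it might look, using the pretty_print_plan helper defined earlier:
for name, rdd in [('words', words), ('not_empty', not_empty),
                  ('key_values', key_values), ('counts', counts)]:
    print('\n' + name + ':')
    pretty_print_plan(rdd)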
Finally, we count the number of times each word occurs. At this point the lazy execution model performs some actual work, which takes a significant amount of time.
## Run #1
Count=counts.count() # Count = the number of different words
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y) # Sum = the total number of words
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))
Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 10.5 ms, sys: 7.39 ms, total: 17.9 ms
Wall time: 1.04 s
When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.
The cells below are identical to the one above, with one exception in Run #3.
Observe that Run #2 takes much less time than Run #1, even though no cache() was explicitly requested. The reason is that Spark caches (or materializes) key_values before executing reduceByKey(), because performing reduceByKey requires a shuffle, and a shuffle requires that the input RDD is materialized. In other words, sometimes caching happens even if the programmer did not ask for it.
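One way to check whether an RDD has been explicitly cached is to inspect it directly (is_cached and getStorageLevel() are part of the RDD API; the implicit materialization described above happens in the shuffle machinery and is not reflected here):
print(counts.is_cached)          # False at this point: we never called cache() on counts
print(counts.getStorageLevel())  # the default storage level, i.e. no caching requested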
## Run #2
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))
Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 20 ms, sys: 10 ms, total: 30 ms
Wall time: 3.9 s
Explicit Caching
In Run #3 we explicitly ask for counts to be cached. This will reduce the execution time in the following run by a little bit, but not by much.
## Run #3, cache
Count=counts.cache().count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))
Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 597 ms
## Run #4
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))
Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 20 ms, sys: 10 ms, total: 30 ms
Wall time: 432 ms
## Run #5
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurrences per word=%4.2f'%(Count,Sum,float(Sum)/Count))
Different words=33781, total words=215133, mean no. occurrences per word=6.37
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 307 ms
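An aside on storage levels: cache() is shorthand for persist(StorageLevel.MEMORY_ONLY). If counts were too large to keep in memory, one could request a different level; a sketch, not part of the runs above:
from pyspark import StorageLevel
counts.unpersist()                            # drop the earlier cache() request
counts.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk when memory is short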
This was our first real pyspark program, hurray!
Some things you learned:
1) An RDD is a distributed immutable array. It is the core data structure of Spark.
2) You cannot operate on an RDD directly, only through Transformations and Actions.
3) Transformations transform an RDD into another RDD.
4) Actions output their results on the head node.
5) After the action is done, you are using just the head node, not the workers.
Lazy Execution:
1) RDD operations are added to an Execution Plan.
2) The plan is executed when a result is needed.
3) Explicit and implicit caching cause intermediate results to be saved.
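A small demonstration of points 1) and 2), with hypothetical names (the transformation returns immediately; only the action triggers work):
lengths = text_file.map(lambda line: len(line))   # instant: just extends the execution plan
total_chars = lengths.reduce(lambda a, b: a + b)  # an action: only now does Spark read and compute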
We are left with counts: an RDD with 33781 pairs of the form (word,count). We show two ways to find the most common words: (1) collect and sort on the head node, and (2) collect only at the end.
First approach: collect and sort on the head node.
C=counts.collect()
CPU times: user 50 ms, sys: 0 ns, total: 50 ms
Wall time: 200 ms
# sort ascending by count on the head node; the last five entries are then the most common
C.sort(key=lambda x:x[1])
print('most common words\n'+'\n'.join(['%s:\t%d'%c for c in reversed(C[-5:])]))
most common words
the: 13766
of: 6587
and: 5951
a: 4533
to: 4510
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 18.1 ms
Count2=len(C)
Sum2=sum([i for w,i in C])
print('count2=%f, sum2=%f, mean2=%f'%(Count2,Sum2,float(Sum2)/Count2))
count2=33781.000000, sum2=215133.000000, mean2=6.368462
Second approach: collect only at the end. First, map each word to a (word,1) pair:
word_pairs=text_file.flatMap(lambda x: x.split(' '))\
                    .filter(lambda x: x!='')\
                    .map(lambda word: (word,1))
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 66.3 µs
counts=word_pairs.reduceByKey(lambda x,y:x+y)
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 37.5 ms
Map each (word,count) pair to (count,word), and sort by key:
reverse_counts=counts.map(lambda x:(x[1],x[0])) # reverse order of word and count
sorted_counts=reverse_counts.sortByKey(ascending=False)
CPU times: user 30 ms, sys: 10 ms, total: 40 ms
Wall time: 1.49 s
We now have a complete plan to compute the most common words in the text. Nothing has been executed yet! Not even a single byte has been read from the file Moby-Dick.txt!
For more on execution plans and lineage, see Jacek Laskowski's blog.
print('word_pairs:')
pretty_print_plan(word_pairs)
print('\ncounts:')
pretty_print_plan(counts)
print('\nreverse_counts:')
pretty_print_plan(reverse_counts)
print('\nsorted_counts:')
pretty_print_plan(sorted_counts)
D=sorted_counts.take(5)
print('most common words\n'+'\n'.join(['%d:\t%s'%c for c in D]))
most common words
13766: the
6587: of
5951: and
4533: a
4510: to
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 398 ms
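As an aside, the flip-and-sort steps could be replaced by takeOrdered, which is part of the RDD API and returns the top elements directly (the negated key sorts by descending count):
D2 = counts.takeOrdered(5, key=lambda x: -x[1])   # the five (word,count) pairs with the largest counts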
We showed two ways for finding the most common words: collecting all (word,count) pairs and sorting them on the head node, and sorting by key inside Spark so that only the top five pairs are brought to the head node.