
Spark Notes: Word Count

MachineLP · Published 2019-05-26

Copyright notice: This is an original post by the author and may not be reproduced without the author's permission. For questions, contact WeChat: lp9628 (mention CSDN). https://cloud.tencent.com/developer/article/1435822

My Spark study notes can be followed here: https://github.com/MachineLP/Spark-

Word Count

Counting the number of occurrences of words in a text is a popular first exercise using map-reduce.

The Task

Input: A text file consisting of words separated by spaces.

Output: A list of words and their counts, sorted from the most to the least common.

We will use the book "Moby Dick" as our input.

#start the SparkContext
from pyspark import SparkContext
sc=SparkContext(master="local[4]")

# set import path
import sys
sys.path.append('../../Utils/')
#from plan2table import plan2table
def pretty_print_plan(rdd):
    for x in rdd.toDebugString().decode().split('\n'):
        print(x)

Download data file from S3

## If this cell fails, download the file from https://mas-dse-open.s3.amazonaws.com/Moby-Dick.txt in your browser
# and put it in the '../../Data/' directory
import requests
data_dir='../../Data'
filename='Moby-Dick.txt'
url = "https://mas-dse-open.s3.amazonaws.com/"+filename
local_path = data_dir+'/'+filename
!mkdir -p {data_dir}
# Copy URL content to local_path
r = requests.get(url, allow_redirects=True)
open(local_path, 'wb').write(r.content)

# check that the text file is where we expect it to be
!ls -l $local_path

Define an RDD that will read the file

  • Execution of the read is lazy.
  • The file has been opened.
  • Reading starts only when the stage is executed.
text_file = sc.textFile(data_dir+'/'+filename)
type(text_file) 

CPU times: user 0 ns, sys: 0 ns, total: 0 ns

Wall time: 1.6 s

Steps for counting the words

  • Split each line into words by spaces.
  • Map each word to (word, 1).
  • Count the number of occurrences of each word.
words =     text_file.flatMap(lambda line: line.split(" "))
not_empty = words.filter(lambda x: x!='') 
key_values= not_empty.map(lambda word: (word, 1)) 
counts=     key_values.reduceByKey(lambda a, b: a + b)

CPU times: user 20 ms, sys: 10 ms, total: 30 ms

Wall time: 318 ms

flatMap()

Note the line:

words =     text_file.flatMap(lambda line: line.split(" "))

Why are we using flatMap, rather than map?

The reason is that line.split(" ") generates a list of strings, so had we used map the result would have been an RDD of lists of words, not an RDD of words.

The difference between map and flatMap is that flatMap expects each element to map to a list, and it concatenates those lists to form the resulting RDD.
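
As a quick illustration (a minimal sketch on toy data, not part of the original notebook, assuming the sc created above):

# Toy example contrasting map and flatMap on two short lines
lines = sc.parallelize(["call me Ishmael", "some years ago"])
print(lines.map(lambda line: line.split(" ")).collect())
# -> [['call', 'me', 'Ishmael'], ['some', 'years', 'ago']]   (an RDD of lists)
print(lines.flatMap(lambda line: line.split(" ")).collect())
# -> ['call', 'me', 'Ishmael', 'some', 'years', 'ago']       (an RDD of words)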

The execution plan

In the last cell we defined the execution plan, but we have not started to execute it.

  • Preparing the plan took ~100ms, which is a non-trivial amount of time,
  • But much less than the time it will take to execute it.
  • Let's have a look at the execution plan.
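
A minimal sketch of how to do that with the pretty_print_plan helper defined earlier (the printed lineage is not reproduced here):

# Print the lineage (execution plan) of the final RDD; nothing is computed yet
pretty_print_plan(counts)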

Understanding the details

To see which step in the plan corresponds to which RDD we print out the execution plan for each of the RDDs.

Note that the execution plans for words, not_empty and key_values are all the same.
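
A sketch of that per-RDD printout, using the same helper (the cell itself is not included in this post):

# The lineage of words, not_empty and key_values is identical;
# counts adds the reduceByKey (shuffle) stage on top of it
for name, rdd in [('words', words), ('not_empty', not_empty),
                  ('key_values', key_values), ('counts', counts)]:
    print('\n' + name + ':')
    pretty_print_plan(rdd)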

Execution

Finally we count the number of times each word has occurred. At this point the lazy execution model actually performs some real work, and this takes a significant amount of time.

## Run #1
Count=counts.count()  # Count = the number of different words
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y) # Sum = the total number of words
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37

CPU times: user 10.5 ms, sys: 7.39 ms, total: 17.9 ms

Wall time: 1.04 s

Amortization

When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.

The cells below are identical to the one above, with one exception at Run #3

Observe that Run #2 takes much less time than Run #1, even though no cache() was explicitly requested. The reason is that Spark caches (or materializes) key_values before executing reduceByKey(), because performing reduceByKey requires a shuffle, and a shuffle requires that its input RDD be materialized. In other words, sometimes caching happens even if the programmer did not ask for it.

## Run #2
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37

CPU times: user 20 ms, sys: 10 ms, total: 30 ms

Wall time: 3.9 s

Explicit Caching

In Run #3 we explicitly ask for counts to be cached. This will reduce the execution time in the following run by a little bit, but not by much.

## Run #3, cache
Count=counts.cache().count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))
Different words=33781, total words=215133, mean no. occurances per word=6.37
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 597 ms
## Run #4
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37

CPU times: user 20 ms, sys: 10 ms, total: 30 ms

Wall time: 432 ms

## Run #5
Count=counts.count()
Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)
print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))

Different words=33781, total words=215133, mean no. occurances per word=6.37

CPU times: user 20 ms, sys: 0 ns, total: 20 ms

Wall time: 307 ms

Summary

This was our first real pyspark program, hurray!

Some things you learned:

1) An RDD is a distributed immutable array.

It is the core data structure of Spark.

2) You cannot operate on an RDD directly. Only through Transformations and Actions.

3) Transformations transform an RDD into another RDD.

4) Actions output their results on the head node.


5) After the action is done, you are using just the head node, not the workers.

Lazy Execution:

1) RDD operations are added to an Execution Plan.

2) The plan is executed when a result is needed.

3) Explicit and implicit caching cause intermediate results to be saved.
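
A minimal sketch of the lazy execution model on toy data (assuming the sc created at the top; not part of the original notebook):

# Transformations only extend the execution plan -- this line returns immediately
squares = sc.parallelize(range(1000000)).map(lambda x: x * x)
# The action is what triggers the computation on the workers
print(squares.reduce(lambda a, b: a + b))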

Finding the most common words

  • counts: an RDD with 33781 pairs of the form (word, count).
  • Find the 5 most frequent words.
  • Method1: collect and sort on head node.
  • Method2: Pure Spark, collect only at the end.

Method1: collect and sort on head node

Collect the RDD into the driver node

  • Collect can take significant time.
C=counts.collect()

CPU times: user 50 ms, sys: 0 ns, total: 50 ms

Wall time: 200 ms

Sort

  • RDD collected into list in driver node.
  • No longer using spark parallelism.
  • Sort in Python.
  • Will not scale to very large documents.
C.sort(key=lambda x:x[1])
print('most common words\n'+'\n'.join(['%s:\t%d'%c for c in reversed(C[-5:])]))

most common words

the: 13766

of: 6587

and: 5951

a: 4533

to: 4510

CPU times: user 10 ms, sys: 0 ns, total: 10 ms

Wall time: 18.1 ms

Compute the mean number of occurrences per word.

Count2=len(C)
Sum2=sum([i for w,i in C])
print('count2=%f, sum2=%f, mean2=%f'%(Count2,Sum2,float(Sum2)/Count2))

count2=33781.000000, sum2=215133.000000, mean2=6.368462

Method2: Pure Spark, collect only at the end.

  • Collect into the head node only the more frequent words.
  • Requires multiple stages

Step 1 split, clean and map to (word,1)

word_pairs=text_file.flatMap(lambda x: x.split(' '))\
    .filter(lambda x: x!='')\
    .map(lambda word: (word,1))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns

Wall time: 66.3 µs

Step 2 Count occurrences of each word.

counts=word_pairs.reduceByKey(lambda x,y:x+y)

CPU times: user 10 ms, sys: 0 ns, total: 10 ms

Wall time: 37.5 ms

Step 3 Reverse (word,count) to (count,word) and sort by key

reverse_counts=counts.map(lambda x:(x[1],x[0]))   # reverse order of word and count
sorted_counts=reverse_counts.sortByKey(ascending=False)

CPU times: user 30 ms, sys: 10 ms, total: 40 ms

Wall time: 1.49 s

Full execution plan

We now have a complete plan to compute the most common words in the text. Nothing has been executed yet! Not even a single byte has been read from the file Moby-Dick.txt !

For more on execution plans and lineage see Jacek Laskowski's blog.

print('word_pairs:')
pretty_print_plan(word_pairs)
print('\ncounts:')
pretty_print_plan(counts)
print('\nreverse_counts:')
pretty_print_plan(reverse_counts)
print('\nsorted_counts:')
pretty_print_plan(sorted_counts)

Step 4 Take the top 5 words:

D=sorted_counts.take(5)
print('most common words\n'+'\n'.join(['%d:\t%s'%c for c in D]))

most common words

13766: the

6587: of

5951: and

4533: a

4510: to

CPU times: user 10 ms, sys: 0 ns, total: 10 ms

Wall time: 398 ms

Summary

We showed two ways for finding the most common words:

  1. Collecting and sorting at the head node. -- Does not scale.
  2. Using RDDs to the end.