Spark 2.x Study Notes: 17. Spark Streaming's HdfsWordCount

17.1 HdfsWordCount Source Code Walkthrough

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

From the comments we can see that:

  • HdfsWordCount counts the words in new text files created in a given directory.
  • It is run with run-example org.apache.spark.examples.streaming.HdfsWordCount localdir, where localdir is the directory that Spark Streaming will monitor for new text files to find and read.
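The heart of the example is the three-step pipeline `flatMap` → `map` → `reduceByKey`. The same logic can be checked locally on a plain Scala collection, with no cluster required; the sketch below is for illustration only (the object and method names are made up), and uses `groupBy` as a local stand-in for `reduceByKey`:

```scala
// Local sketch of: lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))     // split each line into words
      .map(w => (w, 1))          // pair each word with a count of 1
      .groupBy(_._1)             // group pairs by word (stand-in for reduceByKey)
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) } // sum the counts

  def main(args: Array[String]): Unit =
    println(countWords(Seq("hello spark", "hello streaming")))
}
```

The only real difference in the streaming version is that each transformation runs over the RDD of every micro-batch rather than over one in-memory collection.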

17.2 Test Run

(1) Create the directory

[root@node1 ~]# hdfs dfs -mkdir /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
[root@node1 ~]#

(2) Upload an initial file

[root@node1 ~]# hdfs dfs -put data/word1.txt /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
Found 1 items
-rw-r--r--   3 root supergroup         30 2017-11-04 09:21 /streaming/word1.txt
[root@node1 ~]#

Note that we upload a file into the monitored directory before starting HdfsWordCount; otherwise, uploading while the job is running can fail with an error such as:

java.io.FileNotFoundException: File does not exist: /streaming/books.txt._COPYING_

This happens because hdfs dfs -put first writes to a temporary ._COPYING_ file and renames it when the copy completes; the stream can list the temporary file and then fail to read it after the rename.
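A common pattern to sidestep the `._COPYING_` problem (not something this example requires) is to upload the file to a staging path outside the watched directory and then move it in. `hdfs dfs -mv` within one filesystem is a metadata-only rename, so the stream never observes a half-written file. The paths below are illustrative:

```shell
# Stage the upload outside the watched directory, then rename it in.
hdfs dfs -mkdir -p /tmp/staging
hdfs dfs -put data/books.txt /tmp/staging/books.txt
hdfs dfs -mv /tmp/staging/books.txt /streaming/books.txt
```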

(3) Start the job

[root@node1 ~]# run-example org.apache.spark.examples.streaming.HdfsWordCount /streaming
17/11/04 09:22:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1509801734000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801736000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801738000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801740000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801742000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801744000 ms
-------------------------------------------

(4) Upload a file to be processed. Open another terminal and upload a file. (Notice that the batch timestamps above advance by 2000 ms, matching the Seconds(2) batch interval; empty batches mean no new files were found.)

[root@node1 ~]# hdfs dfs -put data/books.txt /streaming

At this point you can see output from the HdfsWordCount program:

-------------------------------------------
Time: 1509801746000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801748000 ms
-------------------------------------------
(2001   49.0    S2  Java,1)
(3003   49.0    S3  Hive教程,1)
(3002   98.0    S3  Spark基础,1)
(3004   56.0    S3  HBase教程,1)
(3005   49.5    S3  大数据概论,1)
(1002   39.0    S1  C语言,1)
(2071   99.0    S2  Oracle,1)
(1021   45.0    S1  数据结构,1)
(1001   39.0    S1  计算机基础,1)
(2091   69.0    S2  Linux,1)
...

-------------------------------------------
Time: 1509801750000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801752000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801754000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801756000 ms
-------------------------------------------
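Notice that each record from books.txt above shows up as a single "word": the example splits only on a literal space character, so the tab-separated fields of a record stay glued together. A tiny sketch (the record is a made-up stand-in for a books.txt line):

```scala
// Splitting a tab-separated record on " " does not break it into fields.
object SplitDemo {
  def main(args: Array[String]): Unit = {
    val record = "2001\t49.0\tS2\tJava"    // tab-separated, like a books.txt line
    println(record.split(" ").length)      // 1: the whole record is one token
    println(record.split("\\s+").length)   // 4: splitting on any whitespace
  }
}
```

To count individual fields or words across tabs, the example's `_.split(" ")` would need to become `_.split("\\s+")`.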

Upload another file:

[root@node1 ~]# hdfs dfs -put data/Hamlet.txt /streaming

Likewise, you can then see the HdfsWordCount program's output:

-------------------------------------------
Time: 1509801758000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801760000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801762000 ms
-------------------------------------------
-------------------------------------------
Time: 1509801764000 ms
-------------------------------------------
(weary,,1)
(pate,4)
(whereof,,1)
(joy.,1)
(rises.,1)
(lug,1)
(stuck,,1)
(shot,7)
(line:,1)
(order,2)
...

-------------------------------------------
Time: 1509801766000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801768000 ms
-------------------------------------------
