Spark 2.x Study Notes: 17. HdfsWordCount with Spark Streaming

Author: 程裕强
Published: 2018-01-02 16:44:06

17.1 HdfsWordCount source code walkthrough

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

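The header comment tells us that:

  • HdfsWordCount counts the words in new text files created in the given directory.
  • It is run as run-example org.apache.spark.examples.streaming.HdfsWordCount localdir, where localdir is the directory that Spark Streaming watches for new text files to read.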
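
The three transformations in the source above can be tried without a cluster. The following is a minimal sketch using plain Scala collections, not the Spark API: groupBy followed by a sum stands in for reduceByKey, and the names WordCountSketch and countWords are illustrative, not part of the Spark example. It shows what a single batch would produce for a small made-up input.

```scala
// Plain-Scala sketch of what HdfsWordCount computes for ONE batch.
// No Spark dependency; WordCountSketch and countWords are illustrative names.
object WordCountSketch {
  // Mirrors lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))      // one element per space-separated token
      .map(word => (word, 1))     // pair each token with a count of 1
      .groupBy(_._1)              // group the pairs by token (the key)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the 1s per key

  def main(args: Array[String]): Unit = {
    val batch = Seq("to be or not", "to be") // made-up input for one batch
    // Prints (be,2), (not,1), (or,1), (to,2), one pair per line
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the real job the same logic runs once per 2-second batch, and only over the lines of files that appeared during that batch, so the counts are per batch rather than cumulative.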

17.2 Test run

(1) Create the directory

[root@node1 ~]# hdfs dfs -mkdir /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
[root@node1 ~]#

(2) Upload a file first

[root@node1 ~]# hdfs dfs -put data/word1.txt /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
Found 1 items
-rw-r--r--   3 root supergroup         30 2017-11-04 09:21 /streaming/word1.txt
[root@node1 ~]#

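Upload a file to the directory that Spark Streaming will read before starting the job. Otherwise, uploading a file after HdfsWordCount is already running reports an error: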

java.io.FileNotFoundException: File does not exist: /streaming/books.txt._COPYING_
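
The ._COPYING_ suffix in this message is the temporary name that hdfs dfs -put gives a file while the copy is in progress; the file is renamed to its final name once the copy completes. One plausible reading of the error is that the stream listed the temporary file and tried to read it after the rename. Below is a minimal sketch, assuming that is the cause, of a file-name predicate that skips such in-progress files. UploadFilterSketch and isFinishedUpload are hypothetical names; the commented usage relies on StreamingContext.fileStream, which accepts a Path => Boolean filter, and has not been run against the cluster described here.

```scala
object UploadFilterSketch {
  // Suffix that `hdfs dfs -put` appends to the in-progress temporary file.
  val CopyingSuffix = "._COPYING_"

  // Hypothetical helper: true when a file name looks like a finished upload.
  def isFinishedUpload(fileName: String): Boolean =
    !fileName.startsWith(".") && !fileName.endsWith(CopyingSuffix)

  def main(args: Array[String]): Unit = {
    val names = Seq("word1.txt", "books.txt._COPYING_", ".hidden")
    names.foreach(n => println(s"$n -> ${isFinishedUpload(n)}"))
  }
}

// Possible use inside HdfsWordCount (untested sketch; needs the Spark and Hadoop jars):
//   import org.apache.hadoop.fs.Path
//   import org.apache.hadoop.io.{LongWritable, Text}
//   import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
//   val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
//     args(0),
//     (p: Path) => UploadFilterSketch.isFinishedUpload(p.getName),
//     newFilesOnly = true
//   ).map(_._2.toString)
```

An alternative that needs no code change is to upload the file to a staging directory on the same file system and then move it into /streaming with hdfs dfs -mv, since the Spark Streaming guide asks for files to be moved or renamed atomically into the monitored directory.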

(3) Start the job

[root@node1 ~]# run-example org.apache.spark.examples.streaming.HdfsWordCount /streaming
17/11/04 09:22:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1509801734000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801736000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801738000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801740000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801742000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801744000 ms
-------------------------------------------

(4) Upload the file to process

Open another terminal and upload the file.

[root@node1 ~]# hdfs dfs -put data/books.txt /streaming

The HdfsWordCount output now shows the counts for the new file:

-------------------------------------------
Time: 1509801746000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801748000 ms
-------------------------------------------
(2001   49.0    S2  Java,1)
(3003   49.0    S3  Hive教程,1)
(3002   98.0    S3  Spark基础,1)
(3004   56.0    S3  HBase教程,1)
(3005   49.5    S3  大数据概论,1)
(1002   39.0    S1  C语言,1)
(2071   99.0    S2  Oracle,1)
(1021   45.0    S1  数据结构,1)
(1001   39.0    S1  计算机基础,1)
(2091   69.0    S2  Linux,1)
...

-------------------------------------------
Time: 1509801750000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801752000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801754000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801756000 ms
-------------------------------------------
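
In the batch above, each key is an entire line of books.txt rather than a single word. This is presumably because the fields in that file are separated by tabs, so split(" ") finds no space to split on. The sketch below, assuming tab-separated input, compares splitting on a single space with splitting on any run of whitespace. The sample record is modeled on the output above, and TokenizeSketch and its methods are illustrative names.

```scala
object TokenizeSketch {
  // Splits on single spaces only, as the HdfsWordCount example does.
  def bySpace(line: String): Seq[String] = line.split(" ").toSeq

  // Splits on any run of whitespace (spaces or tabs) and drops empty tokens.
  def byWhitespace(line: String): Seq[String] =
    line.split("\\s+").toSeq.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val line = "2001\t49.0\tS2\tJava"   // assumed tab-separated record
    println(bySpace(line).length)       // 1: the whole line is one token
    println(byWhitespace(line).length)  // 4: one token per field
  }
}
```

In HdfsWordCount itself the corresponding change would be lines.flatMap(_.split("\\s+")).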

Upload one more file:

[root@node1 ~]# hdfs dfs -put data/Hamlet.txt /streaming

Again, the HdfsWordCount output shows the counts for the new file:

-------------------------------------------
Time: 1509801758000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801760000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801762000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801764000 ms
-------------------------------------------
(weary,,1)
(pate,4)
(whereof,,1)
(joy.,1)
(rises.,1)
(lug,1)
(stuck,,1)
(shot,7)
(line:,1)
(order,2)
...

-------------------------------------------
Time: 1509801766000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801768000 ms
-------------------------------------------
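
The Hamlet counts above keep punctuation attached to the words, so a key such as "weary," is counted separately from "weary". A minimal sketch of one way to normalize tokens before counting, assuming that lowercasing and trimming leading and trailing punctuation is acceptable for the text at hand. NormalizeSketch and normalize are illustrative names, and the sample tokens are taken from the output above.

```scala
object NormalizeSketch {
  // Lowercases a token and strips leading/trailing characters
  // that are neither letters nor digits.
  def normalize(token: String): String =
    token.toLowerCase.replaceAll("^[^\\p{L}\\p{N}]+|[^\\p{L}\\p{N}]+$", "")

  def main(args: Array[String]): Unit = {
    val tokens = Seq("weary,", "weary", "joy.", "line:")
    val counts = tokens
      .map(normalize)
      .filter(_.nonEmpty)                       // drop tokens that were only punctuation
      .groupBy(identity)
      .map { case (word, ws) => (word, ws.size) }
    // Prints (joy,1), (line,1), (weary,2), one pair per line
    counts.toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the streaming job this would correspond to inserting a map(normalize) and a filter(_.nonEmpty) step between the flatMap and the map that builds the (word, 1) pairs.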

Originally published: 2017-11-04 (shared from the author's personal site/blog).
