Spark 2.x Study Notes: 17. Spark Streaming's HdfsWordCount

17.1 HdfsWordCount Source Code Walkthrough

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

The comments tell us that:

  • HdfsWordCount counts the words in new text files created in a given directory.
  • It is run as run-example org.apache.spark.examples.streaming.HdfsWordCount localdir, where localdir is the directory that Spark Streaming will watch for new text files to read.
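
The per-batch transformation chain (flatMap → map → reduceByKey) is the classic word count. It can be illustrated on a plain Scala collection, since a DStream simply applies these functions to each batch's RDD; the sample lines below are made up for illustration:

```scala
// The same word-count logic as in HdfsWordCount, applied to an ordinary
// Scala collection instead of a DStream batch.
val lines = Seq("spark streaming reads files", "spark counts words")
val wordCounts = lines
  .flatMap(_.split(" "))                      // split each line into words
  .groupBy(identity)                          // group equal words together
  .map { case (word, ws) => (word, ws.size) } // count each group

println(wordCounts.toMap)
// Map(spark -> 2, streaming -> 1, reads -> 1, files -> 1, counts -> 1, words -> 1)
```

Here groupBy plus a size count plays the role of reduceByKey(_ + _); on an RDD, reduceByKey performs the same per-key aggregation but shuffles by key across partitions.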

17.2 Test Run

(1) Create a directory

[root@node1 ~]# hdfs dfs -mkdir /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
[root@node1 ~]#

(2) Upload a file first

[root@node1 ~]# hdfs dfs -put data/word1.txt /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
Found 1 items
-rw-r--r--   3 root supergroup         30 2017-11-04 09:21 /streaming/word1.txt
[root@node1 ~]#

Note that a file is uploaded into the monitored directory before HdfsWordCount starts; uploading only after the job is running can otherwise fail with:

java.io.FileNotFoundException: File does not exist: /streaming/books.txt._COPYING_

The reason is that hdfs dfs -put first writes a temporary file with a ._COPYING_ suffix and renames it when the copy finishes, so the streaming job may list the temporary file and then fail to find it once it has been renamed.
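
The ._COPYING_ suffix in the error above marks the HDFS client's in-flight temp file. One way to avoid picking such files up is the more general ssc.fileStream API, which accepts a path filter. A minimal sketch; the predicate name isVisibleFile is mine, and the commented Spark call assumes Hadoop's LongWritable/Text/TextInputFormat on the classpath:

```scala
// Reject the HDFS client's in-flight temp files, which carry a
// "._COPYING_" suffix while `hdfs dfs -put` is still writing them.
def isVisibleFile(name: String): Boolean = !name.endsWith("._COPYING_")

// Sketch of plugging the predicate into fileStream (requires the
// spark-streaming and hadoop-client dependencies):
//   val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
//     args(0), (p: Path) => isVisibleFile(p.getName), newFilesOnly = true
//   ).map(_._2.toString)

println(isVisibleFile("books.txt"))           // true
println(isVisibleFile("books.txt._COPYING_")) // false
```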

(3) Start the job

[root@node1 ~]# run-example org.apache.spark.examples.streaming.HdfsWordCount /streaming
17/11/04 09:22:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1509801734000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801736000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801738000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801740000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801742000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801744000 ms
-------------------------------------------

(4) Upload a file to be processed. Open another terminal and upload a file.

[root@node1 ~]# hdfs dfs -put data/books.txt /streaming

At this point the output of HdfsWordCount shows the word counts for the new file:

-------------------------------------------
Time: 1509801746000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801748000 ms
-------------------------------------------
(2001   49.0    S2  Java,1)
(3003   49.0    S3  Hive教程,1)
(3002   98.0    S3  Spark基础,1)
(3004   56.0    S3  HBase教程,1)
(3005   49.5    S3  大数据概论,1)
(1002   39.0    S1  C语言,1)
(2071   99.0    S2  Oracle,1)
(1021   45.0    S1  数据结构,1)
(1001   39.0    S1  计算机基础,1)
(2091   69.0    S2  Linux,1)
...

-------------------------------------------
Time: 1509801750000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801752000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801754000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801756000 ms
-------------------------------------------

Upload another file:

[root@node1 ~]# hdfs dfs -put data/Hamlet.txt /streaming

Again, the HdfsWordCount output picks up the new file:

-------------------------------------------
Time: 1509801758000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801760000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801762000 ms
-------------------------------------------
-------------------------------------------
Time: 1509801764000 ms
-------------------------------------------
(weary,,1)
(pate,4)
(whereof,,1)
(joy.,1)
(rises.,1)
(lug,1)
(stuck,,1)
(shot,7)
(line:,1)
(order,2)
...

-------------------------------------------
Time: 1509801766000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801768000 ms
-------------------------------------------
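
Note that reduceByKey aggregates only within a single 2-second batch, which is why each file's counts are printed once and never accumulated across batches. To keep a running total across batches, Spark Streaming offers updateStateByKey, which requires a checkpoint directory to be set. A sketch with the pure update function separated out (updateCount is my name for it):

```scala
// Per-key state update: add this batch's counts to the running total.
// newValues holds this batch's values for the key; running is the
// previous total (None the first time the key appears).
def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
  Some(newValues.sum + running.getOrElse(0))

// In the streaming job (sketch; ssc.checkpoint(...) must be called first):
//   val totals = wordCounts.updateStateByKey(updateCount)
//   totals.print()

println(updateCount(Seq(1, 2), Some(3))) // Some(6)
println(updateCount(Seq(1), None))       // Some(1)
```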
