Spark2.x学习笔记:17、Spark Streaming之HdfsWordCount 学习

Spark2.x学习笔记:17、Spark Streaming之HdfsWordCount 学习

17.1 HdfsWordCount 源码解析

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

通过注释可以知道,

  • HdfsWordCount 是统计在给定目录中新文本文件中的单词
  • 运行方式run-example org.apache.spark.examples.streaming.HdfsWordCount localdir,其中localdir是Spark Streaming将用来查找和读取新文本文件的目录

17.2 测试运行

(1)创建目录

[root@node1 ~]# hdfs dfs -mkdir /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
[root@node1 ~]#

(2)先上传一个文件

[root@node1 ~]# hdfs dfs -put data/word1.txt /streaming
[root@node1 ~]# hdfs dfs -ls /streaming
Found 1 items
-rw-r--r--   3 root supergroup         30 2017-11-04 09:21 /streaming/word1.txt
[root@node1 ~]#

这里需要先在Spark Streaming需要读取的目录中上传一个文件,不然HdfsWordCount 运行后再上传会报错

java.io.FileNotFoundException: File does not exist: /streaming/books.txt._COPYING_

(3)开始运行

[root@node1 ~]# run-example org.apache.spark.examples.streaming.HdfsWordCount /streaming
17/11/04 09:22:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1509801734000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801736000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801738000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801740000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801742000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801744000 ms
-------------------------------------------

(4)上传需要处理的文件 另外开一个终端,上传文件。

[root@node1 ~]# hdfs dfs -put data/books.txt /streaming

这是可以看到HdfsWordCount 程序的输出

-------------------------------------------
Time: 1509801746000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801748000 ms
-------------------------------------------
(2001   49.0    S2  Java,1)
(3003   49.0    S3  Hive教程,1)
(3002   98.0    S3  Spark基础,1)
(3004   56.0    S3  HBase教程,1)
(3005   49.5    S3  大数据概论,1)
(1002   39.0    S1  C语言,1)
(2071   99.0    S2  Oracle,1)
(1021   45.0    S1  数据结构,1)
(1001   39.0    S1  计算机基础,1)
(2091   69.0    S2  Linux,1)
...

-------------------------------------------
Time: 1509801750000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801752000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801754000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801756000 ms
-------------------------------------------

再上传一个文件

[root@node1 ~]# hdfs dfs -put data/Hamlet.txt /streaming

同样,这时可以可以看到HdfsWordCount 程序的输出

-------------------------------------------
Time: 1509801758000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801760000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801762000 ms
-------------------------------------------
-------------------------------------------
Time: 1509801764000 ms
-------------------------------------------
(weary,,1)
(pate,4)
(whereof,,1)
(joy.,1)
(rises.,1)
(lug,1)
(stuck,,1)
(shot,7)
(line:,1)
(order,2)
...

-------------------------------------------
Time: 1509801766000 ms
-------------------------------------------

-------------------------------------------
Time: 1509801768000 ms
-------------------------------------------

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊jdbc的batch操作

statement的batch操作,可以批量进行insert或update操作,提升操作性能,特别是在大数据量的insert或update的时候。

14320
来自专栏智能计算时代

「大数据系列」:Apache Hive 分布式数据仓库项目介绍

Apache Hive™数据仓库软件有助于读取,编写和管理驻留在分布式存储中的大型数据集并使用SQL语法进行查询

60820
来自专栏码匠的流水账

kafka0.8生产者实例

19410
来自专栏斑斓

大数据 | 理解Spark的核心RDD

与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streami...

41090
来自专栏Hadoop实操

Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu

在前面的文章Fayson介绍了在Kerberos环境下《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayso...

41510
来自专栏Hadoop实操

如何使用Hue创建Spark1和Spark2的Oozie工作流

使用Hue可以方便的通过界面制定Oozie的工作流,支持Hive、Pig、Spark、Java、Sqoop、MapReduce、Shell等等。Spark?那能...

2.9K70
来自专栏Hadoop实操

Spark2Streaming读Kerberos环境的Kafka并写数据到Hive

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBas...

1.3K30
来自专栏蓝天

启动Hadoop HDFS时的“Incompatible clusterIDs”错误原因分析

Hadoop HDFS时的“Incompatible clusterIDs”错误原因分析.pdf

7310
来自专栏Java 源码分析

SparkStreaming 入门

25480
来自专栏Java技术栈

Tomcat集群session复制与Oracle的坑。。

问题描述 公司某个系统使用了tomcat自带的集群session复制功能,然后后报了一个oracle驱动包里面的连接不能被序列化的异常。 01-Nov-2017...

39490

扫码关注云+社区

领取腾讯云代金券