Spark 2.x Study Notes: 16. Spark Streaming Introductory Example: NetworkWordCount

16. Spark Streaming Introductory Example: NetworkWordCount

16.1 Source Code Walkthrough

The NetworkWordCount.scala source file is located in the Spark distribution under spark-2.1.0-bin-hadoop2.7\examples\src\main\scala\org\apache\spark\examples\streaming.

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    // Create the SparkConf instance
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // Process one batch of data every second
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
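    // Create a socket stream that connects to host args(0) on port args(1) and reads the text sent there
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // flatMap splits each line on spaces; words is a DStream of individual words
    val words = lines.flatMap(_.split(" "))
    // Count the occurrences of each word within the current batch
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    // Print the first ten (word, count) pairs of every batch
    wordCounts.print()
    // Start the streaming computation
    ssc.start()
    // Block until the computation terminates, i.e. until ssc.stop() is called or an error occurs
    ssc.awaitTermination()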
  }
}
// scalastyle:on println
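To see what a single batch computes, the transformation chain can be simulated with ordinary Scala collections. This is a sketch, not Spark code: `BatchWordCount` and `countBatch` are hypothetical names, and `groupBy` followed by a sum stands in for `reduceByKey`, which in Spark combines the values for each key across partitions.

```scala
// Plain-Scala simulation of one NetworkWordCount batch (no Spark needed).
// Each step mirrors a DStream operation in the example:
//   lines.flatMap(_.split(" "))  -> words
//   words.map(x => (x, 1))       -> pairs
//   pairs.reduceByKey(_ + _)     -> counts (simulated here with groupBy + sum)
object BatchWordCount {
  def countBatch(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(x => (x, 1))
    pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    // One line typed into nc during a single 1-second batch
    val counts = countBatch(Seq("how do you do!"))
    counts.foreach(println)
  }
}
```

The iteration order of the resulting `Map` is not guaranteed, just as the order of pairs printed by `wordCounts.print()` is not.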

16.2 Test Run

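(1) Increase the number of CPU cores. My Spark program runs in a virtual machine, and if the VM has only one core, NetworkWordCount hangs: the socket receiver permanently occupies one core, leaving none to process the received batches. The VM's core count therefore has to be raised; here node1 was given 2 cores.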
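The reasoning behind this step reduces to simple arithmetic. The sketch below is a simplified model, not Spark code: `CoreCheck`, `processingCores` and `canProcess` are hypothetical names, and it assumes each receiver holds exactly one core for the lifetime of the application while batch processing needs at least one more.

```scala
// Hypothetical helper, not a Spark API: models the rule that each receiver
// permanently occupies one core, so batches can only be processed if at
// least one core remains after all receivers have been placed.
object CoreCheck {
  def processingCores(totalCores: Int, receivers: Int): Int =
    math.max(totalCores - receivers, 0)

  def canProcess(totalCores: Int, receivers: Int): Boolean =
    processingCores(totalCores, receivers) > 0

  def main(args: Array[String]): Unit = {
    // NetworkWordCount has one input stream, hence one receiver
    println(s"1 core, 1 receiver:  canProcess = ${canProcess(1, 1)}")
    println(s"2 cores, 1 receiver: canProcess = ${canProcess(2, 1)}")
  }
}
```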

(2) nc. nc is the netcat command. If nc is not installed, install it with the following command:

[root@node1 ~]# yum install nc
[root@node1 ~]# nc --help
...
  -l, --listen               Bind and listen for incoming connections
  -k, --keep-open            Accept multiple connections in listen mode
...

Listen on TCP port 9999 and keep listening across connections (-l listens, -k keeps the listener open):

[root@node1 ~]# nc -lk 9999
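In this setup nc is the TCP server, and the Spark receiver created by `socketTextStream` is the client that connects to it and reads newline-delimited text. The sketch below imitates both sides with the JDK socket classes. `LineServer`, `serveOnce` and `readAll` are hypothetical names; unlike `nc -lk`, this server sends a fixed list of lines to a single client instead of forwarding stdin.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, Socket}

// Hypothetical stand-in for `nc -lk 9999`: a TCP server that sends
// '\n'-delimited text, the format socketTextStream expects.
object LineServer {
  // Accept one client, write each line followed by '\n', then close the connection.
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept()
    try {
      val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, "UTF-8"))
      lines.foreach(line => out.print(line + "\n"))
      out.flush()
    } finally client.close()
  }

  // Client side, playing the role of the Spark receiver: read lines until EOF.
  def readAll(host: String, port: Int): List[String] = {
    val socket = new Socket(host, port)
    try {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    } finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(0) // port 0: let the OS choose a free port
    val t = new Thread(new Runnable {
      def run(): Unit = serveOnce(server, Seq("hi", "hello"))
    })
    t.start()
    println(readAll("localhost", server.getLocalPort))
    t.join()
    server.close()
  }
}
```

Because the server socket is bound when it is constructed, the client can connect before `accept()` runs; the connection simply waits in the backlog.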

(3) Start NetworkWordCount

[root@node1 ~]# run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
17/11/01 09:26:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1509542803000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542804000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542805000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542806000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542807000 ms
-------------------------------------------
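Each `Time:` header is the batch time in epoch milliseconds, and consecutive values differ by 1000 ms because the context was created with `Seconds(1)`. The sketch below decodes the values from the output above; `BatchTimes`, `toInstant` and `intervals` are hypothetical names. Note that `Instant` prints UTC, so it need not match the local time shown in the WARN line.

```scala
import java.time.Instant

// Decode the "Time: ... ms" headers printed by DStream.print().
object BatchTimes {
  def toInstant(batchTimeMs: Long): Instant = Instant.ofEpochMilli(batchTimeMs)

  // Gaps between consecutive batch times; with Seconds(1) each should be 1000 ms.
  def intervals(batchTimesMs: Seq[Long]): Seq[Long] =
    batchTimesMs.zip(batchTimesMs.tail).map { case (a, b) => b - a }

  def main(args: Array[String]): Unit = {
    val times = Seq(1509542803000L, 1509542804000L, 1509542805000L,
                    1509542806000L, 1509542807000L)
    println(toInstant(times.head)) // 2017-11-01T13:26:43Z
    println(intervals(times))      // List(1000, 1000, 1000, 1000)
  }
}
```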

(4) Stream processing

Keep typing lines into the nc session listening on port 9999:

[root@node1 ~]# nc -lk 9999
hi
hello
how do you do!
i am hadron
java
hadoop
spark

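NetworkWordCount processes the input in real time. The words of each line appear in the batch during which the line arrived, and the counts are per batch rather than cumulative. Because lines are split on single spaces only, "do!" and "do" are counted as different words.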

[root@node1 ~]# run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
17/11/01 09:26:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1509542803000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542804000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542805000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542806000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542807000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542808000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542809000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542810000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542811000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542812000 ms
-------------------------------------------
(hi,1)

-------------------------------------------
Time: 1509542813000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542814000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542815000 ms
-------------------------------------------
(hello,1)

-------------------------------------------
Time: 1509542816000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542817000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542818000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542819000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542820000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542821000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542822000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542823000 ms
-------------------------------------------
(do!,1)
(how,1)
(you,1)
(do,1)

-------------------------------------------
Time: 1509542824000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542825000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542826000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542827000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542828000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542829000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542830000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542831000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542832000 ms
-------------------------------------------
(hadron,1)
(am,1)
(i,1)

-------------------------------------------
Time: 1509542833000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542834000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542835000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542836000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542837000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542838000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542839000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542840000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542841000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542842000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542843000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542844000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542845000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542846000 ms
-------------------------------------------
(java,1)

-------------------------------------------
Time: 1509542847000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542848000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542849000 ms
-------------------------------------------
(hadoop,1)

-------------------------------------------
Time: 1509542850000 ms
-------------------------------------------

-------------------------------------------
Time: 1509542851000 ms
-------------------------------------------
(spark,1)

-------------------------------------------
Time: 1509542852000 ms
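Since NetworkWordCount only reports per-batch counts, a word typed twice in different seconds is never shown with a count of 2. The sketch below shows, with plain Scala collections, what running totals across batches would look like. In Spark this would require a stateful operator such as `updateStateByKey` (which also needs a checkpoint directory set via `ssc.checkpoint(...)`); the example itself does not do this. `RunningWordCount`, `countBatch` and `update` are hypothetical names, and the batch contents are made-up input.

```scala
// Plain-Scala sketch of running word totals across batches
// (NetworkWordCount itself only computes per-batch counts).
object RunningWordCount {
  // Per-batch counts, as in the example
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => w -> ws.size }

  // Merge one batch's counts into the running state, key by key
  def update(state: Map[String, Int], batch: Map[String, Int]): Map[String, Int] =
    batch.foldLeft(state) { case (acc, (w, n)) => acc.updated(w, acc.getOrElse(w, 0) + n) }

  def main(args: Array[String]): Unit = {
    // Three hypothetical 1-second batches
    val batches = Seq(Seq("hi"), Seq("hello"), Seq("hi hadoop"))
    // scanLeft keeps the state after every batch
    val totals = batches.map(countBatch).scanLeft(Map.empty[String, Int])(update)
    totals.tail.foreach(println)
  }
}
```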
