Spark Streaming流式计算的WordCount入门

Spark Streaming是一种近实时的流式计算模型,它将作业分解成一批一批的短小的批处理任务,然后并行计算,具有可扩展,高容错,高吞吐,实时性高等一系列优点,在某些场景可达到与Storm一样的处理程度或优于storm,也可以无缝集成多重日志收集工具或队列中转器,比如常见的 kakfa,flume,redis,logstash等,计算完后的数据结果,也可以 存储到各种存储系统中,如HDFS,数据库等,一张简单的数据流图如下:

内部处理流程:

下面来看一个wordcount级别的入门例子,注意需要导入相关的包:

Java代码

  1. //下面不需要使用的依赖,大家可根据情况去舍
  2. name := "scala-spark"
  3. version := "1.0"
  4. scalaVersion := "2.11.7"
  5. //使用公司的私服
  6. resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"
  7. //使用内部仓储
  8. externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
  9. //Hadoop的依赖
  10. libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1" //% "provided"
  11. //Habse的依赖
  12. libraryDependencies += "org.apache.hbase" % "hbase-client" % "0.98.12-hadoop2" // % "provided"
  13. libraryDependencies += "org.apache.hbase" % "hbase-common" % "0.98.12-hadoop2" //% "provided"
  14. libraryDependencies += "org.apache.hbase" % "hbase-server" % "0.98.12-hadoop2" //% "provided"
  15. //Spark的依赖
  16. libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.0" //% "provided"
  17. //Spark SQL 依赖
  18. libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.6.0" //% "provided"
  19. //Spark For Hive 依赖
  20. libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "1.6.0"
  21. //Spark for Streaming
  22. libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "1.6.0"
  23. //java servlet 依赖
  24. libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1" //% "provided"

Java代码

  1. package com.tools.streaming
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming._
  4. /**
  5. * Created by qindongliang on 2016/1/28.
  6. */
  7. object StreamingWordCount {
  8. def main(args: Array[String]) {
  9. //开本地线程两个处理
  10. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  11. //每隔10秒计算一批数据
  12. val ssc = new StreamingContext(conf, Seconds(10))
  13. //监控机器ip为192.168.1.187:9999端号的数据,注意必须是这个9999端号服务先启动nc -l 9999,否则会报错,但进程不会中断
  14. val lines = ssc.socketTextStream("192.168.1.187", 9999)
  15. //按空格切分输入数据
  16. val words = lines.flatMap(_.split(" "))
  17. //计算wordcount
  18. val pairs = words.map(word => (word, 1))
  19. //word ++
  20. val wordCounts = pairs.reduceByKey(_ + _)
  21. //排序结果集打印,先转成rdd,然后排序true升序,false降序,可以指定key和value排序_._1是key,_._2是value
  22. val sortResult=wordCounts.transform(rdd=>rdd.sortBy(_._2,false))
  23. sortResult.print()
  24. ssc.start() // 开启计算
  25. ssc.awaitTermination() // 阻塞等待计算
  26. }
  27. }

然后在对应的linux机器上,开一个nc服务,并写入一些数据:

Java代码

  1. nc -l 9999
  2. a a a c c d d v v e p x x x x o

然后在控制台,可见计算结果,并且是排好序的:

至此,第一个体验流式计算的demo就入门了,后面我们还可以继续完善这个例子,比如从kakfa或者redis里面接受数据,然后存储到hbase,或者mysql或者solr,lucene,elasticsearch索引中,用来给前端js图表绘图所用。 参考文章: http://blog.scottlogic.com/2013/07/29/spark-stream-analysis.html http://spark.apache.org/docs/latest/streaming-programming-guide.html

原文发布于微信公众号 - 我是攻城师(woshigcs)

原文发表时间:2016-01-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

3.0Spark RDD实现详解

Spark技术内幕:深入解析Spark内核架构设计与实现原理 第三章 Spark RDD实现详解 RDD是Spark最基本也是最根本的数据抽象,它具备像MapR...

35170
来自专栏xingoo, 一个梦想做发明家的程序员

Structured Streaming教程(1) —— 基本概念与使用

在有过1.6的streaming和2.x的streaming开发体验之后,再来使用Structured Streaming会有一种完全不同的体验,尤其是在代码设...

17510
来自专栏Albert陈凯

3.6 Shuffle机制

3.6 Shuffle机制 在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuff...

31540
来自专栏加米谷大数据

技术分享 | Spark RDD详解

1、RDD是什么 RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这...

48450
来自专栏Albert陈凯

Spark系列课程-00xxSpark RDD持久化

我们这节课讲一下RDD的持久化 ? RDD的持久化 这段代码我们上午已经看过了,有瑕疵大家看出来了吗? 有什么瑕疵啊? 大家是否还记得我在第二节课的时候跟大...

42480
来自专栏Albert陈凯

Hive迁移Saprk SQL的坑和改进办法

Qcon 全球软件开发者大会2016北京站 演讲主题:Spark在360的大规模实践与经验分享 李远策 360-Spark集群概况 ? 360-Spark集...

75170
来自专栏Jed的技术阶梯

Spark性能调优04-数据倾斜调优

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。...

73150
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

525120
来自专栏Albert陈凯

2.0Spark编程模型

循序渐进学Saprk 与Hadoop相比,Spark最初为提升性能而诞生。Spark是Hadoop MapReduce的演化和改进,并兼容了一些数据库的基本思想...

40880
来自专栏about云

spark零基础学习线路指导【包括spark2】

问题导读 1.你认为spark该如何入门? 2.你认为spark入门编程需要哪些步骤? 3.本文介绍了spark哪些编程知识?

12630

扫码关注云+社区

领取腾讯云代金券