前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming流式计算的WordCount入门

Spark Streaming流式计算的WordCount入门

作者头像
我是攻城师
发布2018-05-14 15:27:22
1.6K0
发布2018-05-14 15:27:22
举报
文章被收录于专栏:我是攻城师我是攻城师

Spark Streaming是一种近实时的流式计算模型,它将作业分解成一批一批的短小的批处理任务,然后并行计算,具有可扩展,高容错,高吞吐,实时性高等一系列优点,在某些场景可达到与Storm一样的处理程度或优于storm,也可以无缝集成多重日志收集工具或队列中转器,比如常见的 kakfa,flume,redis,logstash等,计算完后的数据结果,也可以 存储到各种存储系统中,如HDFS,数据库等,一张简单的数据流图如下:

内部处理流程:

下面来看一个wordcount级别的入门例子,注意需要导入相关的包:

Java代码

  1. //下面不需要使用的依赖,大家可根据情况去舍
  2. name := "scala-spark"
  3. version := "1.0"
  4. scalaVersion := "2.11.7"
  5. //使用公司的私服
  6. resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"
  7. //使用内部仓储
  8. externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
  9. //Hadoop的依赖
  10. libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1" //% "provided"
  11. //Habse的依赖
  12. libraryDependencies += "org.apache.hbase" % "hbase-client" % "0.98.12-hadoop2" // % "provided"
  13. libraryDependencies += "org.apache.hbase" % "hbase-common" % "0.98.12-hadoop2" //% "provided"
  14. libraryDependencies += "org.apache.hbase" % "hbase-server" % "0.98.12-hadoop2" //% "provided"
  15. //Spark的依赖
  16. libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.0" //% "provided"
  17. //Spark SQL 依赖
  18. libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.6.0" //% "provided"
  19. //Spark For Hive 依赖
  20. libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "1.6.0"
  21. //Spark for Streaming
  22. libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "1.6.0"
  23. //java servlet 依赖
  24. libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1" //% "provided"

Java代码

  1. package com.tools.streaming
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming._
  4. /**
  5. * Created by qindongliang on 2016/1/28.
  6. */
  7. object StreamingWordCount {
  8. def main(args: Array[String]) {
  9. //开本地线程两个处理
  10. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  11. //每隔10秒计算一批数据
  12. val ssc = new StreamingContext(conf, Seconds(10))
  13. //监控机器ip为192.168.1.187:9999端号的数据,注意必须是这个9999端号服务先启动nc -l 9999,否则会报错,但进程不会中断
  14. val lines = ssc.socketTextStream("192.168.1.187", 9999)
  15. //按空格切分输入数据
  16. val words = lines.flatMap(_.split(" "))
  17. //计算wordcount
  18. val pairs = words.map(word => (word, 1))
  19. //word ++
  20. val wordCounts = pairs.reduceByKey(_ + _)
  21. //排序结果集打印,先转成rdd,然后排序true升序,false降序,可以指定key和value排序_._1是key,_._2是value
  22. val sortResult=wordCounts.transform(rdd=>rdd.sortBy(_._2,false))
  23. sortResult.print()
  24. ssc.start() // 开启计算
  25. ssc.awaitTermination() // 阻塞等待计算
  26. }
  27. }

然后在对应的linux机器上,开一个nc服务,并写入一些数据:

Java代码

  1. nc -l 9999
  2. a a a c c d d v v e p x x x x o

然后在控制台,可见计算结果,并且是排好序的:

至此,第一个体验流式计算的demo就入门了,后面我们还可以继续完善这个例子,比如从kakfa或者redis里面接受数据,然后存储到hbase,或者mysql或者solr,lucene,elasticsearch索引中,用来给前端js图表绘图所用。 参考文章: http://blog.scottlogic.com/2013/07/29/spark-stream-analysis.html http://spark.apache.org/docs/latest/streaming-programming-guide.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档