前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用SparkStreaming做奇怪的事

用SparkStreaming做奇怪的事

作者头像
刀刀老高
发布2018-04-11 10:57:22
6450
发布2018-04-11 10:57:22
举报
文章被收录于专栏:奇点大数据奇点大数据

作者:尹会生

无需授权即可转载,甚至无需保留以上版权声明

Spark Steaming 是非常著名的流式计算工具,这次用它来搞一个奇葩的需求:开发给定一个日志同步服务器,日志达到10MB会同步过来一个新的文件,要求判断里面包含“error”关键字的次数,累积达到5次以后就发送紧急通知。

这个奇葩需求要注意两个点,一个是文件会不断的增加,所以要定时删除文件;另一个是"error"会在不定长的时间出现。这让我想到了Spark Streaming 的高级功能,我们要用到状态查询才能搞的定。

首先我们来搞定Spark Steaming 启动的问题,Spark Steaming 支持“文本文件 流”函数, 即textFileStream(),要是用这个调用你需要先导入一个streaming库

import org.apache.spark.streaming._ ,

然后声明Streaming的入口

StreamingContext(sparkConf, Seconds(1))

这里的 Seconds(1)是每隔多久来做一次统计,最后想要开始的时候执行

sparkstreamingcontext.start()。

那读取文件呢,就用textFileStream(),官方文档没有解释用法,那么看源代码,它是这么定义的

def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)

}

代码位置 $SPARK_src\streaming\src\main\scala\org\apache\spark\streaming\StreamingContext.scala 下,这样让spark streaming天然的就支持了基于文件变动统计的功能。

最后一个大坑是需要增量记录,那就是使用mapWithState() 来解决。

按照这个思路参考example,花了10分钟写了一段实现基本功能的代码,内容如下:

package examples.streaming import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ object FileWordCount { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("FileWordCount") //设置统计间隔时间,测试用1秒 val ssc = new StreamingContext(sparkConf, Seconds(1)) //设置checkpoint用于恢复 ssc.checkpoint(".") val initialRDD = ssc.sparkContext.parallelize(List(("error", 0), ("warn", 0))) val lines = ssc.textFileStream("/tmp/test") val errNums = lines.filter( line => line.contains("error") ).count() val errDstream = errNums.map(x => ("error", x.toInt)) val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output } //更新状态 val errStateDstream = errDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)) errStateDstream.print() ssc.start() ssc.awaitTermination() } }

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-08-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档