作者:尹会生
无需授权即可转载,甚至无需保留以上版权声明
Spark Steaming 是非常著名的流式计算工具,这次用它来搞一个奇葩的需求:开发给定一个日志同步服务器,日志达到10MB会同步过来一个新的文件,要求判断里面包含“error”关键字的次数,累积达到5次以后就发送紧急通知。
这个奇葩需求要注意两个点,一个是文件会不断的增加,所以要定时删除文件;另一个是"error"会在不定长的时间出现。这让我想到了Spark Streaming 的高级功能,我们要用到状态查询才能搞的定。
首先我们来搞定Spark Steaming 启动的问题,Spark Steaming 支持“文本文件 流”函数, 即textFileStream(),要是用这个调用你需要先导入一个streaming库
import org.apache.spark.streaming._ ,
然后声明Streaming的入口
StreamingContext(sparkConf, Seconds(1))
这里的 Seconds(1)是每隔多久来做一次统计,最后想要开始的时候执行
sparkstreamingcontext.start()。
那读取文件呢,就用textFileStream(),官方文档没有解释用法,那么看源代码,它是这么定义的
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
代码位置 $SPARK_src\streaming\src\main\scala\org\apache\spark\streaming\StreamingContext.scala 下,这样让spark streaming天然的就支持了基于文件变动统计的功能。
最后一个大坑是需要增量记录,那就是使用mapWithState() 来解决。
按照这个思路参考example,花了10分钟写了一段实现基本功能的代码,内容如下:
package examples.streaming import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ object FileWordCount { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("FileWordCount") //设置统计间隔时间,测试用1秒 val ssc = new StreamingContext(sparkConf, Seconds(1)) //设置checkpoint用于恢复 ssc.checkpoint(".") val initialRDD = ssc.sparkContext.parallelize(List(("error", 0), ("warn", 0))) val lines = ssc.textFileStream("/tmp/test") val errNums = lines.filter( line => line.contains("error") ).count() val errDstream = errNums.map(x => ("error", x.toInt)) val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output } //更新状态 val errStateDstream = errDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)) errStateDstream.print() ssc.start() ssc.awaitTermination() } }