用SparkStreaming做奇怪的事

作者:尹会生

无需授权即可转载,甚至无需保留以上版权声明

Spark Steaming 是非常著名的流式计算工具,这次用它来搞一个奇葩的需求:开发给定一个日志同步服务器,日志达到10MB会同步过来一个新的文件,要求判断里面包含“error”关键字的次数,累积达到5次以后就发送紧急通知。

这个奇葩需求要注意两个点,一个是文件会不断的增加,所以要定时删除文件;另一个是"error"会在不定长的时间出现。这让我想到了Spark Streaming 的高级功能,我们要用到状态查询才能搞的定。

首先我们来搞定Spark Steaming 启动的问题,Spark Steaming 支持“文本文件 流”函数, 即textFileStream(),要是用这个调用你需要先导入一个streaming库

import org.apache.spark.streaming._ ,

然后声明Streaming的入口

StreamingContext(sparkConf, Seconds(1))

这里的 Seconds(1)是每隔多久来做一次统计,最后想要开始的时候执行

sparkstreamingcontext.start()。

那读取文件呢,就用textFileStream(),官方文档没有解释用法,那么看源代码,它是这么定义的

def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)

}

代码位置 $SPARK_src\streaming\src\main\scala\org\apache\spark\streaming\StreamingContext.scala 下,这样让spark streaming天然的就支持了基于文件变动统计的功能。

最后一个大坑是需要增量记录,那就是使用mapWithState() 来解决。

按照这个思路参考example,花了10分钟写了一段实现基本功能的代码,内容如下:

package examples.streaming import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ object FileWordCount { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("FileWordCount") //设置统计间隔时间,测试用1秒 val ssc = new StreamingContext(sparkConf, Seconds(1)) //设置checkpoint用于恢复 ssc.checkpoint(".") val initialRDD = ssc.sparkContext.parallelize(List(("error", 0), ("warn", 0))) val lines = ssc.textFileStream("/tmp/test") val errNums = lines.filter( line => line.contains("error") ).count() val errDstream = errNums.map(x => ("error", x.toInt)) val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output } //更新状态 val errStateDstream = errDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)) errStateDstream.print() ssc.start() ssc.awaitTermination() } }

原文发布于微信公众号 - 奇点(qddata)

原文发表时间:2016-08-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏闵开慧

hadoop压缩与解压

1 压缩 一般来说,计算机处理的数据都存在一些冗余度,同时数据中间,尤其是相邻数据间存在着相关性,所以可以通过一些有别于原始编码的特殊编码方式来保存数据, 使数...

4038
来自专栏Java3y

JDBC【数据库连接池、DbUtils框架、分页】

1.数据库连接池 什么是数据库连接池 简单来说:数据库连接池就是提供连接的。。。 为什么我们要使用数据库连接池 数据库的连接的建立和关闭是非常消耗资源的 频繁地...

3724
来自专栏祝威廉

Structured Streaming如何实现Parquet存储目录按时间分区

StreamingPro现在支持以SQL脚本的形式写Structured Streaming流式程序了: mlsql-stream。不过期间遇到个问题,我希望按...

1331
来自专栏Kirito的技术分享

Spring揭秘--寻找遗失的web.xml

今天我们来放松下心情,不聊分布式,云原生,来聊一聊初学者接触的最多的 java web 基础。几乎所有人都是从 servlet,jsp,filter 开始编写自...

2642
来自专栏大内老A

让我们的ASP.NET MVC应用可以单独维护验证消息

在项目开发中,我们会使用到很多的描述性文字,比如验证消息、错误消息和确认消息等,让这些文本消息具有可维护性具有重要的意义。虽然我们可以将它们存储于资源文件中,并...

1997
来自专栏文渊之博

pyspark 内容介绍(一)

pyspark 包介绍 子包 pyspark.sql module pyspark.streaming module pyspark.ml package py...

6066
来自专栏伦少的博客

spark基本概念(便于自己随时查阅--摘自Spark快速大数据分析)

转载请务必注明原创地址为:http://dongkelun.com/2018/01/23/sparkBasicConcept/

3808
来自专栏数据科学与人工智能

【Spark研究】Spark编程指南(Python版)

Spark编程指南 译者说在前面:最近在学习Spark相关的知识,在网上没有找到比较详细的中文教程,只找到了官网的教程。出于自己学习同时也造福其他初学者的目的,...

1.5K5
来自专栏Small Code

【Python】自动生成命令行工具 - fire 简介

Python 中用于生成命令行接口(Command Line Interfaces, CLIs)的工具已经有一些了,例如已经成为 Python 标准库的 arg...

4519
来自专栏伦少的博客

SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

1836

扫码关注云+社区

领取腾讯云代金券