我需要开发一个流应用程序,从几个来源读取一些会话日志。批处理间隔可能在5分钟左右。
问题是,我在每个批次中得到的文件差别很大。在每一批中,我可能会得到一些10兆字节的文件,然后在另一批中,我可能会得到大约20 10的文件。
我想知道是否有任何方法来处理this..Is,对文件流可以为每个批处理生成的RDDs的大小有什么限制吗?
我是否可以限制spark流只将每批中的固定数量的数据读取到RDD中?
发布于 2016-08-25 12:32:55
到目前为止,我知道没有直接的方法来限制这一点。要考虑的文件由FileStream中的isNewFile私有函数控制。基于代码,我可以想出一种解决办法。
使用filter函数限制要读取的文件数。任何大于10的文件都会返回false,并使用touch命令更新下一个窗口要考虑的文件的时间戳。
globalcounter=10
val filterF = new Function[Path, Boolean] {
def apply(file: Path): Boolean = {
globalcounter --
if(globalcounter > 0) {
return true // consider only 10 files.
}
// touch the file so that timestamp of the file is updated.
return false
}
}
https://stackoverflow.com/questions/39091836
复制相似问题