首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spark streaming DStream RDD获取文件名

Spark streaming DStream RDD获取文件名
EN

Stack Overflow用户
提问于 2015-03-13 19:38:46
回答 3查看 3K关注 0票数 9

Spark streaming textFileStreamfileStream可以监控目录并处理Dstream RDD中的新文件。

如何获取DStream RDD在该特定时间间隔正在处理的文件名?

EN

回答 3

Stack Overflow用户

发布于 2016-10-26 00:24:29

fileStream生成NewHadoopRDDs的UnionRDDsc.newAPIHadoopFile创建的NewHadoopRDDs的好处是它们的names被设置为它们的路径。

下面是你可以用这些知识做些什么的例子:

代码语言:javascript
运行
复制
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform( rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map( dep =>
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
        )
      )
    )

def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.map{ dep =>
      if (dep.rdd.isEmpty) None
      else {
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }.flatten
  )
}

def main(args: Array[String]) = {
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namesTextFileStream(ssc, "/some/directory")

  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.
    transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()
}
票数 5
EN

Stack Overflow用户

发布于 2017-10-12 07:38:54

对于那些想要一些Java代码而不是Scala的人:

代码语言:javascript
运行
复制
JavaPairInputDStream<LongWritable, Text> textFileStream = 
        jsc.fileStream(
            inputPath, 
            LongWritable.class, 
            Text.class,
            TextInputFormat.class, 
            FileInputDStream::defaultFilter,
            false
        );
JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
        UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
        List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
        List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> {
            if (depRdd.isEmpty()) {
                return null;
            }
            JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
            String filename = depRdd.name();
            JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename);
            return newDep.rdd();
        }).filter(t -> t != null).collect(Collectors.toList());
        Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
        ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
        return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});
票数 1
EN

Stack Overflow用户

发布于 2020-05-05 05:53:47

或者,通过修改FileInputDStream,而不是将文件内容加载到RDD中,它只需从文件名创建RDD即可。

如果您实际上不想将数据本身读取到RDD中,或者希望将文件名作为您的步骤之一传递给外部命令,则这会提高性能。

只需更改filesToRDD(..)因此,它对文件名进行RDD,而不是将数据加载到RDD中。

请参阅:https://github.com/HASTE-project/bin-packing-paper/blob/master/spark/spark-scala-cellprofiler/src/main/scala/FileInputDStream2.scala#L278

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29031276

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档