我必须从Azure数据湖Gen2中读取数据库中的数百个avro文件,从每个文件中的Body字段中提取数据,并将所有提取的数据连接在一个唯一的数据中。要点是,所有要读取的avro文件都是存储在湖中不同子目录中的,如下所示:
根/YYYY/mm/DD/HH/mm/ss.avro
这迫使我循环摄取和选择数据。我正在使用这个Python代码,其中list_avro_files是指向所有文件的路径列表:
list_data = []
for file_avro in list_avro_files:
df = spark.read.format('avro').load(file_avro)
data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
list_data.append(data1)
data = reduce(DataFrame.unionAll, list_data)有什么办法更有效地做到这一点吗?我如何并行/加速这个过程?
发布于 2020-06-17 13:22:39
只要您的list_avro_files可以通过标准通配符语法表示,您就可以使用Spark自己的能力将读取操作并行化。您只需要为您的avro文件指定一个basepath和一个文件名模式:
scala> var df = spark.read
.option("basepath","/user/hive/warehouse/root")
.format("avro")
.load("/user/hive/warehouse/root/*/*/*/*.avro")而且,如果您发现需要确切地知道任何给定行来自哪个文件,请使用input_file_name()内置函数来丰富您的数据:
scala> df = df.withColumn("source",input_file_name())https://stackoverflow.com/questions/62426810
复制相似问题