首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何读取Spark中的嵌套集合

如何读取Spark中的嵌套集合
EN

Stack Overflow用户
提问于 2015-05-03 06:20:06
回答 4查看 18.4K关注 0票数 19

我有一张镶木镶木桌上有一根柱子

、array>

可以使用横向视图语法在配置单元中对此表运行查询。

如何将该表读入RDD,更重要的是如何过滤、映射等Spark中的嵌套集合?

在Spark文档中找不到任何与此相关的引用。提前感谢您提供的任何信息!

ps。我觉得在谈判桌上提供一些统计数据可能会有所帮助。主表中的列数约为600。行数~200m。嵌套集合中的“列”数~10。嵌套集合中的平均记录数~35。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2015-05-03 19:30:26

在嵌套集合的情况下没有魔术。Spark将以与RDD[(String, String)]RDD[(String, Seq[String])]相同的方式处理。

不过,从Parquet文件中读取此类嵌套集合可能会比较棘手。

让我们以spark-shell (1.3.1)为例:

代码语言:javascript
复制
scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

编写拼图文件:

代码语言:javascript
复制
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

阅读拼图文件:

代码语言:javascript
复制
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

最重要的部分是row.getAs[Seq[Row]](1)struct嵌套序列的内部表示是ArrayBuffer[Row],您可以使用它的任何超类型来代替Seq[Row]1是外部行中的列索引。我在这里使用了getAs方法,但在最新版本的Spark中也有替代方法。请参阅Row trait的源代码。

现在您有了一个RDD[Outer],您可以应用任何所需的转换或操作。

代码语言:javascript
复制
// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

请注意,我们使用spark-SQL库仅用于读取parquet文件。例如,您可以直接在DataFrame上选择所需的列,然后再将其映射到RDD。

代码语言:javascript
复制
dataFrame.select('col1, 'col2).map { row => ... }
票数 20
EN

Stack Overflow用户

发布于 2015-06-20 04:34:44

我将给出一个基于Python的答案,因为这就是我正在使用的。我认为Scala也有类似的东西。

根据Python API docs的说法,在Spark 1.4.0中添加了explode函数,以处理DataFrames中的嵌套数组。

创建测试数据帧:

代码语言:javascript
复制
from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

使用explode平整列表列:

代码语言:javascript
复制
from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+
票数 8
EN

Stack Overflow用户

发布于 2015-11-05 01:16:25

另一种方法是像这样使用模式匹配:

代码语言:javascript
复制
val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
  case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
    case List(a:String, b: String) => (a, b)
  }).toList
})

您可以直接在Row上进行模式匹配,但由于几个原因,它可能会失败。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30008127

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档