我有一个使用mongoexport
从mongodb导出数据的进程。正如文档提到的,所有json输出都处于严格的模式中。
这意味着数据将如下所示:
"{amount":{"$numberLong":"3"},"count":{"$numberLong":"245"}}
其中,我的Scala案例类被定义为:
case class MongoData(amount: Long, count: Long)
当然,读取数据会像这样失败:
spark
.read
.json(inputPath)
.as[MongoData]
有没有一种方法可以在没有严格模式的情况下从mongo导出,或者在Scala中导入json而不手动地将每个字段重组到适当的结构中?
发布于 2017-07-06 01:51:08
我现在用这个作为解决方案。但感觉有点烦躁。
case class DataFrameExtended(dataFrame: DataFrame) {
def undoMongoStrict(): DataFrame = {
val numberLongType = StructType(List(StructField("$numberLong", StringType, true)))
def restructure(fields: Array[StructField], nesting: List[String] = Nil): List[Column] = {
fields.flatMap(field => {
val fieldPath = nesting :+ field.name
val fieldPathStr = fieldPath.mkString(".")
field.dataType match {
case dt: StructType if dt == numberLongType =>
Some(col(s"$fieldPathStr.$$numberLong").cast(LongType).as(field.name))
case dt: StructType =>
Some(struct(restructure(dt.fields, fieldPath): _*).as(field.name))
case _ => Some(col(fieldPathStr).as(field.name))
// case dt:ArrayType => //@todo handle other DataTypes Array??
}
})
}.toList
dataFrame.select(restructure(dataFrame.schema.fields): _*)
}
}
implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
DataFrameExtended(df)
}
spark
.read
.json(inputPath)
.undoMongoStrict()
https://stackoverflow.com/questions/44926616
复制