我有一个由7-8个字段组成的数据集,这些字段的类型是String、Int和Float。
我正在尝试通过编程的方法创建模式,使用的方法如下:
val schema = StructType(header.split(",").map(column => StructField(column, StringType, true)))然后将其映射到Row类型,如下所示:
val dataRdd = datafile.filter(x => x!=header).map(x => x.split(",")).map(col => Row(col(0).trim, col(1).toInt, col(2).toFloat, col(3), col(4) ,col(5), col(6), col(7), col(8)))但是在创建DataFrame之后,当我使用DF.show()时,它给出了Integer字段的错误。
那么如何创建这样的模式,其中我们在数据集中有多种数据类型
发布于 2017-05-25 09:42:17
您在代码中遇到的问题是将所有字段都赋值为StringType。
假设在头文件中只有字段的名称,那么您就无法猜测类型。
让我们假设标题字符串是这样的
val header = "field1:Int,field2:Double,field3:String"那么代码应该是
def inferType(field: String) = field.split(":")(1) match {
case "Int" => IntegerType
case "Double" => DoubleType
case "String" => StringType
case _ => StringType
}
val schema = StructType(header.split(",").map(column => StructField(column, inferType(column), true)))对于头字符串示例,您将获得
root
|-- field1:Int: integer (nullable = true)
|-- field2:Double: double (nullable = true)
|-- field3:String: string (nullable = true)从另一方面来说。如果你需要的是来自文本的数据框,我建议你直接从文件本身创建DataFrame。从RDD创建它是没有意义的。
val fileReader = spark.read.format("com.databricks.spark.csv")
.option("mode", "DROPMALFORMED")
.option("header", "true")
.option("inferschema", "true")
.option("delimiter", ",")
val df = fileReader.load(PATH_TO_FILE)发布于 2017-05-26 05:02:49
首先定义结构类型:
val schema1 = StructType(Array(
StructField("AcutionId", StringType, true),
StructField("Bid", IntegerType, false),
StructField("BidTime", FloatType, false),
StructField("Bidder", StringType, true),
StructField("BidderRate", FloatType, false),
StructField("OpenBid", FloatType, false),
StructField("Price", FloatType, false),
StructField("Item", StringType, true),
StructField("DaystoLive", IntegerType, false)
))然后通过将其转换为特定类型来指定要在Row中最佳显示的每一列:
val dataRdd = datafile.filter(x => x!=header).map(x => x.split(","))
.map(col => Row(
col(0).trim,
col(1).trim.toInt,
col(2).trim.toFloat,
col(3).trim,
col(4).trim.toFloat,
col(5).trim.toFloat,
col(6).trim.toFloat,
col(7).trim,
col(8).trim.toInt)
)然后将模式应用于RDD
val auctionDF = spark.sqlContext.createDataFrame(dataRdd,schema1)https://stackoverflow.com/questions/44170145
复制相似问题