我正在使用Spark2.2,我试图从Kafka读取JSON消息,将它们转换为DataFrame,并将它们作为Row
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();有了这一点,我就能做到:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+我想要更像这样的东西:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+我尝试使用from_json函数使用struct
DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)但我只得到:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+然后我试着使用explode,但是我只得到了一个例外,它说:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...知道该怎么做吗?
发布于 2017-10-12 16:47:26
你快到了,只要选择正确的东西。from_json返回一个与架构匹配的struct列。如果模式(JSON表示)如下所示:
{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}您将得到相当于以下内容的嵌套对象:
root
|-- jsontostructs(col): struct (nullable = true)
| |-- myField: string (nullable = false)可以使用getField (或getItem)方法选择特定字段
df.select(from_json(col("col"), schema).getField("myField").alias("myField"));或.*来选择struct中的所有顶级字段
df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");尽管对于单个string列,get_json_object应该足够:
df.select(get_json_object(col("col"), "$.myField"));https://stackoverflow.com/questions/46714561
复制相似问题