首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花结构化流中的jsontostructs to Row

火花结构化流中的jsontostructs to Row
EN

Stack Overflow用户
提问于 2017-10-12 16:25:58
回答 1查看 3.3K关注 0票数 7

我正在使用Spark2.2,我试图从Kafka读取JSON消息,将它们转换为DataFrame,并将它们作为Row

代码语言:javascript
运行
复制
spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .select(col("value").cast(StringType).as("col"))
    .writeStream()
    .format("console")
    .start();

有了这一点,我就能做到:

代码语言:javascript
运行
复制
+--------------------+
|                 col|
+--------------------+
|{"myField":"somet...|
+--------------------+

我想要更像这样的东西:

代码语言:javascript
运行
复制
+--------------------+
|             myField|
+--------------------+
|"something"         |
+--------------------+

我尝试使用from_json函数使用struct

代码语言:javascript
运行
复制
DataTypes.createStructType(
    new StructField[] {
            DataTypes.createStructField("myField", DataTypes.StringType)
    }
)

但我只得到:

代码语言:javascript
运行
复制
+--------------------+
|  jsontostructs(col)|
+--------------------+
|[something]         |
+--------------------+

然后我试着使用explode,但是我只得到了一个例外,它说:

代码语言:javascript
运行
复制
cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(...

知道该怎么做吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-10-12 16:47:26

你快到了,只要选择正确的东西。from_json返回一个与架构匹配的struct列。如果模式(JSON表示)如下所示:

代码语言:javascript
运行
复制
{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}

您将得到相当于以下内容的嵌套对象:

代码语言:javascript
运行
复制
root
 |-- jsontostructs(col): struct (nullable = true)
 |    |-- myField: string (nullable = false)

可以使用getField (或getItem)方法选择特定字段

代码语言:javascript
运行
复制
df.select(from_json(col("col"), schema).getField("myField").alias("myField"));

.*来选择struct中的所有顶级字段

代码语言:javascript
运行
复制
df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");

尽管对于单个string列,get_json_object应该足够:

代码语言:javascript
运行
复制
df.select(get_json_object(col("col"), "$.myField"));
票数 11
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46714561

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档