首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何将JSON数组转换为writeStream之前的行到Elasticsearch?

如何将JSON数组转换为writeStream之前的行到Elasticsearch?
EN

Stack Overflow用户
提问于 2018-11-23 11:42:11
回答 1查看 95关注 0票数 0

跟进this question

我有如下所示格式的JSON流数据

代码语言:javascript
运行
复制
|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                | 
|  XYZ  |  [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |

我需要把它转换成下面的格式

代码语言:javascript
运行
复制
|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 

为了实现这一点,执行了前面问题中建议的转换。

代码语言:javascript
运行
复制
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum") 

val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))

val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C")) 

val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")

现在,我需要将数据保存到ElasticSearch中。

代码语言:javascript
运行
复制
 df6.writeStream
  .outputMode("complete")
  .format("es")
  .option("es.resource", "index/type")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()

我得到一个错误,ElasticSearch不支持Append输出模式。在Append模式下,它无法写入writeStream,聚合不能在Append模式上完成。我能够在完全模式下写到控制台。我现在如何将数据写入ElasticSearch?

任何帮助都将不胜感激。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-11-23 11:54:17

这里不需要pivot或聚合。如果B列确实是Array[Map[String, String]] ( SQL类型中的array<map<string, string>>),则只需要一个简单的selectwithColumn

代码语言:javascript
运行
复制
df
  .withColumn("B", explode($"B"))
  .select($"A", $"B"("C") as "C", $"B"("D") as "D")
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53446051

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档