我在scala/spark数据管道中有3个对象。2是数据帧,1是长值。
我需要创建一个单独的json对象,其中包括3个对象。
例如,如果将特定日期作为请求传递给应用程序api:
val df1 = getDF_1(date)
val df2 = getDF_2(date)
val value_3 = getValue_3(date)
我可以单独地将它们转换为json,但我很难创建一个JSON响应,比如:
response = {"date":date, "values"{
"df1":{nested json report}
"df2":{nested json report}
"long3":23234
}
}
下面是数据帧的创建方式:
case class IpAccessCount(ip:String, uri:String, accessCount:Long)
def toIpAccessCount(ip:String, uri:String, accessCount:Long): IpAccessCount = IpAccessCount(ip, uri,accessCount)
val ipAccessCount=udf(toIpAccessCount _)
spark.udf.register("ipAccessCount", ipAccessCount)
val ipAccessCountByDate = spark.sql("""select ip, uri, datetime, count(*) as accessCount from csvData group by ip, uri, datetime """)
ipAccessCountByDate.createOrReplaceTempView("ipAccessCountByDate")
def GetDateIpAccessCount(date_as_str:String)=
ipAccessCountByDate.filter(col("datetime")===s"$date_as_str").drop("datetime").map(r => toIpAccessCount(r.getString(0), r.getString(1), r.getLong(2))).toDF().coalesce(1)
我不知道如何将它们合并到单个json响应中。非常感谢!
发布于 2020-08-06 15:46:53
如果你真的需要以这种方式实现,我有一个解决方案。
val df1 = "[" + getDF_1(date).toJSON.collect().mkString(",") + "]"
val df2 = "[" + getDF_2(date).toJSON.collect().mkString(",") + "]"
val value_3 = getValue_3(date)
val json_response = s"""
|{"date":"${date}", "values":{
| "df1":"${df1}",
| "df2":"${df2}",
| "long3":23234
| }
| }
|""".stripMargin
说明:让我们创建同时包含df1和df2的JSON数组字符串。然后将df1、df2、date作为json字符串附加到json_response中,如果需要,可以将该字符串转换为JSON对象。
注意:如果在df1或df2中有大量数据,那么在执行collect()时,可能会出现内存不足异常。
https://stackoverflow.com/questions/63285484
复制