我正在尝试实现min和max在agg中的groupByKey操作。代码如下所示:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.expressions.scalalang.typed.{
count => typedCount,
sum => typedSum }
inputFlowRecords.groupByKey(inputFlowRecrd => inputFlowRecrd.FlowInformation)
.agg(typedSum[InputFlowRecordV1](_.FlowStatistics.minFlowTime).name("minFlowTime"),
typedSum[InputFlowRecordV1](_.FlowStatistics.maxFlowTime).name("maxFlowTime"),
typedSum[InputFlowRecordV1](_.FlowStatistics.flowStartedCount).name("flowStartedCount"),
typedSum[InputFlowRecordV1](_.FlowStatistics.flowEndedCount).name("flowEndedCount"),
typedSum[InputFlowRecordV1](_.FlowStatistics.packetsCountFromSource).name("packetsCountFromSource"),
typedSum[InputFlowRecordV1](_.FlowStatistics.bytesCountFromSource).name("bytesCountFromSource"),
typedSum[InputFlowRecordV1](_.FlowStatistics.packetsCountFromDestination).name("packetsCountFromDestination"),
typedSum[InputFlowRecordV1](_.FlowStatistics.bytesCountFromDestination).name("bytesCountFromDestination"))我在这里面临两个问题:
org.apache.spark.sql.functions.min/max操作时,错误说应该使用TypedColumns。如何解决这一问题?agg函数只允许我们指定最多4列。在其中,我有8列要聚合。如何才能做到这一点?发布于 2019-07-23 19:58:55
不幸的是,似乎:
在您的例子中,一件合理的事情可能是定义您自己的专门聚合器来聚合InputFlowStatistics对象,所以您只有一个参数给agg。
类型化聚合器是在这里定义的:typedaggregators.scala和Spark文档提供了一些关于创建自定义聚合器(->链接)的信息。
https://stackoverflow.com/questions/57170313
复制相似问题