首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >UDAF Spark中的多列输出

UDAF Spark中的多列输出
EN

Stack Overflow用户
提问于 2017-03-12 23:21:18
回答 1查看 1.2K关注 0票数 3

我从我的mongodb中获得了一些数据,如下所示:

代码语言:javascript
运行
复制
     +------+-------+
     | view | data  |
     +------+-------+
     |  xx  | ***   |
     |  yy  | ***   |
     |  xx  | ***   |
     +------+-------+

没有必要知道里面是什么。

我写了一个这样的UserDefinedAggregateFunction,因为我想在视图上分组。

代码语言:javascript
运行
复制
class Extractor() extends UserDefinedAggregateFunction{
  override def inputSchema: StructType = // some stuff

  override def bufferSchema: StructType = 
      StructType(
        List(
          StructField("0",IntegerType,false),
          StructField("1",IntegerType,false),
          StructField("2",IntegerType,false),
          StructField("3",IntegerType,false),
          StructField("4",IntegerType,false),
          StructField("5",IntegerType,false),
          StructField("6",IntegerType,false),
          StructField("7",IntegerType,false)
        )
      )

  override def dataType: DataType = bufferSchema        

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    for (x <- 0 to 7){
      buffer(x) = 0
    }
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = // some stuff

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = // some stuff

  override def evaluate(buffer: Row): Any = 
      var l = List.empty[Integer]
      for (x <- 7 to 0 by -1){
          l = buffer.getInt(x) :: l
      }
      l
}

我的输出应该是这样的:

代码语言:javascript
运行
复制
     +------+---+---+---+---+---+---+---+---+
     | view | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
     +------+---+---+---+---+---+---+---+---+
     |  xx  | 0 | 0 | 4 | 1 | 0 | 0 | 3 | 0 |
     |  yy  | 0 | 0 | 0 | 3 | 0 | 1 | 0 | 0 |
     +------+---+---+---+---+---+---+---+---+

这些值是在上面的update/merge函数中计算的,但它是有效的,没有必要让您看到它。

然后我像这样使用它:

代码语言:javascript
运行
复制
val ex = new Extractor()
val df = dataset.groupBy("view").agg(
      ex(dataset.col("data"))
)
df.show()

当我执行df.show()时,它总是给我一个IndexOutOfBoundException。我知道这是一个懒惰的计算,这就是为什么我在df.show()中得到一个错误的原因。

据我所知,它可以执行第一组并结束evaluate函数。但在那之后我得到了IndexOutOfBoundException..。

另外,当我将dataType和evaluate函数更改为:

代码语言:javascript
运行
复制
override def dataType: DataType =
    ArrayType(IntegerType,false)

override def evaluate(buffer: Row): Any = {
    var l = ofDim[Integer](8)
    for (x <- 0 to 7){
      l(x) = buffer.getInt(x)
    }
    l

输出将如下所示:

代码语言:javascript
运行
复制
     +------+------------------------------+
     | view | Extractor                    |
     +------+------------------------------+
     |  xx  | [0, 0, 4, 1, 0, 0, 3, 0]     |
     |  yy  | [0, 0, 0, 3, 0, 1, 0, 0]     |
     +------+------------------------------+

模式看起来像这样:

代码语言:javascript
运行
复制
root
 |-- view: string (nullable = true)
 |-- Extractor: array (nullable = true)
 |    |-- element: integer (containsNull = false)

我不能把它转换成我想要的形式。

因为第二种方法可以工作,所以我想我在第一种方法中搞砸了DataType,但我不知道如何修复它……

对我的问题有很多介绍:

如何获得我想要的输出?我真的不关心这两种方法中的哪一种(首先使用多个输出列或一个可以转换为我想要的形式的数组),只要它是有效的即可。

感谢您的帮助

EN

回答 1

Stack Overflow用户

发布于 2017-03-12 23:54:29

您正在将聚合输出定义为列表:

代码语言:javascript
运行
复制
 override def dataType: DataType = bufferSchema

因为bufferSchema是一个列表,所以这是您最终得到的结果。您可以稍后更改您的模式,并将列表中的每一列转换为新列。

对于您的错误,不同之处在于:

代码语言:javascript
运行
复制
override def evaluate(buffer: Row): Any = 
  var l = List.empty[Integer]
  for (x <- 7 to 0 by -1){
      l = buffer.getInt(x) :: l
  }
  l

代码语言:javascript
运行
复制
override def evaluate(buffer: Row): Any = 
  var l = ofDim[Integer](8)
  for (x <- 0 to 7){
    l = buffer.getInt(x) :: l
  }
  l

在第二个示例中,您定义了一个预定义的列数。因此,您确信可以从0迭代到7而不会出现任何问题。

第一个例子并非如此,因此,我怀疑你可能有一个格式错误的数据,使得你的缓冲区在initializemerge中被错误地初始化。我建议你在转换缓冲区长度的每一步之后添加一个try/catch来验证大小(至少是initialize,但也可以是updatemerge )。

要为列表中的每个元素添加列,可以使用withColumn或通过映射添加列。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42749155

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档