首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花非晶型错配

火花非晶型错配
EN

Stack Overflow用户
提问于 2017-09-28 16:59:54
回答 1查看 429关注 0票数 0

我正试图在星火(2.0.1,Scala2.11)上创建一个UDAF,如下所示。这实际上是聚合元组并输出一个Map

代码语言:javascript
运行
复制
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

class mySumToMap[K, V](keyType: DataType, valueType: DataType) extends UserDefinedAggregateFunction {
  override def inputSchema = new StructType()
    .add("a_key", keyType)
    .add("a_value", valueType)

  override def bufferSchema = new StructType()
    .add("buffer_map", MapType(keyType, valueType))

  override def dataType = MapType(keyType, valueType)

  override def deterministic = true 

  override def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = Map[K, V]()
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    // input :: 0 = a_key (k), 1 = a_value
    if ( !(input.isNullAt(0)) ) {

      val a_map = buffer(0).asInstanceOf[Map[K, V]]
      val k = input.getAs[K](0)  // get the value of position 0 of the input as string (a_key)

      // I've split these on purpose to show that return values are all of type V
      val new_v1: V = a_map.getOrElse(k, 0.asInstanceOf[V])
      val new_v2: V = input.getAs[V](1)
      val new_v: V = new_v1 + new_v2

      buffer(0) = if (new_v != 0) a_map + (k -> new_v) else a_map - k
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val map1: Map[K, V] = buffer1(0).asInstanceOf[Map[K, V]]
    val map2: Map[K, V] = buffer2(0).asInstanceOf[Map[K, V]]

    buffer1(0) = map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k, 0.asInstanceOf[V])) }
  }

  override def evaluate(buffer: Row) = buffer(0).asInstanceOf[Map[K, V]]

}

但是,当我编译它时,我会看到以下错误:

代码语言:javascript
运行
复制
<console>:74: error: type mismatch;
 found   : V
 required: String
             val new_v: V = new_v1 + new_v2
                                     ^
<console>:84: error: type mismatch;
 found   : V
 required: String
           buffer1(0) = map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k, 0.asInstanceOf[V])) }

我做错了什么?

编辑:标记为Spark UDAF - using generics as input type?副本的人员的--这不是该问题的副本,因为它不处理Map数据类型。对于使用Map数据类型所面临的问题,上述代码是非常具体和完整的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-29 11:38:34

将类型限制为具有Numeric[_]的类型

代码语言:javascript
运行
复制
class mySumToMap[K, V: Numeric](keyType: DataType, valueType: DataType) 
  extends UserDefinedAggregateFunction {
    ...

使用Implicitly在运行时获得它:

代码语言:javascript
运行
复制
val n = implicitly[Numeric[V]]

用它的plus方法代替+zero来代替0

代码语言:javascript
运行
复制
buffer1(0) = map1 ++ map2.map{ 
  case (k,v) => k -> n.plus(v,  map1.getOrElse(k, n.zero))
}

要支持更广泛的类型集,可以使用cats Monoid

代码语言:javascript
运行
复制
import cats._
import cats.implicits._

并调整代码:

代码语言:javascript
运行
复制
class mySumToMap[K, V: Monoid](keyType: DataType, valueType: DataType) 
  extends UserDefinedAggregateFunction {
    ...

后来:

代码语言:javascript
运行
复制
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
  val map1: Map[K, V] = buffer1.getMap[K, V](0)
  val map2: Map[K, V] = buffer2.getMap[K, V](0)

  val m = implicitly[Monoid[Map[K, V]]]

  buffer1(0) = m.combine(map1, map2)
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46474610

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档