首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何在Spark SQL中定义自定义类型的模式?

如何在Spark SQL中定义自定义类型的模式?
EN

Stack Overflow用户
提问于 2015-09-07 21:59:20
回答 1查看 19.8K关注 0票数 27

下面的示例代码尝试将一些case对象放入dataframe中。代码包括使用以下特征的case对象层次结构和case类的定义:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

在执行代码时,我不幸地遇到了以下异常:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

问题

  • 是否有可能为某些类型添加或定义模式(这里的类型Some)?
  • Does还有另一种方法来表示这种枚举?
  • 我尝试过直接使用Enumeration,但也没有成功。(请参阅below)

Enumeration的代码

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

提前谢谢。我希望,最好的方法不是使用字符串。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-09-08 17:51:56

Spark 2.0.0+

在Spark2.0.0中,UserDefinedType已经成为私有的,到目前为止,它还没有Dataset友好的替代品。

请参阅:SPARK-14155 (Hide UserDefinedType in Spark 2.0)

大多数时候,静态类型的Dataset可以作为替代,有一个悬而未决的Jira SPARK-7768可以使UDT API在目标版本2.4中再次公开。

另请参阅How to store custom objects in Dataset?

火花< 2.0.0

是否有可能为某些类型(这里是某些类型)添加或定义模式?

我想答案取决于你有多需要它。看起来可以创建一个UserDefinedType,但它需要访问DeveloperApi,而且并不是很简单,也没有很好的文档。

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

您可能还应该覆盖hashCodeequals

它的PySpark对等项可能如下所示:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

在Spark < 1.5中,Python UDT需要成对的Scala UDT,但在1.5中看起来不再是这样了。

对于像这样的简单UDT,您可以使用简单类型(例如,IntegerType而不是整个Struct)。

票数 24
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32440461

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档