前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark sql是如何比较复杂数据类型的?该如何利用呢?

spark sql是如何比较复杂数据类型的?该如何利用呢?

作者头像
数据仓库践行者
发布2022-03-15 08:20:39
1.6K0
发布2022-03-15 08:20:39
举报
文章被收录于专栏:数据仓库践行者

Hi,我是小萝卜算子

大家对简单数据类型的比较都很清楚,但是针对array、map、struct这些复杂类型,spark sql是否支持比较呢?都是怎么比较的?我们该怎么利用呢?

先给出一个结论:spark sql支持array、struct类型的比较,但不支持map类型的比较(Hive也是如此)。

那是怎么比较的呢?

先来看一下sparksql支持的数据类型

  • 数字类型
    • TimestampType:代表包含字段年,月,日,时,分,秒的值
    • DateType:代表包含字段年,月,日的值
    • ByteType:代表一个字节的整数。范围是-128到127
    • ShortType:代表两个字节的整数。范围是-32768到32767
    • IntegerType:代表4个字节的整数。范围是-2147483648到2147483647
    • LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807
    • FloatType:代表4字节的单精度浮点数
    • DoubleType:代表8字节的双精度浮点数
    • DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成
    • StringType:代表一个字符串值
    • BinaryType:代表一个byte序列值
    • BooleanType:代表boolean值
    • Datetime类型
  • 复杂类型
    • StructField(name, dataType, nullable):代表StructType中的一个字段,字段的名字通过name指定,dataType指定field的数据类型,nullable表示字段的值是否有null值。
    • ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。containsNull用来指明ArrayType中的值是否有null值
    • MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。valueContainsNull用来指明MapType中的值是否有null值
    • StructType(fields):表示一个拥有StructFields (fields)序列结构的值

源码分析

以max函数为入口来查看:

max.scala-->greatest方法
arithmetic.scala-->Greatest类

从代码中,我们看到,比较的方法入口是TypeUtils类的getInterpretedOrdering方法。

TypeUtils.getInterpretedOrdering(dataType)
代码语言:javascript
复制
def getInterpretedOrdering(t: DataType): Ordering[Any] = {
    t match {
      //AtomicType 是指一种内部类型,用于表示所有非null、UDT、数组、结构和映射。
      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
      case a: ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
      case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
      case udt: UserDefinedType[_] => getInterpretedOrdering(udt.sqlType)
    }
  }

处理四种类型:AtomicType(原子类型:一种内部类型,用于表示所有非null、UDT、数组、结构和映射)、ArrayType(数组的类型)、StructType(struct类型)、UserDefinedType(用户自定义的类型)

从这里可以了解到,没有对map类型的判断方法

ArrayType处理方法

array的比较方法是取最短的数组的长度做为size,从左往右,挨个儿比,直到比出大小。

几种情况:

1、如果两个同位置的元素都为null,则do nothing,接着比下一个

2、如果两个同位置的元素其中有一个为null,则不为null的那个数组大

3、按照从左往右,如果所有同位置的元素都相等,则按长短比,数组元素多的大,如果两个数组长短一样,则说明两个数组相等

代码语言:javascript
复制
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new Ordering[ArrayData] {
  private[this] val elementOrdering: Ordering[Any] = elementType match {
    case dt: AtomicType => dt.ordering.asInstanceOf[Ordering[Any]]
    case a : ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
    case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
    case other =>
      throw new IllegalArgumentException(
        s"Type ${other.catalogString} does not support ordered operations")
  }

  def compare(x: ArrayData, y: ArrayData): Int = {
    val leftArray = x
    val rightArray = y
    val minLength = scala.math.min(leftArray.numElements(), rightArray.numElements())
    var i = 0
    while (i < minLength) {
      val isNullLeft = leftArray.isNullAt(i)
      val isNullRight = rightArray.isNullAt(i)
      if (isNullLeft && isNullRight) {
        // Do nothing.
      } else if (isNullLeft) {
        return -1
      } else if (isNullRight) {
        return 1
      } else {
        val comp =
          elementOrdering.compare(
            leftArray.get(i, elementType),
            rightArray.get(i, elementType))
        if (comp != 0) {
          return comp
        }
      }
      i += 1
    }
    if (leftArray.numElements() < rightArray.numElements()) {
      return -1
    } else if (leftArray.numElements() > rightArray.numElements()) {
      return 1
    } else {
      return 0
    }
  }
}

StructType处理方法

struct的比较方法和数组类似,因为StructType的fields是以一个数组的结构存储的。

StructType中要求元素个数必须是一样的,因此fields数组的长度是一样的。

比较方法也是:从左往右,挨个儿比,直到比出大小。

几种情况:

1、如果两个同位置的元素都为null,则do nothing,接着比下一个

2、如果两个同位置的元素其中有一个为null,则不为null的那个数组大(默认是Ascending,即:NullsFirst)

3、比较同位置元素时,会依据数据类型调用相应类型(AtomicType、ArrayType、StructType-->Struct套Struct的情况)的比较方法

代码语言:javascript
复制
class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {

  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
    this(bindReferences(ordering, inputSchema))

  override def compare(a: InternalRow, b: InternalRow): Int = {
    var i = 0
    val size = ordering.size
    while (i < size) {
      val order = ordering(i)
      val left = order.child.eval(a)
      val right = order.child.eval(b)

      if (left == null && right == null) {
        // Both null, continue looking.
      } else if (left == null) {
        return if (order.nullOrdering == NullsFirst) -1 else 1
      } else if (right == null) {
        return if (order.nullOrdering == NullsFirst) 1 else -1
      } else {
        val comparison = order.dataType match {
          case dt: AtomicType if order.direction == Ascending =>
            dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
          case dt: AtomicType if order.direction == Descending =>
            - dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
          case a: ArrayType if order.direction == Ascending =>
            a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
          case a: ArrayType if order.direction == Descending =>
            - a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
          case s: StructType if order.direction == Ascending =>
            s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
          case s: StructType if order.direction == Descending =>
            - s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
          case other =>
            throw new IllegalArgumentException(s"Type $other does not support ordered operations")
        }
        if (comparison != 0) {
          return comparison
        }
      }
      i += 1
    }
    0
  }
}

怎么利用?

比如计算贡献gmv最大的用户id、购买时间最早的用户id:

可以通过构造struct,把gmv和购买时间做为第一个字段。这样在计算max、min的时候就可以按照gmv或者购买时间取最大、最小,且能同时把对应的其他的信息取出来。

代码语言:javascript
复制
select
    max(gmv_struct('gmv', gmv, 'uid', uid)).gmv,
    max(gmv_struct('gmv', gmv, 'uid', uid)).uid as max_gmv_uid,
    min(paytime_struct('pay_time', pay_time, 'uid', uid)).pay_time,
    min(paytime_struct('pay_time', pay_time, 'uid', uid)).uid as earliest_paytime_uid 
from
   XXX
where
   XXX

给出一个小思考:为啥不支持map类型的比较呢?

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Hi,我是小萝卜算子
    • 先来看一下sparksql支持的数据类型
      • 源码分析
        • max.scala-->greatest方法
        • arithmetic.scala-->Greatest类
        • TypeUtils.getInterpretedOrdering(dataType)
    • 怎么利用?
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档