大家对简单数据类型的比较都很清楚,但是针对array、map、struct这些复杂类型,spark sql是否支持比较呢?都是怎么比较的?我们该怎么利用呢?
先给出一个结论:spark sql支持array、struct类型的比较,但不支持map类型的比较(Hive也是如此)。
那是怎么比较的呢?
以max函数为入口来查看:
从代码中,我们看到,比较的方法入口是TypeUtils类的getInterpretedOrdering方法。
def getInterpretedOrdering(t: DataType): Ordering[Any] = {
t match {
//AtomicType 是指一种内部类型,用于表示所有非null、UDT、数组、结构和映射。
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
case a: ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
case udt: UserDefinedType[_] => getInterpretedOrdering(udt.sqlType)
}
}
处理四种类型:AtomicType(原子类型:一种内部类型,用于表示所有非null、UDT、数组、结构和映射)、ArrayType(数组的类型)、StructType(struct类型)、UserDefinedType(用户自定义的类型)
从这里可以了解到,没有对map类型的判断方法
array的比较方法是取最短的数组的长度做为size,从左往右,挨个儿比,直到比出大小。
几种情况:
1、如果两个同位置的元素都为null,则do nothing,接着比下一个
2、如果两个同位置的元素其中有一个为null,则不为null的那个数组大
3、按照从左往右,如果所有同位置的元素都相等,则按长短比,数组元素多的大,如果两个数组长短一样,则说明两个数组相等
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new Ordering[ArrayData] {
private[this] val elementOrdering: Ordering[Any] = elementType match {
case dt: AtomicType => dt.ordering.asInstanceOf[Ordering[Any]]
case a : ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
case other =>
throw new IllegalArgumentException(
s"Type ${other.catalogString} does not support ordered operations")
}
def compare(x: ArrayData, y: ArrayData): Int = {
val leftArray = x
val rightArray = y
val minLength = scala.math.min(leftArray.numElements(), rightArray.numElements())
var i = 0
while (i < minLength) {
val isNullLeft = leftArray.isNullAt(i)
val isNullRight = rightArray.isNullAt(i)
if (isNullLeft && isNullRight) {
// Do nothing.
} else if (isNullLeft) {
return -1
} else if (isNullRight) {
return 1
} else {
val comp =
elementOrdering.compare(
leftArray.get(i, elementType),
rightArray.get(i, elementType))
if (comp != 0) {
return comp
}
}
i += 1
}
if (leftArray.numElements() < rightArray.numElements()) {
return -1
} else if (leftArray.numElements() > rightArray.numElements()) {
return 1
} else {
return 0
}
}
}
StructType处理方法
struct的比较方法和数组类似,因为StructType的fields是以一个数组的结构存储的。
StructType中要求元素个数必须是一样的,因此fields数组的长度是一样的。
比较方法也是:从左往右,挨个儿比,直到比出大小。
几种情况:
1、如果两个同位置的元素都为null,则do nothing,接着比下一个
2、如果两个同位置的元素其中有一个为null,则不为null的那个数组大(默认是Ascending,即:NullsFirst)
3、比较同位置元素时,会依据数据类型调用相应类型(AtomicType、ArrayType、StructType-->Struct套Struct的情况)的比较方法
class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(bindReferences(ordering, inputSchema))
override def compare(a: InternalRow, b: InternalRow): Int = {
var i = 0
val size = ordering.size
while (i < size) {
val order = ordering(i)
val left = order.child.eval(a)
val right = order.child.eval(b)
if (left == null && right == null) {
// Both null, continue looking.
} else if (left == null) {
return if (order.nullOrdering == NullsFirst) -1 else 1
} else if (right == null) {
return if (order.nullOrdering == NullsFirst) 1 else -1
} else {
val comparison = order.dataType match {
case dt: AtomicType if order.direction == Ascending =>
dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
case dt: AtomicType if order.direction == Descending =>
- dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
case a: ArrayType if order.direction == Ascending =>
a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case a: ArrayType if order.direction == Descending =>
- a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case s: StructType if order.direction == Ascending =>
s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case s: StructType if order.direction == Descending =>
- s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case other =>
throw new IllegalArgumentException(s"Type $other does not support ordered operations")
}
if (comparison != 0) {
return comparison
}
}
i += 1
}
0
}
}
比如计算贡献gmv最大的用户id、购买时间最早的用户id:
可以通过构造struct,把gmv和购买时间做为第一个字段。这样在计算max、min的时候就可以按照gmv或者购买时间取最大、最小,且能同时把对应的其他的信息取出来。
select
max(gmv_struct('gmv', gmv, 'uid', uid)).gmv,
max(gmv_struct('gmv', gmv, 'uid', uid)).uid as max_gmv_uid,
min(paytime_struct('pay_time', pay_time, 'uid', uid)).pay_time,
min(paytime_struct('pay_time', pay_time, 'uid', uid)).uid as earliest_paytime_uid
from
XXX
where
XXX
给出一个小思考:为啥不支持map类型的比较呢?