Spark window function "rowsBetween" should only consider complete sets of rows

Stack Overflow user
Asked on 2019-05-19 19:09:29
1 answer, 361 views, 0 followers, 0 votes

I am using the "rowsBetween" window function to compute a moving median, as follows:

val mm = new MovingMedian
val win = Window.partitionBy("raw_data_field_id").orderBy("date_time_epoch").rowsBetween(-50, 50)
var rawdataFiltered = rawdata.withColumn("movingmedian", mm(col("value")).over(win))

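This takes a window of 50 rows before and 50 rows after the current row. However, I need to exclude every row near the beginning or the end that does not have a full 50 rows before it and 50 rows after it.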
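
To make the requirement concrete, here is a minimal plain-Scala sketch (no Spark involved) of the intended semantics. The helper name `completeWindows` and the half-width parameter `k` (standing in for 50) are hypothetical and not part of the question: a row is kept only when its window holds exactly `2 * k + 1` values.

```scala
// Hypothetical helper, for illustration only: returns (centre value, window)
// pairs for the rows that have k rows before them and k rows after them.
def completeWindows(values: Seq[Double], k: Int): Seq[(Double, Seq[Double])] = {
  val size = 2 * k + 1
  // Guard: on a too-short input, sliding would yield a single partial window.
  if (values.length < size) Seq.empty
  else values.sliding(size).map(w => (w(k), w)).toList
}

val kept = completeWindows(Seq(1.0, 2.0, 3.0, 4.0, 5.0), k = 1)
// Only the rows with values 2.0, 3.0 and 4.0 have a full 3-row window.
kept.foreach { case (centre, window) => println(s"$centre -> $window") }
```

The first and last `k` rows drop out because no full window can be centred on them, which is the behaviour being asked of the Spark window.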

Code for reference:

class MovingMedian extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    org.apache.spark.sql.types.StructType(org.apache.spark.sql.types.StructField("value", org.apache.spark.sql.types.DoubleType) :: Nil)

  def bufferSchema: org.apache.spark.sql.types.StructType = org.apache.spark.sql.types.StructType(
    org.apache.spark.sql.types.StructField("window_list", org.apache.spark.sql.types.ArrayType(org.apache.spark.sql.types.DoubleType, false)) :: Nil
  )
  def dataType: org.apache.spark.sql.types.DataType = org.apache.spark.sql.types.DoubleType
  def deterministic: Boolean = true
  def initialize(buffer: org.apache.spark.sql.expressions.MutableAggregationBuffer): Unit = {
    buffer(0) = new scala.collection.mutable.ArrayBuffer[Double]()
  }
  def update(buffer: org.apache.spark.sql.expressions.MutableAggregationBuffer,input: org.apache.spark.sql.Row): Unit = {
    var bufferVal=buffer.getAs[scala.collection.mutable.WrappedArray[Double]](0).toBuffer
    bufferVal+=input.getAs[Double](0)
    buffer(0) = bufferVal
  }
  def merge(buffer1: org.apache.spark.sql.expressions.MutableAggregationBuffer, buffer2: org.apache.spark.sql.Row): Unit = {
    // Spark returns array columns as a Seq (a WrappedArray), not an ArrayBuffer, so read them as Seq[Double].
    buffer1(0) = buffer1.getAs[Seq[Double]](0) ++ buffer2.getAs[Seq[Double]](0)
  }
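  def evaluate(buffer: org.apache.spark.sql.Row): Any = {
    // Sort the values collected for this window and return their median.
    val sortedWindow = buffer.getAs[scala.collection.mutable.WrappedArray[Double]](0).sorted
    val windowSize = sortedWindow.size
    if (windowSize % 2 == 0) {
      // Even count: average the two middle values.
      val index = windowSize / 2
      (sortedWindow(index) + sortedWindow(index - 1)) / 2
    } else {
      // Odd count: take the single middle value.
      val index = (windowSize + 1) / 2 - 1
      sortedWindow(index)
    }
  }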
}

1 Answer

Stack Overflow user

Answered on 2019-05-19 20:03:56

You can filter on the window size:

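import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, count}
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(1, 2, 3, 4, 5).toDF("foo")
val win = Window.orderBy("foo").rowsBetween(-1, 1)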

df.select($"foo",
          collect_list($"foo") over win as "agg",
          count($"*") over win as "cnt")
  .filter($"cnt" === 3)
  .show()

Output:

+---+---------+---+
|foo|      agg|cnt|
+---+---------+---+
|  2|[1, 2, 3]|  3|
|  3|[2, 3, 4]|  3|
|  4|[3, 4, 5]|  3|
+---+---------+---+
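
The same idea can be carried over to the window in the question. The following is only a sketch under stated assumptions: the helper `withCompleteWindow`, its parameters, and the temporary column name `__frame_rows` are hypothetical names introduced here, not part of the original answer. It counts the rows that actually fall inside each frame and keeps a row only when that count equals the full frame size.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions.{col, count, lit}

// Hypothetical helper: adds `agg` evaluated over `win` as column `name`, then
// drops every row whose frame holds fewer than `frameSize` rows.
def withCompleteWindow(
    df: DataFrame,
    name: String,
    agg: Column,
    win: WindowSpec,
    frameSize: Int): DataFrame = {
  val rowsInFrame = "__frame_rows" // temporary column; assumed not to clash with df's columns
  df.withColumn(name, agg.over(win))
    .withColumn(rowsInFrame, count(lit(1)).over(win)) // rows actually present in the frame
    .filter(col(rowsInFrame) === frameSize)
    .drop(rowsInFrame)
}
```

With the question's names, and assuming `rawdata` and `MovingMedian` are defined as shown there, the call would look like `withCompleteWindow(rawdata, "movingmedian", mm(col("value")), win, 101)`, where `win` is the `rowsBetween(-50, 50)` window and 101 is 50 rows before, the current row, and 50 rows after.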
Votes: 1
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/56207192
