首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >时间序列/滴答数据集的火花变换

时间序列/滴答数据集的火花变换
EN

Stack Overflow用户
提问于 2019-09-20 02:59:03
回答 2查看 534关注 0票数 3

我们有表格在蜂巢,存储交易订单数据的每一天结束作为order_date。其他重要专栏有:productcontractprice(订单价格)、ttime (交易时间)状态(插入、更新或删除)E 110价格e 211(订单价格)

我们必须在主表上以滴答数据的方式建立一个图表表,每一行(订单)的最高价格和最低价格订单都是从市场开放到那时为止的。也就是说,对于给定的订单,我们将有4列填充为maxPrice(目前为止的最高价格)、maxpriceOrderId(最高价格的顺序)、minPrice和minPriceOrderId。

这必须是每个产品,合同,即最高和最低价格之间的产品,合同。

在计算这些值时,我们需要从聚合中排除所有已关闭的订单。到目前为止,不包括“删除”状态的订单,即最高和最低的订单价格。

我们正在使用:火花2.2和输入数据格式是地板。输入记录

输出记录

为了给出一个简单的SQL视图--这个问题是用一个自联接解决的,并且看起来是这样的:使用ttime上的有序数据集,我们必须得到特定产品的最高和最低价格,从早上到那个订单的时间,每一行(订单)都有合同。这将在批处理的每个爆炸物处理(order_date)数据集上运行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
select mainSet.order_id,    mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.ttime,mainSet.status,
max(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as max_price,
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price desc,aggSet.ttime desc ) as maxOrderId
min(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as min_price as min_price
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price ,aggSet.ttime) as minOrderId
from order_table mainSet 
join order_table aggSet
ON (mainSet.produuct=aggSet.product,
mainSet.contract=aggSet.contract,
mainSet.ttime>=aggSet.ttime,
aggSet.status <> 'Remove')

写作在火花

我们从星星之火sql开始,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val mainDF: DataFrame= sparkSession.sql("select * from order_table where order_date ='eod_date' ")

  val ndf=mainDf.alias("mainSet").join(mainDf.alias("aggSet"),
        (col("mainSet.product")===col("aggSet.product")
          && col("mainSet.contract")===col("aggSet.contract")
          && col("mainSet.ttime")>= col("aggSet.ttime")
          && col("aggSet.status") <> "Remove")
        ,"inner")
        .select(mainSet.order_id,mainSet.ttime,mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.status,aggSet.order_id as agg_orderid,aggSet.ttime as agg_ttime,price as agg_price) //Renaming of columns

  val max_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
  val min_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
  val maxPriceCol = max(col("agg_price")).over(max_window)
  val minPriceCol = min(col("agg_price")).over(min_window)
  val firstMaxorder = first_value(col("agg_orderid")).over(max_window.orderBy(col("agg_price").desc, col("agg_ttime").desc))
  val firstMinorder = first_value(col("agg_orderid")).over(min_window.orderBy(col("agg_price"), col("agg_ttime")))      


  val priceDF=  ndf.withColumn("max_price",maxPriceCol)
                    .withColumn("maxOrderId",firstMaxorder)
                    .withColumn("min_price",minPriceCol)
                    .withColumn("minOrderId",firstMinorder)

    priceDF.show(20)

卷统计:

每组(产品、合同)的平均计数为600 K=600 K

这项工作持续了几个小时,但还没有完成,我试着增加内存和其他参数,但没有运气。作业被卡住了,很多次我都有内存问题,Container killed by YARN for exceeding memory limits. 4.9 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

另一种方法

对我们的最低组列(Product)进行重新分区,然后按时在分区中进行排序,这样我们就可以及时收到为mapPartition函数排序的每一行。

在分区级别执行映射分区,同时维护集合(键为order_id,价格为值),以计算最大和最小价格及其顺序。

我们将继续删除状态为“移除”的订单,当我们收到它们时,我们将从收集中删除它们。一旦集合被更新到指定的行中,我们就可以从集合中计算出最大值和最小值,并返回更新后的行。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val mainDF: DataFrame= sparkSession.sql("select order_id,product,contract,order_date,price,status,null as maxPrice,null as maxPriceOrderId,null as minPrice,null as minPriceOrderId from order_table where order_date ='eod_date' ").repartitionByRange(col("product"),col("contract"))

case class summary(order_id:String ,ttime:string,product:String,contract :String,order_date:String,price:BigDecimal,status :String,var maxPrice:BigDecimal,var maxPriceOrderId:String ,var minPrice:BigDecimal,var minPriceOrderId String)

val summaryEncoder = Encoders.product[summary]
val priceDF= mainDF.as[summary](summaryEncoder).sortWithinPartitions(col("ttime")).mapPartitions( iter => {
    //collection at partition level
    //key as order_id and value as price
    var priceCollection = Map[String, BigDecimal]()

    iter.map( row => {
        val orderId= row.order_id
        val rowprice= row.price

        priceCollection = row.status match {
                            case "Remove" => if (priceCollection.contains(orderId)) priceCollection -= orderId
                            case _ => priceCollection += (orderId -> rowPrice)
                         }

        row.maxPrice = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._2  // Gives key,value tuple from collectin for  max value )
        row.maxPriceOrderId = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._1

        row.minPrice =  if(priceCollection.size > 0) priceCollection.minBy(_._2)._2   // Gives key,value tuple from collectin for  min value )
        row.minPriceOrderId = if(priceCollection.size > 0) priceCollection.minBy(_._2)._1

      row

    })
  }).show(20)

这是运行良好,并在20分钟内完成较小的数据集,但我发现23磨记录(有17个不同的产品和合同),结果似乎不正确。我可以看到来自一个分区(输入分割)的数据正在转到另一个分区,从而扰乱了值。

如我所知,map分区在每个星火分区上执行函数(类似于映射减少中的输入分块),所以我如何才能强制spark创建具有该产品和契约组的所有值的inputsplits/分区。

->

当我们被困在这里的时候,我会很感激你的帮助。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-10-07 23:08:16

编辑:这是一个关于为什么许多小文件是坏的文章

为什么压缩不好的数据是坏的?压缩得不好的数据对Spark应用程序不利,因为处理起来非常慢。继续我们前面的例子,每当我们想要处理一天的事件时,我们必须打开86,400个文件才能到达数据。这极大地减缓了处理速度,因为我们的Spark应用程序实际上只花了大部分时间打开和关闭文件。我们通常希望我们的Spark应用程序将大部分时间用于实际处理数据。接下来,我们将进行一些实验,以说明在使用适当压缩的数据时,性能与压缩性能差的数据之间的差异。

我敢打赌,如果你正确地将你的源数据分区到你加入和摆脱所有这些窗口的时候,你最终会在一个更好的地方结束。

每次你点击partitionBy,你都会强迫你洗牌,每次你点击orderBy,你都会迫使你付出昂贵的代价。

我建议您查看Dataset API,并学习一些用于O(n)时间计算的groupBy和平台flatMapGroups/减/滑动。你可以一次就得到你的最小/最高分。

此外,听起来您的驱动程序由于许多小文件问题而耗尽了内存。尽量压缩源数据,并正确划分表。在这种特殊情况下,我建议使用order_date进行分区(可能每天进行?)然后是产品和合同的子分区。

这里有一个片段,我花了大约30分钟编写,并且可能比你的窗口函数运行得更好。它应该在O(n)时间内运行,但如果您有许多小文件问题,它无法弥补。如果有什么遗漏请告诉我。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.mutable

case class Summary(
  order_id: String,
  ttime: String,
  product: String,
  contract: String,
  order_date: String,
  price: BigDecimal,
  status: String,
  maxPrice: BigDecimal = 0,
  maxPriceOrderId: String = null,
  minPrice: BigDecimal = 0,
  minPriceOrderId: String = null
)

class Workflow()(implicit spark: SparkSession) {

  import MinMaxer.summaryEncoder

  val mainDs: Dataset[Summary] =
    spark.sql(
      """
        select order_id, ttime, product, contract, order_date, price, status
        from order_table where order_date ='eod_date'
      """
    ).as[Summary]

  MinMaxer.minMaxDataset(mainDs)
}

object MinMaxer {

  implicit val summaryEncoder: Encoder[Summary] = Encoders.product[Summary]
  implicit val groupEncoder: Encoder[(String, String)] = Encoders.product[(String, String)]

  object SummaryOrderer extends Ordering[Summary] {
    def compare(x: Summary, y: Summary): Int = x.ttime.compareTo(y.ttime)
  }

  def minMaxDataset(ds: Dataset[Summary]): Dataset[Summary] = {
    ds
      .groupByKey(x => (x.product, x.contract))
      .flatMapGroups({ case (_, t) =>
        val sortedRecords: Seq[Summary] = t.toSeq.sorted(SummaryOrderer)

        generateMinMax(sortedRecords)
      })
  }

  def generateMinMax(summaries: Seq[Summary]): Seq[Summary] = {
    summaries.foldLeft(mutable.ListBuffer[Summary]())({case (b, summary) =>

      if (b.lastOption.nonEmpty) {
        val lastSummary: Summary = b.last

        var minPrice: BigDecimal = 0
        var minPriceOrderId: String = null
        var maxPrice: BigDecimal = 0
        var maxPriceOrderId: String = null

        if (summary.status != "remove") {
          if (lastSummary.minPrice >= summary.price) {
            minPrice = summary.price
            minPriceOrderId = summary.order_id
          } else {
            minPrice = lastSummary.minPrice
            minPriceOrderId = lastSummary.minPriceOrderId
          }

          if (lastSummary.maxPrice <= summary.price) {
            maxPrice = summary.price
            maxPriceOrderId = summary.order_id
          } else {
            maxPrice = lastSummary.maxPrice
            maxPriceOrderId = lastSummary.maxPriceOrderId
          }

          b.append(
            summary.copy(
              maxPrice = maxPrice,
              maxPriceOrderId = maxPriceOrderId,
              minPrice = minPrice,
              minPriceOrderId = minPriceOrderId
            )
          )
        } else {
          b.append(
            summary.copy(
              maxPrice = lastSummary.maxPrice,
              maxPriceOrderId = lastSummary.maxPriceOrderId,
              minPrice = lastSummary.minPrice,
              minPriceOrderId = lastSummary.minPriceOrderId
            )
          )
        }
      } else {
        b.append(
          summary.copy(
            maxPrice = summary.price,
            maxPriceOrderId = summary.order_id,
            minPrice = summary.price,
            minPriceOrderId = summary.order_id
          )
        )
      }

      b
    })
  }
}
票数 0
EN

Stack Overflow用户

发布于 2019-10-05 05:38:57

用于重新划分数据的方法-- repartitionByRange在这些列表达式上对数据进行分区,但执行范围分区。您想要的是这些列上的散列分区。

将方法更改为repartition并将这些列传递给它,它应该确保相同的值组在一个分区中结束。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58027080

复制
相关文章
C#线程篇---线程池如何管理线程(6完结篇)
C#线程基础在前几篇博文中都介绍了,现在最后来挖掘一下线程池的管理机制,也算为这个线程基础做个完结。   我们现在都知道了,线程池线程分为工作者线程和I/O线程,他们是怎么管理的?   对于Microsoft设计的CLR线程池,线程池会随着CLR的每个版本的发布,都会发生变化,很难去挖掘,这里的提议是:  最好将线程看成一个黑盒。不要拿单个应用程序去衡量这个黑盒的性能,因为它对任何一个应用程序来说都无法做到完美。 相反,它是一种常规用途的线程调度技术,面向大量应用程序;它对某些应用程序的效果要好于其他应
逸鹏
2018/04/10
2.3K0
C#线程篇---线程池如何管理线程(6完结篇)
Linux内存管理 - slab分配器
Linux内存管理是一个非常复杂的子系统,要完全说清的话估计要一本书的篇幅。但Linux内存管理可以划分成多个部分来阐述,这篇文章主要介绍slab算法。
用户7686797
2020/08/25
2.4K0
c#线程-线程同步
如果有多个线程同时访问共享数据的时候,就必须要用线程同步,防止共享数据被破坏。如果多个线程不会同时访问共享数据,可以不用线程同步。 线程同步也会有一些问题存在: 1、性能损耗。获取,释放锁,线程上下文建切换都是耗性能的。 2、同步会使线程排队等待执行。
苏州程序大白
2021/08/13
7590
【Linux 内核 内存管理】伙伴分配器 ② ( 伙伴分配器分配内存流程 )
页 / 阶 概念参考 【Linux 内核 内存管理】伙伴分配器 ① ( 伙伴分配器引入 | 页块、阶 | 伙伴 ) 博客 ;
韩曙亮
2023/03/30
7.1K0
C#线程
线程 ----   线程是程序中的一个执行流,每个线程都有自己的专有寄存器(栈指针、程序计数器等),但代码区是共享的,即不同的线程可以执行同样的函数。   多线程是指程序中包含多个执行流,即在一个程序中可以同时运行多个不同的线程来执行不同的任务,也就是说允许单个程序创建多个并行执行的线程来完成各自的任务。   多线程可以提高CPU的利用率。在多线程程序中,一个线程必须等待的时候,CPU可以运行其它的线程而不是等待,这样就大大提高了程序的效率。   在 C# 中,System.Threading.Thread
拾点阳光
2018/05/10
8740
【Linux 内核 内存管理】伙伴分配器 ① ( 伙伴分配器引入 | 页块、阶 | 伙伴 )
Linux 内核 初始化 完成之后 , 就会 丢弃 引导内存分配器 , 如 : bootmem 分配器 , memblock 分配器 ;
韩曙亮
2023/03/30
1K0
C#线程入门
 C#支持通过多线程并行地执行代码,一个线程有它独立的执行路径,能够与其它的线程同时地运行。一个C#程序开始于一个单线程,这个单线程是被CLR和操作系统(也称为“主线程”)自动创建的,并具有多线程创建额外的线程。
aehyok
2019/02/25
5550
C# 多线程
 Thread类的使用: 初始化: Thread th1 = new Thread(function1); Thread th2 = new Thread(new ThreadStart(function1)); Thread th3 = new Thread(new ParameterizedThreadStart(function2)); ThreadStart和ParameterizedThreadStart是两个委托,方法的抽象。 function1...是方法名,在新线程里要执行的方法名。 pu
小端
2018/04/16
1.1K0
线程详解——c#
使用线程,我们需要引用System.Threading命名空间。创建一个线程最简单的方法就是在 new 一个 Thread,并传递一个ThreadStart委托(无参数)或ParameterizedThreadStart委托(带参数),如下:
vv彭
2020/12/16
4280
线程详解——c#
C#多线程
主线程和通过Thread构造函数创建的线程默认都是前台线程,线程池获取的则默认是后台线程,通过 IsBackground 属性可以设置和获取当前线程是前台线程还是后台线程。
全栈程序员站长
2022/09/06
1.4K0
C#多线程
根据上一节中http://www.cnblogs.com/aehyok/archive/2013/05/02/3054615.html对多线程的入门了解。本节就来探讨一下简单的使用多线程。
aehyok
2018/09/11
5330
C#多线程
C#线程入门
 C#支持通过多线程并行地执行代码,一个线程有它独立的执行路径,能够与其它的线程同时地运行。一个C#程序开始于一个单线程,这个单线程是被CLR和操作系统(也称为“主线程”)自动创建的,并具有多线程创建额外的线程。
aehyok
2018/09/11
5240
C#线程入门
Go并不需要Java风格的GC
像Go、Julia和Rust这样的现代语言不需要像Java c#所使用的那样复杂的垃圾收集器。但这是为什么呢? 我们首先要了解垃圾收集器是如何工作的,以及各种语言分配内存的方式有什么不同。首先,我们看
Robert Lu
2021/12/05
9250
C#多线程(12):线程池
线程池全称为托管线程池,线程池受 .NET 通用语言运行时(CLR)管理,线程的生命周期由 CLR 处理,因此我们可以专注于实现任务,而不需要理会线程管理。
痴者工良
2021/04/26
1.5K0
C#多线程(6):线程通知
回顾一下,前面 lock、Monitor 部分我们学习了线程锁,Mutex 部分学习了进程同步,Semaphor 部分学习了资源池限制。
痴者工良
2021/04/26
9110
C#多线程(11):线程等待
前面我们学习了很多用于线程管理的 类型,也学习了多种线程同步的使用方法,这一篇主要讲述线程等待相关的内容。
痴者工良
2021/04/26
2.3K0
C#多线程开发-线程基础 01
最近由于工作的需要,一直在使用C#的多线程进行开发,其中也遇到了很多问题,但也都解决了。后来发觉自己对于线程的知识和运用不是很熟悉,所以将利用几篇文章来系统性的学习汇总下C#中的多线程开发。
冬夜先生
2021/09/03
4450
C#多线程调试
这篇文章主要分享多线程部分调试技巧,在日常的开发工作中会经常遇到多线程调试的需要。在我们调试的过程中会出现断点的焦点在多个线程之间“反复横跳”根本无法集中跟踪某一个线程的操作链路。那么今天我们来看看如何调试操作。如果有其它需要的可以参考下面微软官方的文档地址。
JusterZhu
2022/12/07
7600
C#多线程调试
C#多线程(8):线程完成数
假如,程序需要向一个 Web 发送 5 次请求,受网路波动影响,有一定几率请求失败。如果失败了,就需要重试。
痴者工良
2021/04/26
5170
C#多线程开发-线程池03
前面2篇文章介绍了线程的基础知识和线程同步,下面我们来一起认识学习下,线程池的使用。
zls365
2021/09/24
9130
C#多线程开发-线程池03

相似问题

如何管理加载动画线程?- C#

21

分配器与多线程

11

C#:管理线程等待队列

21

内存管理与std::分配器

23

如何管理多线程c#应用程序

20
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文