首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >星火中的迭代RDD/Dataframe处理

星火中的迭代RDD/Dataframe处理
EN

Stack Overflow用户
提问于 2018-09-11 19:59:08
回答 1查看 596关注 0票数 0

我的ADLA解决方案正被转移到火花。我正在试图找到U 减缩表达式的正确替代品,以启用:

  1. 读取逻辑分区并将信息存储在内存中的列表/字典/向量或其他数据结构中
  2. 应用需要多次迭代的逻辑
  3. 输出结果为附加列和原始数据(原始行可能被部分删除或重复)

可能的任务实例:

  • 输入数据集具有带有ID和属性的销售和返回事务。
  • 解决方案应该是为每一次返回找到最有可能的销售方式。
  • 返回事务必须发生在销售事务之后,并尽可能与销售事务相似(最佳可用匹配)
  • 返回事务必须链接到一个销售事务;销售事务可以链接到一个或没有返回事务-应该在新列LinkedTransactionId中捕获链接。

这个解决方案可能可以通过groupByKey命令实现,但我无法确定如何跨多个行应用逻辑。我找到的所有示例都是内联函数的一些变化(通常是聚合函数--例如.map(t => (t._1, t._2.sum))),它不需要来自同一个分区的单个记录的信息。

有没有人能分享类似解决方案的例子,或者指出正确的方向?

EN

回答 1

Stack Overflow用户

发布于 2018-11-13 01:34:16

以下是一种可能的解决方案--对不同方法的反馈和建议,或迭代Spark/Scala解决方案的示例,我们非常感谢:

  • 示例将读取每个客户的销售和信用事务(CustomerId),并将每个客户处理为单独的分区(外部mapPartition循环)
  • 信用将映射到得分最近的销售(即最小的分数差-使用每个分区内的foreach内环)。
  • 可变map trnMap正在防止每个事务的双重分配,并从进程中捕获更新。
  • 结果通过iterator输出到最终的数据集dfOut2中。

注意:在这种情况下,使用使用迭代解的窗口函数w/o可以获得相同的结果,但目的是测试迭代逻辑本身)

代码语言:javascript
运行
复制
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.api.java.JavaRDD



case class Person(name: String, var age: Int)

case class SalesTransaction(
                      CustomerId : Int,
                      TransactionId : Int,
                      Score : Int,
                      Revenue : Double,
                      Type : String,
                      Credited : Double = 0.0,
                      LinkedTransactionId : Int = 0,
                      IsProcessed : Boolean = false
                      )

case class TransactionScore(
                           TransactionId : Int,
                           Score : Int
                           )

case class TransactionPair(
                          SalesId : Int,
                          CreditId : Int,
                          ScoreDiff : Int
                          )

object ExampleDataFramePartition{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Example Combiner")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(
      (1, 1, 123, "Sales", 100),
      (1, 2, 122, "Credit", 100),
      (1, 3, 99, "Sales", 70),
      (1, 4, 101, "Sales", 77),
      (1, 5, 102, "Credit", 75),
      (1, 6, 98, "Sales", 71),
      (2, 7, 200, "Sales", 55),
      (2, 8, 220, "Sales", 55),
      (2, 9, 200, "Credit", 50),
      (2, 10, 205, "Sales", 50)
    ).toDF("CustomerId", "TransactionId", "TransactionAttributesScore", "TransactionType", "Revenue")
      .withColumn("Revenue", $"Revenue".cast(DoubleType))
      .repartition(2,$"CustomerId")

    df.show()


    val dfOut2 = df.mapPartitions(p => {

      println(p)


      val trnMap = scala.collection.mutable.Map[Int, SalesTransaction]()
      val trnSales = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
      val trnCredits = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
      val trnPairs = scala.collection.mutable.ArrayBuffer.empty[TransactionPair]


      p.foreach(row => {
        val trnKey: Int = row.getAs[Int]("TransactionId")
        val trnValue: SalesTransaction = new SalesTransaction(row.getAs("CustomerId")
          , trnKey
          , row.getAs("TransactionAttributesScore")
          , row.getAs("Revenue")
          , row.getAs("TransactionType")
        )

        trnMap += (trnKey -> trnValue)
        if(trnValue.Type == "Sales") {
          trnSales += new TransactionScore(trnKey, trnValue.Score)}
        else {
          trnCredits += new TransactionScore(trnKey, trnValue.Score)}
      })

      if(trnCredits.size > 0 && trnSales.size > 0) {
        //define transaction pairs
        trnCredits.foreach(cr => {
          trnSales.foreach(sl => {
            trnPairs += new TransactionPair(cr.TransactionId, sl.TransactionId, math.abs(cr.Score - sl.Score))
          })
        })
      }

      trnPairs.sortBy(t => t.ScoreDiff)
          .foreach(t => {
            if(!trnMap(t.CreditId).IsProcessed && !trnMap(t.SalesId).IsProcessed){
              trnMap(t.SalesId) = new SalesTransaction(trnMap(t.SalesId).CustomerId
                , trnMap(t.SalesId).TransactionId
                , trnMap(t.SalesId).Score
                , trnMap(t.SalesId).Revenue
                , trnMap(t.SalesId).Type
                , math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)
                , t.CreditId
                , true
              )
              trnMap(t.CreditId) = new SalesTransaction(trnMap(t.CreditId).CustomerId
                , trnMap(t.CreditId).TransactionId
                , trnMap(t.CreditId).Score
                , trnMap(t.CreditId).Revenue
                , trnMap(t.CreditId).Type
                , math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)
                , t.SalesId
                , true
              )              
            }
          })

      trnMap.map(m => m._2).toIterator

    })

    dfOut2.show()


    spark.stop()
  }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52283373

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档