My ADLA solution is being transitioned to Spark, and I am trying to find the right replacement for the U-SQL REDUCE expression. An example of a possible task: capture the link between related transactions in a LinkedTransactionId column.

The solution can probably be achieved with the groupByKey command, but I cannot work out how to apply logic across multiple rows. All the examples I have managed to find are some variation of an in-line function (usually an aggregate, e.g. .map(t => (t._1, t._2.sum))) that does not need information about individual records from the same partition.

Can anyone share an example of a similar solution, or point me in the right direction?
Posted on 2018-11-13 01:34:16
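An editorial sketch, not part of the original post: the closest Spark analogue to a U-SQL REDUCE over a logical partition is Dataset.groupByKey(...).flatMapGroups, which hands every row of one key to ordinary Scala code, so multi-row, multi-pass logic can run per customer and any number of output rows can be emitted. The Txn case class below is an assumption that mirrors the sample data in the answer that follows; spark and df are assumed to already exist.

// Case classes used with Datasets should be defined at top level, as in the answer below.
case class Txn(
  CustomerId                 : Int,
  TransactionId              : Int,
  TransactionAttributesScore : Int,
  TransactionType            : String,
  Revenue                    : Double
)

import spark.implicits._

val ds = df.as[Txn]                               // df: the sample DataFrame built in the answer below

val perCustomer = ds
  .groupByKey(_.CustomerId)                       // logical partition = one customer
  .flatMapGroups { (customerId, rows) =>
    val txns    = rows.toSeq                      // all of this customer's rows, held in memory
    val sales   = txns.filter(_.TransactionType == "Sales")
    val credits = txns.filter(_.TransactionType == "Credit")
    // arbitrary multi-pass pairing logic over sales and credits can run here ...
    txns                                          // emit any number of output rows
  }

perCustomer.show()

Unlike repartitioning by CustomerId, a group here is guaranteed to contain exactly one customer's rows, even when several customers hash to the same partition.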
Here is one possible solution. Feedback, suggestions for a different approach, or further examples of iterative Spark/Scala solutions would be greatly appreciated.

The example below keys the rows by customer (CustomerId) and processes each customer as a separate partition (the outer mapPartitions loop); the rows belonging to that customer are then handled in an inner foreach loop. trnMap prevents double assignment of a transaction and captures the updates made along the way, and its iterator is emitted as the final dataset dfOut2. (Note: in this case the same result could be obtained with window functions, without an iterative solution, but the purpose here is to exercise the iterative logic itself; a rough window-function sketch follows the code below.)
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.api.java.JavaRDD
// Not used in this example
case class Person(name: String, var age: Int)

// Full transaction state, including the pairing results filled in by the iterative logic
case class SalesTransaction(
  CustomerId          : Int,
  TransactionId       : Int,
  Score               : Int,
  Revenue             : Double,
  Type                : String,
  Credited            : Double = 0.0,
  LinkedTransactionId : Int = 0,
  IsProcessed         : Boolean = false
)

// Lightweight view of a transaction used while collecting candidates within a partition
case class TransactionScore(
  TransactionId : Int,
  Score         : Int
)

// A candidate sales/credit pairing and the distance between their scores
case class TransactionPair(
  SalesId   : Int,
  CreditId  : Int,
  ScoreDiff : Int
)
object ExampleDataFramePartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Example Combiner")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    import spark.implicits._

    // Sample input: CustomerId, TransactionId, TransactionAttributesScore, TransactionType, Revenue
    val df = Seq(
      (1, 1, 123, "Sales", 100),
      (1, 2, 122, "Credit", 100),
      (1, 3, 99, "Sales", 70),
      (1, 4, 101, "Sales", 77),
      (1, 5, 102, "Credit", 75),
      (1, 6, 98, "Sales", 71),
      (2, 7, 200, "Sales", 55),
      (2, 8, 220, "Sales", 55),
      (2, 9, 200, "Credit", 50),
      (2, 10, 205, "Sales", 50)
    ).toDF("CustomerId", "TransactionId", "TransactionAttributesScore", "TransactionType", "Revenue")
      .withColumn("Revenue", $"Revenue".cast(DoubleType))
      .repartition(2, $"CustomerId")   // hash-partition by customer so each customer's rows stay together

    df.show()
    // Outer loop: each partition is processed independently with plain Scala collections
    val dfOut2 = df.mapPartitions(p => {
      println(p)   // debug output only: prints the partition's row iterator object

      val trnMap     = scala.collection.mutable.Map[Int, SalesTransaction]()            // TransactionId -> current state
      val trnSales   = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
      val trnCredits = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
      val trnPairs   = scala.collection.mutable.ArrayBuffer.empty[TransactionPair]

      // Inner loop: materialise every row of the partition into the buffers above
      p.foreach(row => {
        val trnKey: Int = row.getAs[Int]("TransactionId")
        val trnValue: SalesTransaction = SalesTransaction(row.getAs("CustomerId")
          , trnKey
          , row.getAs("TransactionAttributesScore")
          , row.getAs("Revenue")
          , row.getAs("TransactionType")
        )
        trnMap += (trnKey -> trnValue)
        if (trnValue.Type == "Sales") {
          trnSales += TransactionScore(trnKey, trnValue.Score)
        } else {
          trnCredits += TransactionScore(trnKey, trnValue.Score)
        }
      })

      // Build every credit/sales combination together with its score distance
      if (trnCredits.size > 0 && trnSales.size > 0) {
        trnCredits.foreach(cr => {
          trnSales.foreach(sl => {
            trnPairs += TransactionPair(sl.TransactionId, cr.TransactionId, math.abs(cr.Score - sl.Score))
          })
        })
      }
      // Greedy assignment: walk the candidate pairs from the smallest score difference
      // upwards and link a sale and a credit only if neither has been assigned yet
      trnPairs.sortBy(t => t.ScoreDiff)
        .foreach(t => {
          if (!trnMap(t.CreditId).IsProcessed && !trnMap(t.SalesId).IsProcessed) {
            trnMap(t.SalesId) = SalesTransaction(trnMap(t.SalesId).CustomerId
              , trnMap(t.SalesId).TransactionId
              , trnMap(t.SalesId).Score
              , trnMap(t.SalesId).Revenue
              , trnMap(t.SalesId).Type
              , math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)   // credited amount
              , t.CreditId                                                        // link to the paired credit
              , true
            )
            trnMap(t.CreditId) = SalesTransaction(trnMap(t.CreditId).CustomerId
              , trnMap(t.CreditId).TransactionId
              , trnMap(t.CreditId).Score
              , trnMap(t.CreditId).Revenue
              , trnMap(t.CreditId).Type
              , math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)   // credited amount
              , t.SalesId                                                         // link back to the paired sale
              , true
            )
          }
        })

      // Emit the (possibly updated) transactions of this partition as the output rows
      trnMap.map(m => m._2).toIterator
    })

    dfOut2.show()
    spark.stop()
  }
}
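As noted above, the pairing could also be approached with window functions instead of the iterative loop. The sketch below is an editorial illustration only, not the author's window version: dropped into main above (before spark.stop(), with the existing imports in scope), it picks, for every credit, the same-customer sale with the closest score, but it does not enforce the one-to-one greedy assignment that the iterative code implements.

import org.apache.spark.sql.expressions.Window

// Candidate pairs: every credit joined to every sale of the same customer
val salesRows  = df.filter($"TransactionType" === "Sales")
val creditRows = df.filter($"TransactionType" === "Credit")

val candidates = creditRows.as("c")
  .join(salesRows.as("s"), $"c.CustomerId" === $"s.CustomerId")
  .withColumn("ScoreDiff",
    abs($"c.TransactionAttributesScore" - $"s.TransactionAttributesScore"))

// For every credit keep only the sale whose score is closest
val closestSale = Window.partitionBy($"c.TransactionId").orderBy($"ScoreDiff")

val linked = candidates
  .withColumn("rn", row_number().over(closestSale))
  .filter($"rn" === 1)
  .select(
    $"c.TransactionId".as("CreditTransactionId"),
    $"s.TransactionId".as("LinkedSalesTransactionId"),
    least($"c.Revenue", $"s.Revenue").as("Credited"))

linked.show()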
https://stackoverflow.com/questions/52283373