In Spark, folding (collapsing) DataFrame column values means merging or aggregating values from multiple rows into a more compact form. It is commonly used in data preprocessing, data transformation, or to simplify a data structure.
The most common way to fold column values is to group the rows and then apply aggregate functions to specific columns.
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for .toDF on a local Seq outside spark-shell
// Sample DataFrame
val df = Seq(
  ("A", "item1", 10),
  ("A", "item2", 20),
  ("B", "item1", 15),
  ("B", "item3", 25)
).toDF("category", "item", "value")
// Group by category and fold the item and value columns
val foldedDF = df.groupBy("category")
  .agg(
    collect_list("item").as("items"),
    sum("value").as("total_value")
  )
foldedDF.show()
/*
+--------+--------------+-----------+
|category|         items|total_value|
+--------+--------------+-----------+
|       B|[item1, item3]|         40|
|       A|[item1, item2]|         30|
+--------+--------------+-----------+
*/
When you need to turn rows into columns, use the pivot operation.
val pivotedDF = df.groupBy("category").pivot("item").sum("value")
pivotedDF.show()
/*
+--------+-----+-----+-----+
|category|item1|item2|item3|
+--------+-----+-----+-----+
|       B|   15| null|   25|
|       A|   10|   20| null|
+--------+-----+-----+-----+
*/
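A note on pivot: if the distinct pivot values are known in advance, they can be passed explicitly so Spark can skip the extra job that discovers them. A minimal sketch, using the item values from the sample data above:
// Providing the pivot values explicitly avoids an extra pass over the data
val pivotedExplicitDF = df.groupBy("category")
  .pivot("item", Seq("item1", "item2", "item3"))
  .sum("value")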
collect_list: keeps every value, including duplicates.
collect_set: keeps only the unique values.
val withDuplicates = Seq(
  ("A", "item1"),
  ("A", "item1"),
  ("A", "item2")
).toDF("category", "item")
val result = withDuplicates.groupBy("category")
  .agg(
    collect_list("item").as("all_items"),
    collect_set("item").as("unique_items")
  )
result.show()
/*
+--------+---------------------+--------------+
|category|            all_items|  unique_items|
+--------+---------------------+--------------+
|       A|[item1, item1, item2]|[item1, item2]|
+--------+---------------------+--------------+
*/
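If the folded values are needed as a single delimited string rather than an array, the built-in concat_ws can be combined with collect_list. A minimal sketch, reusing withDuplicates from the example above:
// Fold the items into one comma-separated string per category
val concatenatedDF = withDuplicates.groupBy("category")
  .agg(concat_ws(",", collect_list("item")).as("items_csv"))
concatenatedDF.show()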
When the built-in aggregate functions are not enough, a custom aggregate function (UDAF) gives full control over how values are folded; the example below concatenates strings with a configurable delimiter.
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
class ConcatWithDelimiter(delimiter: String) extends UserDefinedAggregateFunction {
  // Input: a single string column
  def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  // Buffer: the string concatenated so far
  def bufferSchema: StructType = StructType(StructField("concat", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  // Append each non-null input value to the buffer
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      if (buffer.getString(0).isEmpty) {
        buffer(0) = input.getString(0)
      } else {
        buffer(0) = buffer.getString(0) + delimiter + input.getString(0)
      }
    }
  }
  // Combine partial results from different partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (!buffer2.isNullAt(0)) {
      if (buffer1.getString(0).isEmpty) {
        buffer1(0) = buffer2.getString(0)
      } else {
        buffer1(0) = buffer1.getString(0) + delimiter + buffer2.getString(0)
      }
    }
  }
  def evaluate(buffer: Row): Any = {
    buffer.getString(0)
  }
}
// Use the custom aggregate function
val concatUDF = new ConcatWithDelimiter(",")
val customDF = df.groupBy("category")
  .agg(concatUDF(col("item")).as("concatenated_items"))
customDF.show()
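Note that UserDefinedAggregateFunction is deprecated as of Spark 3.0; the recommended replacement is a typed Aggregator registered with functions.udaf. A minimal sketch of an equivalent concatenation aggregator (the name ConcatAgg is illustrative, and the delimiter is fixed to "," here):
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions.udaf
// Typed aggregator: input String, buffer String, output String
object ConcatAgg extends Aggregator[String, String, String] {
  def zero: String = ""
  def reduce(buffer: String, value: String): String =
    if (value == null) buffer
    else if (buffer.isEmpty) value
    else buffer + "," + value
  def merge(b1: String, b2: String): String =
    if (b1.isEmpty) b2 else if (b2.isEmpty) b1 else b1 + "," + b2
  def finish(reduction: String): String = reduction
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}
val concatAgg = udaf(ConcatAgg)
df.groupBy("category").agg(concatAgg(col("item")).as("concatenated_items")).show()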
Window functions fold values within each group while keeping every row; the example below computes a running total per category.
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("category").orderBy("value")
val windowDF = df.withColumn("running_total", sum("value").over(windowSpec))
windowDF.show()
/*
+--------+-----+-----+-------------+
|category| item|value|running_total|
+--------+-----+-----+-------------+
|       B|item1|   15|           15|
|       B|item3|   25|           40|
|       A|item1|   10|           10|
|       A|item2|   20|           30|
+--------+-----+-----+-------------+
*/
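Window aggregation is not limited to numeric sums; collect_list can also be applied over a window to fold the items seen so far within each group. A minimal sketch reusing the windowSpec defined above:
// Cumulative list of items per category, ordered by value
val runningItemsDF = df.withColumn("items_so_far", collect_list("item").over(windowSpec))
runningItemsDF.show(truncate = false)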
Problem cause: when folding large amounts of data, especially with collect_list or collect_set, the data held for a single partition can become too large and trigger an OOM error.
Solutions:
- Increase spark.executor.memory.
- Use an aggregate-style reduction instead of collect_list when processing large datasets.
- Repartition by the grouping key: df.repartition(numPartitions, $"category") (see the sketch below).
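A minimal sketch of the repartition-plus-bounded-aggregation approach; numPartitions is an illustrative value you would tune for your data volume:
// Spread the groups across more partitions before the expensive aggregation
val numPartitions = 200  // illustrative value
val repartitionedDF = df.repartition(numPartitions, $"category")
val safeFoldedDF = repartitionedDF.groupBy("category")
  .agg(sum("value").as("total_value"))  // bounded-state aggregate instead of collect_list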
Problem cause: some keys have far more rows than others (data skew), so a few tasks do most of the work and execution is unbalanced.
Solutions:
- Use the salting technique: add a random prefix to the grouping key and aggregate in two stages (see the sketch below).
- Use sample to inspect the key distribution first.
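A minimal salting sketch (the salt width of 10 and the column names salt and partial_sum are illustrative): the first aggregation runs on the salted key so a hot category is split across tasks, and a second aggregation removes the salt.
// Stage 1: aggregate on a salted key
val salted = df.withColumn("salt", (rand() * 10).cast("int"))
  .groupBy($"category", $"salt")
  .agg(sum("value").as("partial_sum"))
// Stage 2: drop the salt and combine the partial results
val deskewed = salted.groupBy("category")
  .agg(sum("partial_sum").as("total_value"))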
Problem cause: aggregate functions handle NULL values in different ways, which can produce unexpected results.
Solutions:
- Use coalesce or ifnull to replace NULL values before aggregating (see the sketch below).
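A minimal sketch that substitutes a placeholder for NULL items before folding (the "unknown" placeholder is illustrative):
// Replace NULL items with a placeholder so they are not silently dropped
val nullSafeDF = df.withColumn("item", coalesce($"item", lit("unknown")))
  .groupBy("category")
  .agg(collect_list("item").as("items"))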
Performance tips:
- At the RDD level, use reduceByKey instead of groupByKey so values are combined before the shuffle.
- Broadcast small tables when joining to avoid a shuffle.
- Tune the spark.sql.shuffle.partitions parameter (see the sketch below).
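A minimal sketch of the configuration-level tips; the partition count of 200 and the lookup table smallDF are illustrative:
// Lower the shuffle partition count for small data, or raise it for large data
spark.conf.set("spark.sql.shuffle.partitions", "200")  // illustrative value
// smallDF: an illustrative small lookup table keyed by category
val smallDF = Seq(("A", "Group A"), ("B", "Group B")).toDF("category", "label")
// Broadcast the small side so the large DataFrame is not shuffled for the join
val joinedDF = df.join(broadcast(smallDF), Seq("category"))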
By using Spark's folding operations sensibly, you can process and transform large datasets efficiently and meet a wide range of data analysis needs.