I have tabular data with keys and values, and the keys are not unique. For example:
+-----+------+
| key | value|
+-----+------+
|  1  | the  |
|  2  | i    |
|  1  | me   |
|  1  | me   |
|  2  | book |
|  1  | table|
+-----+------+
Now suppose this table is distributed across different nodes of a Spark cluster. How can I use PySpark to compute the frequency of each word relative to its key? For example, in the sample above I would like this output (e.g. "me" gets 2/4 because it appears twice among the four rows with key 1):
+-----+------+-------------+
| key | value| frequencies |
+-----+------+-------------+
|  1  | the  |     1/4     |
|  2  | i    |     1/2     |
|  1  | me   |     2/4     |
|  2  | book |     1/2     |
|  1  | table|     1/4     |
+-----+------+-------------+
Posted on 2019-07-24 02:45:44
I'm not sure you can combine the multi-level operations with DFs, but doing it in two steps, and leaving the final merge to you, it works:
# Running in Databricks; not all of this is required.
# You may want to convert to upper or lower case for better results.
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
data = [("1", "the"), ("2", "I"), ("1", "me"),
("1", "me"), ("2", "book"), ("1", "table")]
rdd = sc.parallelize(data)
someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1]))
df = sqlContext.createDataFrame(someschema)
df1 = df.groupBy("c1", "c2") \
        .count()
df2 = df1.groupBy('c1') \
         .sum('count')
df3 = df1.join(df2,'c1')
df3.show()
This returns:
+---+-----+-----+----------+
| c1| c2|count|sum(count)|
+---+-----+-----+----------+
| 1|table| 1| 4|
| 1| the| 1| 4|
| 1| me| 2| 4|
| 2| I| 1| 2|
| 2| book| 1| 2|
+---+-----+-----+----------+
You can reformat the last two columns (a sketch of that follows below), but I was curious whether we could do it all in one pass. In plain SQL we would use an inline view and combine it, I suspect.
This works across a cluster in the standard way, as Spark generally does; the groupBy takes it all into account.
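For reference, a minimal sketch of that reformatting, assuming df3 from the block above (the column name "total" is mine):
# Hypothetical follow-up: fold count and sum(count) into the single
# "frequencies" string column from the question, then drop the helpers.
df4 = (df3
       .withColumnRenamed("sum(count)", "total")   # avoid parentheses in the column name
       .withColumn("frequencies",
                   F.concat_ws("/", F.col("count").cast("string"), F.col("total").cast("string")))
       .drop("count", "total"))
df4.show()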
Minor edit
Since it is rather hot outside, I looked into this in more depth. This is a good overview: http://stevendavistechnotes.blogspot.com/2018/06/apache-spark-bi-level-aggregation.html. After reading it and experimenting, I could not get it any more elegant than this, reducing the output to 5 rows all in one go.
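For what it's worth, the plain-SQL inline-view idea mentioned above might look like the following single-statement sketch; the temp view name "words" is my own choice, and spark stands for whichever SparkSession (or sqlContext) is available in your environment:
# Hypothetical one-pass version using Spark SQL window functions over a temp view.
df.createOrReplaceTempView("words")
spark.sql("""
    SELECT DISTINCT
           c1 AS key,
           c2 AS value,
           concat(CAST(count(*) OVER (PARTITION BY c1, c2) AS STRING), '/',
                  CAST(count(*) OVER (PARTITION BY c1) AS STRING)) AS frequencies
    FROM words
""").show()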
Posted on 2019-07-24 07:38:00
Another viable option is to use window functions. First, count the occurrences of each value-key pair and of each key. Then just add another column with the fraction (which you reduce to lowest terms):
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from fractions import Fraction
from pyspark.sql.functions import udf
@udf(StringType())
def getFraction(value_occurrence, key_occurrence):
    # Reduce value_occurrence/key_occurrence to lowest terms, e.g. 2/4 -> 1/2
    return str(Fraction(value_occurrence, key_occurrence))
schema = StructType([StructField("key", IntegerType(), True),
                     StructField("value", StringType(), True)])
data = [(1, "the"), (2, "I"), (1, "me"),
(1, "me"), (2, "book"), (1, "table")]
spark = SparkSession.builder.appName('myPython').getOrCreate()
input_df = spark.createDataFrame(data, schema)
(input_df.withColumn("key_occurrence",
                     F.count(F.lit(1)).over(Window.partitionBy(F.col("key"))))
         .withColumn("value_occurrence", F.count(F.lit(1)).over(Window.partitionBy(F.col("value"), F.col("key"))))
         .withColumn("frequency", getFraction(F.col("value_occurrence"), F.col("key_occurrence")))
         .dropDuplicates()
         .show())
https://stackoverflow.com/questions/57180259