我的数据格式为
<table>
<tr>
<td> id </td>
<td> field_2 </td>
<td> field_3 </td>
<td> date </td>
<td> a_blob </td>
</tr>
<tr>
<td> 1 </td>
<td> some_data </td>
<td> some_data </td>
<td> 11/1/2020 </td>
<td> {"name": "abc1", "usage_count": {"bin102": 1, "bin103": 1, "bin104": 1, "bin105": 1, "bin110": 1, "bin112": 1, "bin120": 1, "bin121": 1, "bin122": 1, "bin123": 1, "bin124": 1, "bin136": 2, "bin137": 1, "bin138": 1, "bin139": 1, "bin140": 1, "bin141": 2, "bin142": 2}, "usage_min": {"bin102": 7.7, "bin103": 10, "bin104": 10, "bin105": 2.5, "bin110": 0.1, "bin112": 0.8, "bin120": 6.8, "bin121": 10, "bin122": 10, "bin123": 10, "bin124": 4.3, "bin136": 2.5, "bin137": 10, "bin138": 10, "bin139": 10, "bin140": 10, "bin141": 9.3, "bin142": 3.8}, "p_name": "abc_1"} </td>
</tr>
</table>
我想把它转换成下面的格式
<table>
<tr>
<td> id </td>
<td> field_2 </td>
<td> field_3 </td>
<td> date </td>
<td> mins_arr </td>
<td> cnt_arr </td>
</tr>
<tr>
<td> 1 </td>
<td> some_data </td>
<td> some_data </td>
<td> 11/1/2020 </td>
<td> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,24.9,50.0,9.9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0</td>
<td> 0,0,0,0,0,0,0,0,0,0,0,3,5,1,0,0,0,0,0,0,0,0,0,0</td>
</tr>
</table>
我使用以下代码来执行此转换
def convert_to_dense_bins(u_count, u_minutes):
count = [0] * 144
minutes = [0.0] * 144
for key in u_count:
bin_number = key.split("bin")[1]
count[int(bin_number, 10)] = u_count[key]
bin_minutes = u_minutes[key]
minutes[int(bin_number, 10)] = bin_minutes
return count, minutes
def aggregate_to_hourly_bins(count_bin, minutes_bin):
count = []
minutes = []
for i in range(0, 144, 6):
hour_count = sum(count_bin[i:i + 6])
count.append(str(hour_count))
hour_minutes = sum(minutes_bin[i:i + 6])
minutes.append(str(hour_minutes))
return count, minutes
def transform(row):
e_data = json.loads(row[4])
p_name = e_data["p_name"]
name = e_data["name"]
count_bin, minutes_bin = convert_to_dense_bins(e_data["usage_count"],
e_data["usage_minutes"])
count_hourly, minutes_hourly = aggregate_to_hourly_bins(count_bin, minutes_bin)
return (row.id, name, row.feature_1, row.feature_2, p_name, row.date, ','.join(minutes_hourly),
','.join(count_hourly))
new_columns = ["id", "name", "feature_1", "feature_2", "p_name", "date", "mins_arr", "cnt_arr"]
df = df_old.rdd \
.filter(some_filter_function) \
.map(transform) \
.toDF(new_columns)随着我的数据增长,这段代码花费的时间太长了。我正在寻找在PySpark中进行这种转换的更有效的方法。由于解析为字符串的数据内部的JSON结构的复杂性,我不能使用窗口函数之类的。任何帮助都是非常感谢的。
发布于 2020-11-19 01:38:46
对于Spark 2.3.1,请使用pandas_udf,如下所示:
第-1步:使用json_tuple函数检索usage_count和usage_min作为StringType字段:
from pyspark.sql import functions as F
import numpy as np
import pandas as pd
j1 = """{"name": "abc1", "usage_count": {"bin102": 1, "bin103": 1, "bin104": 1, "bin105": 1, "bin110": 1, "bin112": 1, "bin120": 1, "bin121": 1, "bin122": 1, "bin123": 1, "bin124": 1, "bin136": 2, "bin137": 1, "bin138": 1, "bin139": 1, "bin140": 1, "bin141": 2, "bin142": 2}, "usage_min": {"bin102": 7.7, "bin103": 10, "bin104": 10, "bin105": 2.5, "bin110": 0.1, "bin112": 0.8, "bin120": 6.8, "bin121": 10, "bin122": 10, "bin123": 10, "bin124": 4.3, "bin136": 2.5, "bin137": 10, "bin138": 10, "bin139": 10, "bin140": 10, "bin141": 9.3, "bin142": 3.8}, "p_name": "abc_1"}"""
df = spark.createDataFrame([(j1,)],['e_data'])
cols = ["name", "p_name", "usage_count", "usage_min"]
df1 = df.select(F.json_tuple("e_data", *cols).alias(*cols))
df1.printSchema()
#root
# |-- name: string (nullable = true)
# |-- p_name: string (nullable = true)
# |-- usage_count: string (nullable = true)
# |-- usage_min: string (nullable = true)注意:如果使用以下行使用spark-xml加载数据,则上面的e_data列应该是名为td (type=array<string>)的列的第5个元素(df['td'][4]):
df = spark.read.format("com.databricks.spark.xml").options(rowTag="tr").load('/paths')Step-2:设置pandas_udf,我们使用pd.Series.str.findall将所有bin条目检索到具有两个元素(对应于index和value)的元组列表中,将其转换/映射为np.array,然后使用这些索引和值填充由144个元素组成的一维数组。接下来,使用np.array_split将上述一维数组拆分为24个数组,并执行np.sum(axis=1),将结果作为pd.Series返回,其值为浮点数列表。
def _pandas_bin_sum(s,N):
ret = []
for x in map(np.array, s.str.findall(r'"bin(\d+)":([\d.]+)')):
try:
z = np.zeros(144)
z[x[:,0].astype(np.int)] = x[:,1].astype(np.float)
ret.append([ float(e) for e in np.sum(np.array_split(z,N),axis=1) ])
except:
ret.append(None)
return pd.Series(ret)
pandas_bin_sum = F.pandas_udf(lambda x: _pandas_bin_sum(x,24), "array<float>")第-3步:应用pandas_udf并使用F.concat_ws()转换两列:
df1.withColumn('usage_count', F.concat_ws(',', pandas_bin_sum('usage_count').astype("array<int>"))) \
.withColumn('usage_min', F.concat_ws(',', pandas_bin_sum('usage_min'))) \
.show(1,100,vertical=True)
-RECORD 0----------------------------------------------------------------------------------------------------------
name | abc1
p_name | abc_1
usage_count | 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,2,0,5,0,3,7
usage_min | 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.2,0.9,0.0,41.1,0.0,12.5,43.1
only showing top 1 rowhttps://stackoverflow.com/questions/64873861
复制相似问题