Trying to convert time bins into minute and count arrays in PySpark with a JSON structure

Stack Overflow user
Asked on 2020-11-17 18:54:54
1 answer · 55 views · 0 followers · 1 vote

My data is in the following format:

<table>
<tr>
<td> id </td>
<td> field_2 </td>
<td> field_3 </td>
<td> date </td>
<td> a_blob </td>
</tr>
<tr>
<td> 1 </td>
<td> some_data </td>
<td> some_data </td>
<td> 11/1/2020 </td>
<td> {"name": "abc1", "usage_count": {"bin102": 1, "bin103": 1, "bin104": 1, "bin105": 1, "bin110": 1, "bin112": 1, "bin120": 1, "bin121": 1, "bin122": 1, "bin123": 1, "bin124": 1, "bin136": 2, "bin137": 1, "bin138": 1, "bin139": 1, "bin140": 1, "bin141": 2, "bin142": 2}, "usage_min": {"bin102": 7.7, "bin103": 10, "bin104": 10, "bin105": 2.5, "bin110": 0.1, "bin112": 0.8, "bin120": 6.8, "bin121": 10, "bin122": 10, "bin123": 10, "bin124": 4.3, "bin136": 2.5, "bin137": 10, "bin138": 10, "bin139": 10, "bin140": 10, "bin141": 9.3, "bin142": 3.8}, "p_name": "abc_1"}  </td>
</tr>
</table>

I want to convert it into the format below:

<table>
<tr>
<td> id </td>
<td> field_2 </td>
<td> field_3 </td>
<td> date </td>
<td>  mins_arr </td>
<td>   cnt_arr </td>
</tr>
<tr>
<td> 1 </td>
<td> some_data </td>
<td> some_data </td>
<td> 11/1/2020 </td>
<td> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,24.9,50.0,9.9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0</td>
<td> 0,0,0,0,0,0,0,0,0,0,0,3,5,1,0,0,0,0,0,0,0,0,0,0</td>
</tr>
</table>

I use the following code to perform this conversion:

import json


def convert_to_dense_bins(u_count, u_minutes):
    # Expand the sparse "binNNN" dicts into dense arrays of 144 ten-minute bins
    count = [0] * 144
    minutes = [0.0] * 144
    for key in u_count:
        bin_number = key.split("bin")[1]
        count[int(bin_number, 10)] = u_count[key]
        bin_minutes = u_minutes[key]
        minutes[int(bin_number, 10)] = bin_minutes
    return count, minutes


def aggregate_to_hourly_bins(count_bin, minutes_bin):
    # Collapse the 144 ten-minute bins into 24 hourly bins (6 bins per hour)
    count = []
    minutes = []
    for i in range(0, 144, 6):
        hour_count = sum(count_bin[i:i + 6])
        count.append(str(hour_count))
        hour_minutes = sum(minutes_bin[i:i + 6])
        minutes.append(str(hour_minutes))
    return count, minutes


def transform(row):
    # Parse the JSON blob in the 5th column and build the hourly arrays
    e_data = json.loads(row[4])
    p_name = e_data["p_name"]
    name = e_data["name"]
    count_bin, minutes_bin = convert_to_dense_bins(e_data["usage_count"],
                                                   e_data["usage_minutes"])
    count_hourly, minutes_hourly = aggregate_to_hourly_bins(count_bin, minutes_bin)
    return (row.id, name, row.feature_1, row.feature_2, p_name, row.date,
            ','.join(minutes_hourly), ','.join(count_hourly))


new_columns = ["id", "name", "feature_1", "feature_2", "p_name", "date", "mins_arr", "cnt_arr"]
df = df_old.rdd \
    .filter(some_filter_function) \
    .map(transform) \
    .toDF(new_columns)
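
For illustration only (not part of the original post), running these helpers on the sample blob from the table above produces hourly counts in hours 17-23, matching bins 102-142 in the JSON:

import json

# Hypothetical walk-through; a_blob_string stands in for the JSON blob shown in the table
sample = json.loads(a_blob_string)
dense_cnt, dense_min = convert_to_dense_bins(sample["usage_count"], sample["usage_min"])
hourly_cnt, hourly_min = aggregate_to_hourly_bins(dense_cnt, dense_min)
print(','.join(hourly_cnt))  # ...,4,2,0,5,0,3,7 -> hours 17-23; all earlier hours are 0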

As my data grows, this code takes far too long. I am looking for a more efficient way to do this transformation in PySpark. Because of the complexity of the JSON structure inside the data, which arrives as a string, I cannot use window functions or the like. Any help is much appreciated.


1 Answer

Stack Overflow user

Answered on 2020-11-19 01:38:46

For Spark 2.3.1, use pandas_udf, as shown below:

Step 1: use the json_tuple function to retrieve usage_count and usage_min as StringType fields:

from pyspark.sql import functions as F
import numpy as np
import pandas as pd

j1 = """{"name": "abc1", "usage_count": {"bin102": 1, "bin103": 1, "bin104": 1, "bin105": 1, "bin110": 1, "bin112": 1, "bin120": 1, "bin121": 1, "bin122": 1, "bin123": 1, "bin124": 1, "bin136": 2, "bin137": 1, "bin138": 1, "bin139": 1, "bin140": 1, "bin141": 2, "bin142": 2}, "usage_min": {"bin102": 7.7, "bin103": 10, "bin104": 10, "bin105": 2.5, "bin110": 0.1, "bin112": 0.8, "bin120": 6.8, "bin121": 10, "bin122": 10, "bin123": 10, "bin124": 4.3, "bin136": 2.5, "bin137": 10, "bin138": 10, "bin139": 10, "bin140": 10, "bin141": 9.3, "bin142": 3.8}, "p_name": "abc_1"}"""

df = spark.createDataFrame([(j1,)],['e_data'])

cols = ["name", "p_name", "usage_count", "usage_min"]

df1 = df.select(F.json_tuple("e_data", *cols).alias(*cols))
df1.printSchema()
#root
# |-- name: string (nullable = true)
# |-- p_name: string (nullable = true)
# |-- usage_count: string (nullable = true)
# |-- usage_min: string (nullable = true)

Note: if the data is loaded with spark-xml using the following line, then the e_data column above should be the 5th element of a column named td (type=array<string>), i.e. df['td'][4]:

df = spark.read.format("com.databricks.spark.xml").options(rowTag="tr").load('/paths')
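
If that is the case, pulling the blob out of the td array before Step 1 might look like this (my own sketch, not part of the original answer):

# Hypothetical follow-up to the spark-xml load above: take the 5th <td> cell as e_data
df = df.withColumn("e_data", F.col("td")[4])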

Step 2: set up the pandas_udf. We use pd.Series.str.findall to pull all bin entries into a list of two-element tuples (corresponding to index and value), convert/map them to an np.array, and use those indexes and values to fill a one-dimensional array of 144 elements. Then we use np.array_split to split that 1-D array into 24 arrays and take np.sum(axis=1), returning the result as a pd.Series whose values are lists of floats.

def _pandas_bin_sum(s, N):
  # For each JSON string, pull out ("bin index", "value") pairs with a regex
  ret = []
  for x in map(np.array, s.str.findall(r'"bin(\d+)":([\d.]+)')):
    try:
      # Fill a dense 144-element array, then sum it into N equal chunks (here N=24 hours)
      z = np.zeros(144)
      z[x[:, 0].astype(int)] = x[:, 1].astype(float)
      ret.append([float(e) for e in np.sum(np.array_split(z, N), axis=1)])
    except Exception:
      ret.append(None)
  return pd.Series(ret)

pandas_bin_sum = F.pandas_udf(lambda x: _pandas_bin_sum(x, 24), "array<float>")
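
As a quick sanity check outside Spark (my own addition, not from the answer), the helper can be exercised on a plain pandas Series; note the test string is written without a space after the colon, since the regex only matches that compact form:

# Hypothetical local check; the string uses the compact "key":value form the regex expects
s = pd.Series(['{"bin0":1.5,"bin1":2.0,"bin6":4.0}'])
print(_pandas_bin_sum(s, 24)[0][:3])  # [3.5, 4.0, 0.0] -> bins 0-5 fold into hour 0, bin 6 into hour 1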

Step 3: apply the pandas_udf and use F.concat_ws() to convert the two columns:

df1.withColumn('usage_count', F.concat_ws(',', pandas_bin_sum('usage_count').astype("array<int>"))) \
    .withColumn('usage_min', F.concat_ws(',', pandas_bin_sum('usage_min'))) \
    .show(1,100,vertical=True)
-RECORD 0----------------------------------------------------------------------------------------------------------
 name        | abc1
 p_name      | abc_1
 usage_count | 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,2,0,5,0,3,7
 usage_min   | 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.2,0.9,0.0,41.1,0.0,12.5,43.1
only showing top 1 row
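
To tie this back to the original table (a sketch of my own, assuming the JSON blob sits in a column named a_blob next to id, field_2, field_3 and date in df_old), the same steps can be chained while keeping the other columns:

# Hypothetical end-to-end version; column names follow the question's first table
cols = ["name", "p_name", "usage_count", "usage_min"]
result = (df_old
          .select("id", "field_2", "field_3", "date",
                  F.json_tuple("a_blob", *cols).alias(*cols))
          .withColumn("cnt_arr", F.concat_ws(',', pandas_bin_sum("usage_count").astype("array<int>")))
          .withColumn("mins_arr", F.concat_ws(',', pandas_bin_sum("usage_min")))
          .drop("usage_count", "usage_min"))
result.show(1, truncate=False)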
Votes: 0
Original content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/64873861
