我正在尝试使用dask并行处理python中的时间序列预测。数据的格式是,每个时间序列都是一列,它们有一个月日期的共同索引。我有一个自定义预测函数,它返回带有拟合值和预测值的时间序列对象。我想要将这个函数应用于dataframe的所有列(所有时间序列),并返回一个新的dataframe,并将所有这些序列上传到DB。我通过运行以下代码使代码工作:
data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
res = ddata.map_partitions(lambda df: df.apply(forecast_func,
axis=0)).compute(get=dask.multiprocessing.get)我的问题是,Dask中是否有一种按列而不是行进行分区的方法,因为在这个用例中,我需要保持有序的时间索引,就像预测函数正确工作一样。
如果不是,我将如何重新格式化数据以使高效的大规模预测成为可能,并且仍然以我需要的格式返回数据到DB?
发布于 2018-03-22 16:59:47
谢谢你的帮助,我真的很感激。我使用了dask.delayed解决方案,它运行得很好,仅使用本地集群就需要大约1/3的时间。
对于任何感兴趣的人,我已经实现的解决方案:
from dask.distributed import Client, LocalCluster
import pandas as pd
import dask
cluster = LocalCluster(n_workers=3,ncores=3)
client = Client(cluster)
#get list of time series back
output = []
for i in small_df:
forecasted_series = dask.delayed(custom_forecast_func)(small_df[i])
output.append(forecasted_series)
total = dask.delayed(output).compute()
#combine list of series into 1 dataframe
full_df = pd.concat(total,ignore_index=False,keys=small_df.columns,names=['time_series_names','Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names','Date','value_variable']
final_df.head()这为您提供了熔化的dataframe结构,因此,如果您希望将该系列作为列,则可以使用
pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')发布于 2018-03-22 02:11:14
Dask dataframe只按行对数据进行分区。见Dask数据文件
然而,Dask阵列可以沿任何维度进行分区。您必须使用Numpy语义,而不是Pandas语义。
你可以用达克延迟或期货做任何你想做的事情。在一个更通用的教程中给出的这个并行计算实例可能会给你一些想法。
https://stackoverflow.com/questions/49416980
复制相似问题