让我们考虑记录为一组netcdf文件的大型数据集,如下面的代码所创建的:
import xarray as xr
import numpy as np
import dask
from scipy.stats import median_absolute_deviation
import os
n_epochs = 200
n_times = 1500
n_series = 10000
def some_process_generating_sequential_data_epochs():
return np.random.rand(n_series, n_times)
os.mkdir("temp")
# writes 200 files of 120.1 MB each
for no_epoch in range(n_epochs):
array = xr.DataArray(some_process_generating_sequential_data_epochs()[:, :, None],
dims=("series", "times", "epochs"),
coords={"series": range(n_series),
"times": range(n_times),
"epochs": [no_epoch]})
array.chunk({"series": 1000, "times": 500, "epochs":1})\
.to_netcdf("temp/dummy_file_{}.nc".format(no_epoch)) 现在,如果我们想要跨时代维度(即在文件中拆分数据集的维度)应用聚合函数,我们可以使用
dataset = xr.open_mfdataset("temp/dummy_file_*.nc", chunks={"series": 1000, "times": 500, "epochs":1},
concat_dim='epochs', combine='by_coords')
dataset.reduce(np.std, "epochs", allow_lazy=True)\
.compute().to_netcdf("temp/std.nc") 明确提到分块是很重要的,否则dask将加载不适合内存[1]的整个文件
默认情况下,将选择块将整个输入文件一次性加载到内存中。
这显然适用于numpy均值、std和中值函数。但是,它不适用于其他函数(例如,scipy.stats.median_absolute_deviation),这些函数将尝试加载整个数据集并触发MemoryError。我怀疑这与dask github 5岁版有关。在这张票上,他们报告
当一个大型数组与平均值交互时,就会出现问题,如下所示: (X-x.mean()).sum().compute() 当前的解决办法是显式地事先计算平均值。 ( x.mean().compute()).sum().compute()
我试图通过修改scipy.stats版本的median_absolute_deviation来应用这样的策略,但没有成功。他们还建议使用dask.set_options(split_every=2) (现在已经不再支持dask.config.set(split_every=2)了),但在这里似乎没有帮助。
是否有一个恰当的、无头抓取的、或多或少总是有效的成语来对这种数据集执行这些减缩操作?
发布于 2020-03-13 16:56:42
你的问题很长,所以我没有全部阅读(抱歉,我试着回答这些问题,但有很多问题)。
我想你是在问..。
如何将任意约简函数应用于大型数据集?
答案是你做不到,像da.mean这样的操作是用并行算法重写的。你不能就这样接受Numpy或Scipy版本,并按原样应用它。但是,如果您可以考虑如何分解您的还原操作以并行操作,那么您可能需要尝试一下da.reduction函数。
或者,如果您只在一个轴上应用缩减,那么您可以重新块数据,以便该轴是单一块的,然后使用类似于map_blocks的东西以令人尴尬的并行方式应用您的函数。
https://stackoverflow.com/questions/60554731
复制相似问题