xarray的典型计算工作流程通常包括:
xr.open_mfdataset
或 xr.open_dataset(chunks=...)
读取单个或多个文件到 Dataset
to_netcdf
方法保存结果上述步骤通常会产生很大的nc文件(>10G),尤其是在处理大量数据时。最近在处理卫星数据时,最终生成的文件甚至超过了50G,有些甚至超过了100G。而目前xarray对于nc格式的大文件存储让人头疼。在存储这些大文件时耗时很长,甚至可能会导致程序挂起。
为了避免上述问题,可以利用xr.save_mfdataset
,可以同时存储多个dataset对象。关于此函数的说明可查看官方文档。
首先导入所需要的库:
import xarray as xr
import numpy as np
from distributed import Client, performance_report
然后创建Client
对象,构建本地cluster
:
client = Client()
dask创建的多进程cluster
不同的机器和参数设置上述信息会存在差异
然后加载数据集:
ds = xr.tutorial.open_dataset('rasm', chunks={'time': 12})
此数据集为xarray官方提供的示例数据。这里设置的 time 维度的块大小为12。然后,对上述数据集执行相关计算操作:
result = np.sqrt(np.sin(ds) ** 2 + np.cos(ds) ** 2)
计算过程使用了 dask,可以执行如下语句查看计算图:
result.Tair.data.visualize()
dask计算图,点击可看大图
计算完成后,为了并行存储nc文件,需要将上述结果分割为多个对象:
创建分割函数将上述dataset对象分割为多个子dataset对象:
import itertools
def split_by_chunks(dataset):
chunk_slices = {}
for dim, chunks in dataset.chunks.items():
slices = []
start = 0
for chunk in chunks:
if start >= dataset.sizes[dim]:
break
stop = start + chunk
slices.append(slice(start, stop))
start = stop
chunk_slices[dim] = slices
for slices in itertools.product(*chunk_slices.values()):
selection = dict(zip(chunk_slices.keys(), slices))
yield dataset[selection]
分割对象:
datasets = list(split_by_chunks(result))
返回结果中的每一项对应的是xarray的dataset对象的每一个切片。
然后需要一个函数为分割后的每一个dataset对象生成路径:
def create_filepath(ds, prefix='filename', root_path="."):
"""
Generate a filepath when given an xarray dataset
"""
start = ds.time.data[0].strftime("%Y-%m-%d")
end = ds.time.data[-1].strftime("%Y-%m-%d")
filepath = f'{root_path}/{prefix}_{start}_{end}.nc'
return filepath
先在一个dataset对象上执行上述函数,测试函数是否能正常运行:
create_filepath(datasets[1])
下一步就是为每一个dataset对象创建一个路径,用于保存数据:
paths = [create_filepath(ds) for ds in datasets]
最后,就可以利用xr.sace_mfdataset
函数并行存储nc文件了:
xr.save_mfdataset(datasets=datasets, paths=paths)
保存完数据之后,可以检查一下并行存储的结果和单独存储的结果是否一致。
读取存储的数据:
new_ds = xr.open_mfdataset(paths, combine='by_coords')
然后和上述计算的结果进行对比:
try:
xr.testing.assert_identical(result, new_ds)
except AssertionError:
print('The datasets are not identical!')
else:
print('The datasets are identical!')
netCDF可是的写操作一直是xarray的痛点,尤其是在并行写和增量写文件方面。之前也介绍过另一种文件格式 Zarr真的能替代NetCDF4和HDF5吗,在文件并行写和增量写方面非常友好,尤其是涉及到大文件时。
目前新版本的netCDF库也逐渐支持zarr格式,但还没测试过效果如何。如果不是一定要netCDF格式的话,可以尝试使用zarr格式。
后话:虽然本文使用了dask,但是涉及到dask的内容比较少。最近在处理数据时用到了dask,后面有时间可能会更一些dask相关的推文,比如数据并行处理。