我有一个大的dask工作流,它产生两个不同的数组,形状相同,但块(和数据类型)不同。我尝试重新分块这些数组(具有相同的块大小),但由于它们具有不同的数据类型,所以块也不同。有没有一种方法可以强制这些数组中的每个数组具有相同的块几何形状,以便我可以更容易地使用映射数组函数(具体地说,我必须设计一个同时迭代两个输入dask数组的map_blocks函数,并且具有相同的块将使这项工作变得非常容易)?这是一个重现问题的简化示例: import dask.array as da
import numpy as np
arrayOne = da.random.random((96700000000)
下面是使用dask和cupy运行的代码片段,我对此有问题。我在Google上运行GPU激活。
基本上,我的问题是,A和的是太大的阵列,所以我使用Dask。在这些太大的内存阵列上,我运行操作,但是我想获得AtW1:,k (作为一个立方体数组),而不损坏我的RAM或GPU内存,因为我需要这个值来进行进一步的操作。我怎样才能做到这一点?
import dask.array as da
import cupy as cp
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = Loca
我有大量的大整数数组存储在hdf5 5格式的文件中。我发现将这些数据表示为dask数组(相对于h5py.File对象列表)是方便的数据索引,但是从磁盘加载数据片非常缓慢。
下面是一个示例,其中dsets是h5py.File对象的列表,x是由这些h5py.File对象构造的dask.array。dask数组的分块与h5py.File对象的块匹配。
# Index h5py objects individually
In [68]: %%time
...: tmp = [d[0,0,0] for d in dsets];
...:
CPU times: user 23.6 ms,
考虑将一个2D数组X放入内存--在我的例子中,它是以Zarr格式存储的,但这并不重要。我想在数组上按块映射一个函数,并保存结果,而无需将整个数组加载到内存中。例如,
import dask.array as da
import numpy as np
X = da.arange(10000000,
dtype=np.int32).reshape((10,1000000)).rechunk((10,1000))
def toy_function(chunk):
return np.mean(chunk,axis=0)
lazy_result = X.map_blocks(t
我试图执行颜色转换从3通道到1通道平行使用达克。我希望这样做,这样我就可以在将来执行内存不足的计算了。我用da.map_blocks。
from dask.array.image import imread
import dask.array as da
import numpy as np
import cv2
import matplotlib.pyplot as plt
%matplotlib inline
im = imread('../datatest/*.JPG') # wrap around existing images
def showplt(x):
我在dask dataframe中加载了一个大型xarray数据集,其中包含相当大的空间和时间范围的数据。我正在尝试做的是使用dask将这些数据拆分成更小的块并并行加载到内存中。下面是我正在尝试做的示例代码: import numpy as np
import xarray as xr
def chunk(ds,x_ends,y_ends):
'''
Function which takes a large dataset which has been lazily loaded and specified
indices within th
我需要从一个大的numpy数组中保存一个dask数组。下面是一个最小的工作示例,展示了该过程。请注意,a是使用numpy.random创建的,但不幸的是,我不能使用dask创建数组。
import numpy as np
import dask.array as da
from dask.distributed import Client
a = numpy.random.randint(0,2,size=4000000*8000).reshape((4000000,8000))
# here the conversion and saving
client = Client(n_worke
我的本地机器上有一个大型的NumPy数组,我想在集群上与Dask.array并行。
import numpy as np
x = np.random.random((1000, 1000, 1000))
然而,当我使用dask.array时,我发现调度程序开始占用大量内存。为什么会这样呢?这些数据不应该交给工人吗?
import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))
from dask.distributed import Client
client = Client(...)
x = x.persist(
我是dask的新手,正在尝试弄清楚如何重塑从dask数据帧的单个列中获得的dask数组,但遇到了错误。想知道有没有人知道这个修复方法(不需要强行计算)?谢谢!
示例:
import pandas as pd
import numpy as np
from dask import dataframe as dd, array as da
df = pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=2)
# This does not work -
我正在尝试使用dask数组创建聚合统计信息。map_blocks看起来很理想,但却无法正常工作。
我是dask的新手,所以我想了解它的工作原理。我计划使用自定义函数,并从一些基础知识开始。我被卡住了,在几个小时的试错之后,我看不到解决方案。
import dask
import dask.array as da
from numpy import median,array
def func(a):
m = median(a)
print(m)
return array(m)
x = da.random.random((10000, 10000), chunks=(5
从一个库中,我得到一个函数,它读取一个文件并返回一个numpy数组。
我想用多个文件中的多个块构建一个Dask数组。
每个块都是在文件上调用函数的结果。
当我要求Dask计算时,Dask会要求函数同时从硬盘读取多个文件吗?
如果是这样的话,如何避免呢?我的电脑没有并行文件系统。
示例:
import numpy as np
import dask.array as da
import dask
# Make test data
n = 2
m = 3
x = np.arange(n * m, dtype=np.int).reshape(n, m)
np.save('0.npy'
我正在测试dask,我不明白为什么dask比普通的python慢。我在jupyer中开发了两个示例来获取每个示例的时间,我认为我做错了什么。
第一个是dask: 28.5秒,之后是纯python 140ms
import dask
import dask.array as da
%%time
def inc(x):
return x + 1
def double(x):
return x + 2
def add(x, y):
return x + y
N = 100000
d
我在一台有16 of内存的机器中运行下面粘贴的代码(故意的)。
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np
from dask_ml.cluster import KMeans
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1, processes=False,
memory_l
在第二次运行这个语句时,我得到了一个分段错误:
chunks[i].argv[0] = malloc( strlen(token) * sizeof(char *) + 1 );
在上下文中的守则是:
/* TODO: modify str_split to do the copying of its input string if it needs to (e.g. if it uses strtok on it), and return a struct that has the number of "chunks" it split out and the list of
GIL锁会显著降低以下代码的性能吗?
每个块上的函数使用python循环,而不是numpy函数。由于外部库,我必须使用python循环。
测试代码:
import numpy as np
import dask.array as da
import dask.sharedict as sharedict
from itertools import product
def block_func(block):
for i in range(len(block)): # <--- the python loop ...
block[i] += 1
ret
我从ERA5数据库下载了1979-2020年(42年) NetCDF格式的每日风数据。因此,我有15330个.nc文件。我需要提取一个单一网格点的风数据,并需要使这个过程尽可能快。
自从打开15300个文件以来,每个大小为97 in的文件使用xarray.mfdataset()函数都太慢了,首先我将每天的数据合并成年数据并存储在一个文件夹中。我使用以下命令来执行此操作:
for i in range (1979,2021):
InputFileNames = 'path to daily files\ea_wind_' + str(i) + '*.nc'
我感兴趣的是创建一个dask.array.Array,它在compute()之前/之后打开和关闭资源。但是,我不想假设最终用户将如何调用compute,我希望避免创建自定义的dask Array子类或代理对象,因此我试图将操作嵌入到数组的底层__dask_graph__中。
(旁白:请暂时忽略关于在dask中使用有状态对象的警告,我知道其中的风险,这个问题只是关于任务图的操作)。
请考虑下面的类,该类模拟必须处于打开状态的文件读取器才能读取块,否则就会出现分段错误。
import dask.array as da
import numpy as np
class FileReader:
我正在尝试在惰性dask管道中运行自定义numba向量化/ufunc函数。
当我运行下面的代码时,我会得到一个ValueError: Core dimension 'm' consists of multiple chunks。我不明白为什么m被认为是一个核心维度。知道我能怎么解决这个问题吗?
import numpy as np
import dask.array as da
import numba
from numba import float64
# Define ufunc that directly takes a 3D array and mean reduce
我需要处理大量的浮点数,但是我在x86系统上遇到了内存限制。我不知道最后的长度,所以我需要使用可扩展的类型。在x64系统上,我可以使用<gcAllowVeryLargeObjects>。
我当前的数据类型:
List<RawData> param1 = new List<RawData>();
List<RawData> param2 = new List<RawData>();
List<RawData> param3 = new List<RawData>();
public class RawData