我正在寻找最好的方法来计算存储在数据帧中的许多dask delayed指令。我不确定是否应该将pandas数据帧转换为包含delayed对象的dask数据帧,或者是否应该对pandas数据帧的所有值调用compute调用。
我非常感谢大家的建议,因为我在通过嵌套的for循环传递delayed对象的逻辑上遇到了问题。
import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from dask import delayed, compute
steps = 5
sample = [int(x) for x
我正在使用Dask数组生成分布在几个节点中的大型(65kx65kx3) 3D信号。在下一步中,我需要使用存储在Dask包中的切片从这个数组中提取几千个瓦片。我的代码如下所示: import dask.array as da
import dask.bag as db
from dask.distributed import Client
def pick_tile(window, signal):
return np.array(surface[window])
def computation_on_tile(signal_tile):
# do some rather s
我正在构建一个FastAPI应用程序,该应用程序将为Dask的块提供服务。我想把和结合起来。下面是一个mcve,它演示了我试图在应用程序的服务器和客户端做什么:
服务器端:
import time
import dask.array as da
import numpy as np
import uvicorn
from dask.distributed import Client
from fastapi import FastAPI
app = FastAPI()
# create a dask array that we can serve
data = da.from_array(
我试图在不同数组中的所有元素的不同组合上与dask一起运行一个函数,并且我很难应用它。
串行代码如下:
for i in range(5):
for j in range(5):
for k in range(5):
function(listA[i],listB[j],listC[k])
print(f'{i}.{j}.{k}')
k=k+1
j=j+1
i=i+1
这段代码在我的计算机上运行时间是18分钟,而每个数组只有5个元素,我想在更大的数组上并行运行
我必须处理许多大的(大约10 to ) CSV文件。我目前正在使用Dask将数据预处理成一些聚合的统计数据,然后使用常规的Pandas进一步分析。 我遇到的问题是Dask会在每次调用compute()时重新加载数据。一些用来说明这个问题的虚拟代码: import dask.dataframe as dd
ddf = dd.read_csv('very_large_file.csv') # ca. 10GB
# Every line seems to trigger painfully slow re-reading of the CSV file from disk!
基于在上收到的答复,我编写了一个ETL过程,如下所示:
import pandas as pd
from dask import delayed
from dask import dataframe as dd
def preprocess_files(filename):
"""Reads file, collects metadata and identifies lines not containing data.
"""
...
return filename, metadata, skiprows
d
我对Python中的并行循环与Matlab中的parloop相比有多低感兴趣。在这里,我提出了一个简单的寻根问题,强迫a和b之间的初始猜测10^6。
import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing
# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))
def forfunc(
我定义了一个cpu绑定函数。
def countdown(n):
while n > 0:
n -= 1
在我的笔记本电脑上运行countdown(50000000)需要2.16秒。
首先,我测试multiprocess并行化。
from multiprocess import Pool
with Pool(2) as p:
l=p.map(countdown,[50000000,50000000])
需要2.46秒,这是一个很好的并行化。
然后,我测试了dask进程调度器并行化。
l=[dask.delayed(countdown)(50000000),
我在一起计算了许多dask延迟对象,导致文件被写入磁盘,例如在这个玩具示例中:
import xarray as xr
import dask.array as da
objs = []
for i in range(10):
ds = xr.Dataset({"x": (("a",), da.arange(10)*i)})
objs.append(ds.to_netcdf(f"/tmp/test{i:d}.nc", compute=False))
da.compute(objs)
一旦计算成功,是否有方法为每个延迟对象执行一些
我有一个简单的(但很大)任务图在达斯克。这是一个代码示例
results = []
for params in SomeIterable:
a = dask.delayed(my_function)(**params)
b = dask.delayed(my_other_function)(a)
results.append(b)
dask.compute(**results)
这里,SomeIterable是一个dict列表,其中每个参数都是my_function的参数。在每次迭代中,b依赖于a,因此如果生成a的任务失败,则无法计算b。但是,results的每个元
我试图使用这样的方式来使用dask分发来并行一个嵌套循环:
@dask.delayed
def delayed_a(e):
a = do_something_with(e)
return something
@dask.delayed
def delayed_b(element):
computations = []
for e in element:
computations.add(delayed_a(e))
b = dask.compute(*computations, scheduler='distributed
我试图用脚本并行读取16个gzip文件的内容:
import gzip
import glob
from dask import delayed
from dask.distributed import Client, LocalCluster
@delayed
def get_gzip_delayed(gzip_file):
with gzip.open(gzip_file) as f:
reads = f.readlines()
reads = [read.decode("utf-8") for read in reads]
re
我正在用Dask构建一个计算图。有些中间值将被多次使用,但我希望这些计算只运行一次。我一定是犯了个小错误,因为这不是事实。下面是一个很小的例子:
In [1]: import dask
dask.__version__
Out [1]: '1.0.0'
In [2]: class SumGenerator(object):
def __init__(self):
self.sources = []
def register(sel