我感兴趣的是创建一个dask.array.Array
,它在compute()
之前/之后打开和关闭资源。但是,我不想假设最终用户将如何调用compute
,我希望避免创建自定义的dask Array
子类或代理对象,因此我试图将操作嵌入到数组的底层__dask_graph__
中。
(旁白:请暂时忽略关于在dask中使用有状态对象的警告,我知道其中的风险,这个问题只是关于任务图的操作)。
请考虑下面的类,该类模拟必须处于打开状态的文件读取器才能读取块,否则就会出现分段错误。
import dask.array as da
import numpy as np
class FileReader:
_open = True
def open(self):
self._open = True
def close(self):
self._open = False
def to_dask(self) -> da.Array:
return da.map_blocks(
self._dask_block,
chunks=((1,) * 4, 4, 4),
dtype=float,
)
def _dask_block(self):
if not self._open:
raise RuntimeError("Segfault!")
return np.random.rand(1, 4, 4)
如果文件保持打开状态,则一切正常,但如果关闭该文件,则从to_dask
返回的dask数组将失败:
>>> t = FileReader()
>>> darr = t.to_dask()
>>> t.close()
>>> darr.compute() # RuntimeError: Segfault!
当前任务图如下所示:
>>> list(darr.dask)
[
('_dask_block-aa4daac0835bafe001693f9ac085683a', 0, 0, 0),
('_dask_block-aa4daac0835bafe001693f9ac085683a', 1, 0, 0),
('_dask_block-aa4daac0835bafe001693f9ac085683a', 2, 0, 0),
('_dask_block-aa4daac0835bafe001693f9ac085683a', 3, 0, 0)
]
本质上,我想在开始时添加新任务,这是_dask_block
层所依赖的,而任务则是依赖于_dask_block
的。
我尝试直接操作HighLevelGraph
输出的da.map_blocks
来手动添加这些任务,但是发现它们在计算优化过程中被修剪了,因为darr.__dask_keys__()
不包含我的键(而且,我也希望避免子类化或要求最终用户使用特殊的优化标志调用compute
)。
一个解决方案是确保传递给map_blocks的map_blocks函数总是打开并关闭底层资源.但让我们假设开放/关闭过程相对缓慢,有效地破坏了单机并行性的性能。所以我们只想在开始的时候打开,在结尾的时候关闭。
我可以“欺骗”一些以包含打开文件的任务,方法是在对map_blocks
的调用中包含一个新密钥,如下所示:
...
# new method that will run at beginning of compute()
def _pre_compute(self):
was_open = self._open
if not was_open:
self.open()
return was_open
def to_dask(self) -> da.Array:
# new task key
pre_task = 'pre_compute-' + tokenize(self._pre_compute)
arr = da.map_blocks(
self._dask_block,
pre_task, # add key here so all chunks depend on it
chunks=((1,) * 4, 4, 4),
dtype=float,
)
# add key to HighLevelGraph
arr.dask.layers[pre_task] = {pre_task: (self._pre_compute,)}
return da.Array(arr.dask, arr.name, arr.chunks, arr.dtype)
# add "mock" argument to map_blocks function
def _dask_block(self, _):
if not self._open:
raise RuntimeError("Segfault!")
return np.random.rand(1, 4, 4)
到目前为止还不错,再也没有RuntimeError
了.但是现在我已经泄露了文件句柄,因为在结束时没有任何东西关闭它。
那么,我想要的是图末尾的一个任务,它依赖于pre_task
的输出(即是否必须为此计算打开文件),如果必须打开文件,则关闭该文件。
这就是我被困的地方,因为这个post-compute
键会被优化器修剪.
有没有办法做到这一点,而不创建一个自定义数组子类覆盖方法,如__dask_postcompute__
或__dask_keys__
,或要求最终用户调用计算而不进行优化?
发布于 2021-11-17 15:55:53
这是一个非常有趣的问题。我认为您在编辑任务图以包括打开和关闭共享资源的任务方面是正确的。但是手工的图形操作非常繁琐,很难正确处理。
我认为实现您想要的最简单的方法是使用一些相对最近添加的实用程序在dask.graph_manipulation
中进行图形操作。特别是,我认为我们需要bind
(它可以用来向Dask集合添加隐式依赖项)和wait_for
(它可以用来确保集合的依赖者等待另一个无关的集合)。
在修改示例时,我使用这些实用程序创建了各种to_dask()
,这是自打开和关闭的:
import dask
import dask.array as da
import numpy as np
from dask.graph_manipulation import bind, checkpoint, wait_on
class FileReader:
_open = False
def open(self):
self._open = True
def close(self):
self._open = False
def to_dask(self) -> da.Array:
# Delayed version of self.open
@dask.delayed
def open_resource():
self.open()
# Delayed version of self.close
@dask.delayed
def close_resource():
self.close()
opener = open_resource()
arr = da.map_blocks(
self._dask_block,
chunks=((1,) * 4, 4, 4),
dtype=float,
)
# Make sure the array is dependent on `opener`
arr = bind(arr, opener)
closer = close_resource()
# Make sure the closer is dependent on the array being done
closer = bind(closer, arr)
# Make sure dependents of arr happen after `closer` is done
arr, closer = wait_on(arr, closer)
return arr
def _dask_block(self):
if not self._open:
raise RuntimeError("Segfault!")
return np.random.rand(1, 4, 4)
我发现在操作前后查看任务图是很有趣的。
以前,它是一个相对简单的分块数组:
但是在操作之后,您可以看到数组块依赖于open_resource
,然后这些块流到close_resource
中,而close_resource
则让数组块进入更广阔的世界:
https://stackoverflow.com/questions/69967644
复制相似问题