首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >在dask.Array任务图中嵌入前/后计算操作

在dask.Array任务图中嵌入前/后计算操作
EN

Stack Overflow用户
提问于 2021-11-14 14:20:00
回答 1查看 135关注 0票数 1

我感兴趣的是创建一个dask.array.Array,它在compute()之前/之后打开和关闭资源。但是,我不想假设最终用户将如何调用compute,我希望避免创建自定义的dask Array子类或代理对象,因此我试图将操作嵌入到数组的底层__dask_graph__中。

(旁白:请暂时忽略关于在dask中使用有状态对象的警告,我知道其中的风险,这个问题只是关于任务图的操作)。

请考虑下面的类,该类模拟必须处于打开状态的文件读取器才能读取块,否则就会出现分段错误。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import dask.array as da
import numpy as np

class FileReader:
    _open = True

    def open(self):
        self._open = True

    def close(self):
        self._open = False

    def to_dask(self) -> da.Array:
        return da.map_blocks(
            self._dask_block,
            chunks=((1,) * 4, 4, 4),
            dtype=float,
        )

    def _dask_block(self):
        if not self._open:
            raise RuntimeError("Segfault!")
        return np.random.rand(1, 4, 4)

如果文件保持打开状态,则一切正常,但如果关闭该文件,则从to_dask返回的dask数组将失败:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
>>> t = FileReader()
>>> darr = t.to_dask()
>>> t.close()
>>> darr.compute()  # RuntimeError: Segfault!

当前任务图如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
>>> list(darr.dask)
[
    ('_dask_block-aa4daac0835bafe001693f9ac085683a', 0, 0, 0),
    ('_dask_block-aa4daac0835bafe001693f9ac085683a', 1, 0, 0),
    ('_dask_block-aa4daac0835bafe001693f9ac085683a', 2, 0, 0),
    ('_dask_block-aa4daac0835bafe001693f9ac085683a', 3, 0, 0)
]

本质上,我想在开始时添加新任务,这是_dask_block层所依赖的,而任务则是依赖于_dask_block的。

我尝试直接操作HighLevelGraph输出的da.map_blocks来手动添加这些任务,但是发现它们在计算优化过程中被修剪了,因为darr.__dask_keys__()不包含我的键(而且,我也希望避免子类化或要求最终用户使用特殊的优化标志调用compute )。

一个解决方案是确保传递给map_blocks的map_blocks函数总是打开并关闭底层资源.但让我们假设开放/关闭过程相对缓慢,有效地破坏了单机并行性的性能。所以我们只想在开始的时候打开,在结尾的时候关闭。

我可以“欺骗”一些以包含打开文件的任务,方法是在对map_blocks的调用中包含一个新密钥,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    ...
    
    # new method that will run at beginning of compute()
    def _pre_compute(self):
        was_open = self._open
        if not was_open:
            self.open()
        return was_open

    def to_dask(self) -> da.Array:
        # new task key
        pre_task = 'pre_compute-' + tokenize(self._pre_compute)
        arr = da.map_blocks(
            self._dask_block,
            pre_task,  # add key here so all chunks depend on it
            chunks=((1,) * 4, 4, 4),
            dtype=float,
        )
        # add key to HighLevelGraph
        arr.dask.layers[pre_task] = {pre_task: (self._pre_compute,)}
        return da.Array(arr.dask, arr.name, arr.chunks, arr.dtype)

    # add "mock" argument to map_blocks function
    def _dask_block(self, _):
        if not self._open:
            raise RuntimeError("Segfault!")
        return np.random.rand(1, 4, 4)

到目前为止还不错,再也没有RuntimeError了.但是现在我已经泄露了文件句柄,因为在结束时没有任何东西关闭它。

那么,我想要的是图末尾的一个任务,它依赖于pre_task的输出(即是否必须为此计算打开文件),如果必须打开文件,则关闭该文件。

这就是我被困的地方,因为这个post-compute键会被优化器修剪.

有没有办法做到这一点,而不创建一个自定义数组子类覆盖方法,如__dask_postcompute____dask_keys__,或要求最终用户调用计算而不进行优化?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-11-17 15:55:53

这是一个非常有趣的问题。我认为您在编辑任务图以包括打开和关闭共享资源的任务方面是正确的。但是手工的图形操作非常繁琐,很难正确处理。

我认为实现您想要的最简单的方法是使用一些相对最近添加的实用程序在dask.graph_manipulation中进行图形操作。特别是,我认为我们需要bind (它可以用来向Dask集合添加隐式依赖项)和wait_for (它可以用来确保集合的依赖者等待另一个无关的集合)。

在修改示例时,我使用这些实用程序创建了各种to_dask(),这是自打开和关闭的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import dask
import dask.array as da
import numpy as np
from dask.graph_manipulation import bind, checkpoint, wait_on


class FileReader:
    _open = False

    def open(self):
        self._open = True

    def close(self):
        self._open = False

    def to_dask(self) -> da.Array:
        # Delayed version of self.open
        @dask.delayed
        def open_resource():
            self.open()
        # Delayed version of self.close
        @dask.delayed
        def close_resource():
            self.close()
            
        opener = open_resource()
        arr = da.map_blocks(
            self._dask_block,
            chunks=((1,) * 4, 4, 4),
            dtype=float,
        )
        # Make sure the array is dependent on `opener`
        arr = bind(arr, opener)

        closer = close_resource()
        # Make sure the closer is dependent on the array being done
        closer = bind(closer, arr)
        # Make sure dependents of arr happen after `closer` is done
        arr, closer = wait_on(arr, closer)
        return arr

    def _dask_block(self):
        if not self._open:
            raise RuntimeError("Segfault!")
        return np.random.rand(1, 4, 4)

我发现在操作前后查看任务图是很有趣的。

以前,它是一个相对简单的分块数组:

但是在操作之后,您可以看到数组块依赖于open_resource,然后这些块流到close_resource中,而close_resource则让数组块进入更广阔的世界:

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69967644

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文