首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >异步实时并行分布式Dask

异步实时并行分布式Dask
EN

Stack Overflow用户
提问于 2018-10-05 05:16:56
回答 2查看 411关注 0票数 0

我正在阅读关于dask.distributed的文档,看起来我可以通过client.submit()向分布式集群提交函数。

我有一个现有的函数some_func,它异步获取单个文档(比如一个文本文件),我想获取原始文档并获取所有不包含元音的单词,然后将其放回另一个数据库中。此数据处理步骤正在阻塞。

假设有数百万个文档,而分布式集群只有10个节点和1个可用进程(即一次只能处理10个文档),那么dask.distributed将如何处理它需要处理的文档流?

下面是一些示例代码:

代码语言:javascript
运行
复制
client = dask.distributed('tcp://1.2.3.4:8786')

def some_func():
    doc = retrieve_next_document_asynchronously() 
    client.submit(get_vowelless_words, doc)

def get_vowelless_words(doc):
    vowelless_words = process(doc)
    write_to_database(vowelless_words)

if __name__ == '__main__':
    for i in range(1000000):
        some_func()

由于一个文档的处理是阻塞的,并且集群只能同时处理10个文档,那么当集群繁忙时检索其他30个文档时会发生什么呢?我知道client.submit()是异步的,它将返回一个并发的未来,但是在这种情况下会发生什么呢?它是否会将文档保存在内存中,直到它有1/10个内核可用,并可能导致机器在等待1000个文档后耗尽内存。

在这种情况下,调度器会做什么?先进先出?我是否应该以某种方式更改代码,使其在检索下一个文档之前等待核心可用?如何做到这一点呢?

EN

回答 2

Stack Overflow用户

发布于 2018-10-06 20:18:54

要将队列与dask一起使用,下面是将dask队列与分布式集群(基于documentation)一起使用的修改后的示例:

代码语言:javascript
运行
复制
#!/usr/bin/env python

import distributed
from queue import Queue
from threading import Thread

client = distributed.Client('tcp://1.2.3.4:8786')
nprocs = len(client.ncores())

def increment(x):
    return x+1

def double(x):
    return 2*x

input_q = Queue(maxsize=nprocs)
remote_q = client.scatter(input_q)
remote_q.maxsize = nprocs
inc_q = client.map(increment, remote_q)
inc_q.maxsize = nprocs
double_q = client.map(double, inc_q)
double_q.maxsize = nprocs
result_q = client.gather(double_q)

def load_data(q):
    i = 0
    while True:
        q.put(i)
        i += 1

load_thread = Thread(target=load_data, args=(input_q,))
load_thread.start()

while True:
    size = result_q.qsize()
    item = result_q.get()
    print(item, size)

在这种情况下,我们显式地限制每个队列的最大大小等于可用的分布式进程的数量。否则,while循环将使集群过载。当然,您也可以将maxsize调整为可用进程数的倍数。对于像增量和双精度这样的简单函数,我发现maxsize = 10*nprocs仍然是合理的,但这肯定会受到运行自定义函数所需时间的限制。

票数 1
EN

Stack Overflow用户

发布于 2018-10-05 19:18:28

当您调用submit时,所有参数都被序列化并立即发送到调度程序。另一种方法是获取文档并在集群上处理它们(这假设文档对所有工作进程都是全局可见的)。

代码语言:javascript
运行
复制
for fn in filenames:
    doc = client.submit(retrieve_doc, fn)
    process = client.submit(process_doc, doc)
    fire_and_forget(process)

如果文档只在客户机上可用,而您想要限制流,那么可以考虑使用dask队列或as_completed迭代器。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52655260

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档