我正在阅读关于dask.distributed的文档,看起来我可以通过client.submit()向分布式集群提交函数。
我有一个现有的函数some_func,它异步获取单个文档(比如一个文本文件),我想获取原始文档并获取所有不包含元音的单词,然后将其放回另一个数据库中。此数据处理步骤正在阻塞。
假设有数百万个文档,而分布式集群只有10个节点和1个可用进程(即一次只能处理10个文档),那么dask.distributed将如何处理它需要处理的文档流?
下面是一些示例代码:
client = dask.distributed('tcp://1.2.3.4:8786')
def some_func():
doc = retrieve_next_document_asynchronously()
client.submit(get_vowelless_words, doc)
def get_vowelless_words(doc):
vowelless_words = process(doc)
write_to_database(vowelless_words)
if __name__ == '__main__':
for i in range(1000000):
some_func()由于一个文档的处理是阻塞的,并且集群只能同时处理10个文档,那么当集群繁忙时检索其他30个文档时会发生什么呢?我知道client.submit()是异步的,它将返回一个并发的未来,但是在这种情况下会发生什么呢?它是否会将文档保存在内存中,直到它有1/10个内核可用,并可能导致机器在等待1000个文档后耗尽内存。
在这种情况下,调度器会做什么?先进先出?我是否应该以某种方式更改代码,使其在检索下一个文档之前等待核心可用?如何做到这一点呢?
发布于 2018-10-06 20:18:54
要将队列与dask一起使用,下面是将dask队列与分布式集群(基于documentation)一起使用的修改后的示例:
#!/usr/bin/env python
import distributed
from queue import Queue
from threading import Thread
client = distributed.Client('tcp://1.2.3.4:8786')
nprocs = len(client.ncores())
def increment(x):
return x+1
def double(x):
return 2*x
input_q = Queue(maxsize=nprocs)
remote_q = client.scatter(input_q)
remote_q.maxsize = nprocs
inc_q = client.map(increment, remote_q)
inc_q.maxsize = nprocs
double_q = client.map(double, inc_q)
double_q.maxsize = nprocs
result_q = client.gather(double_q)
def load_data(q):
i = 0
while True:
q.put(i)
i += 1
load_thread = Thread(target=load_data, args=(input_q,))
load_thread.start()
while True:
size = result_q.qsize()
item = result_q.get()
print(item, size)在这种情况下,我们显式地限制每个队列的最大大小等于可用的分布式进程的数量。否则,while循环将使集群过载。当然,您也可以将maxsize调整为可用进程数的倍数。对于像增量和双精度这样的简单函数,我发现maxsize = 10*nprocs仍然是合理的,但这肯定会受到运行自定义函数所需时间的限制。
发布于 2018-10-05 19:18:28
当您调用submit时,所有参数都被序列化并立即发送到调度程序。另一种方法是获取文档并在集群上处理它们(这假设文档对所有工作进程都是全局可见的)。
for fn in filenames:
doc = client.submit(retrieve_doc, fn)
process = client.submit(process_doc, doc)
fire_and_forget(process)如果文档只在客户机上可用,而您想要限制流,那么可以考虑使用dask队列或as_completed迭代器。
https://stackoverflow.com/questions/52655260
复制相似问题