在Python中,当使用多处理模块时,有两种队列:
队列
JoinableQueue。
他们之间有什么区别?
队列
from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue
JoinableQueue
from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task c
下面的代码会导致Nuke挂起。基本上,我要做的是从文件系统中获取文件和文件夹的列表,并试图通过并行处理来加快它的速度。这完全可以在Nuke之外工作,但正如我前面说过的,在Nuke中运行这个会导致Nuke挂起。有什么更好的方法可以让努克不被绞死吗?最好,我希望通过Python的标准库或平台无关的包来修复这个问题。但是,如果没办法这么做的话,那我也没意见。最坏的情况下,我将不得不回到不使用并行处理和找到其他优化。
此外,当我在Nuke中运行这段代码时,控制台中会出现以下错误:
Unknown units in -c from multiprocessing.forking import main;
我有以下Python3代码: from multiprocessing import Process, JoinableQueue, Value
from multiprocessing.managers import SyncManager
def test():
global numfail
global queue
while not queue.empty():
number = queue.get()
with numfail.get_lock():
numfail.value += number
在Python语言中,多处理中的队列和JoinableQueue有什么不同?这个问题已经被问到了here,但正如一些评论指出的那样,被接受的答案没有什么帮助,因为它所做的一切就是引用文档。有人能解释一下什么时候使用一个和另一个有什么区别吗?例如,如果除了提供两个额外的方法join()和task_done()之外,JoinableQueue几乎是相同的东西,那么为什么要选择使用Queue而不是JoinableQueue。此外,我链接的帖子中的另一个答案提到了Based on the documentation, it's hard to be sure that Queue is ac
我试图与multiprocessing.Process和请求并行运行多个API请求。我将urls放在JoinableQueue实例中进行解析,并将内容放回队列实例。我已经注意到,将response.content放到队列中会在某种程度上阻止进程终止。
下面是一个过程(Python3.5)的简化示例:
import multiprocessing as mp
import queue
import requests
import time
class ChildProcess(mp.Process):
def __init__(self, q, qout):
supe
我想通过使用实现
由于不能在Pool中使用(可以说是RuntimeError: JoinableQueue objects should only be shared between processes through inheritance),所以我必须使用由启发的。
问题是:现在,当消费者的工作岗位比生产者的工作更多时,这个计划可能会挂起。
import queue
import random
from multiprocessing import Manager, Pool
def consumer(q):
while True:
try:
我正在试用Python2.6中新的多处理模块。我正在创建几个进程,每个进程都有自己的multiprocessor.JoinableQueue实例。每个进程产生一个或多个工作线程(threading.Thread的子类),这些线程共享JoinableQueue实例(通过每个线程的__init__方法传入)。它似乎通常可以工作,但偶尔会出现以下错误,无法预测地失败:
File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
self.queue.task_done()
给定此Python程序:
# commented out code are alternatives I tried that don't work.
from multiprocessing import Process, Queue
#from multiprocessing import Process, JoinableQueue as Queue
def start_process(queue):
# queue.cancel_join_thread()
while True:
print queue.get()
if __name__
我是Python新手,我想我会制作一个简单的自动点击器,作为一个很酷的入门项目。
我希望用户能够指定一个单击间隔,然后用热键打开或关闭自动单击。
我知道Ctrl,您将在我当前的代码中看到这一点,但我希望程序能够正常工作,这样就不必在python窗口中激活热键。
import pyautogui, sys
print("Press Ctrl + C to quit.")
interval = float(input("Please give me an interval for between clicks in seconds: "))
try:
它和Windows上的finder一样,但是使用线程来获得更快的速度,
import os,threading,multiprocessing
def finder(path,q):
for x in os.walk(unicode(path)):
if x[1]:
for dirname in x[1]:
if target in dirname.lower():
q.put(os.path.join(x[0],dirname))
if x[2]:
for name in x[
#Each pic's size is 1280x720
import time
from multiprocessing import Process,JoinableQueue,Queue,Value
**def create_row(M, q_row,y_val):** #creating lines. len:720 len of each
line_len=720
col_len=1280
for i in range(line_len):
q_row.put(M[(i*col_len):((i+1)*col_l
我有个问题。我正在执行一项并行任务。我得到了这个错误:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File
我正在尝试将运行在100k+文件上的计算结果写入单个文件。处理一个文件需要大约1秒,并将一行写入输出文件。问题本身是“令人尴尬的并行的”,我只是在努力正确地写入一个文件(比如CSV)。这是很久以前对我起作用的方法(Python 3.4?): import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed
def save_to_file(q):
with open('test.csv', 'w') as out:
我试图在代码中使用python multiprocessing.Queue:
import multiprocessing as mp
import datetime as dt
def function_to_get_from_q(Queue):
#while not Queue.empty():
print(Queue.get())
def collect(Queue):
for i in range(10000):
Queue.put([i, (dt.datetime.utcnow() + dt.timedelta(hours=5, minut
考虑以下代码:
服务器:
import sys
from multiprocessing.managers import BaseManager, BaseProxy, Process
def baz(aa) :
l = []
for i in range(3) :
l.append(aa)
return l
class SolverManager(BaseManager): pass
class MyProxy(BaseProxy): pass
manager = SolverManager(address=('127.0.0.1'
我想使用多处理模块来加速遍历目录结构。首先,我做了一些研究,发现这个堆栈溢出线程:
然而,当我试图调整线程中的代码时,我仍然遇到了一个问题。下面是我编写的一个小脚本,用于测试池并了解它是如何工作的。
import os
from multiprocessing.pool import Pool
from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue
def scan():
print "Hi!"
while True:
pr
我正在学习Python中的线程,并试图制作一个简单的程序,该程序使用线程从队列中获取一个数字并打印出来。
我有以下代码
import threading
from Queue import Queue
test_lock = threading.Lock()
tests = Queue()
def start_thread():
while not tests.empty():
with test_lock:
if tests.empty():
return
test = tests.
假设我们有一个multiprocessing.Pool,其中工作线程共享一个multiprocessing.JoinableQueue,去排队工作项,并潜在地对更多的工作进行排队:
def worker_main(queue):
while True:
work = queue.get()
for new_work in process(work):
queue.put(new_work)
queue.task_done()
当队列被填满时,queue.put()将阻塞。只要至少有一个进程使用queue.get()
至于multiprocessing中的多线程和多进程池
pool = Pool()
result = pool.map(func, arg)
pool.close()
pool.join()
为什么close和join是确保代码安全所必需的?如果没有它们,它会产生什么坏后果?
在循环中,更好的方法是将这些行放在循环的内部还是外部?
例如,
pool = Pool()
for x in a_ndarray:
result = pool.map(func, x)
save(result)
pool.close()
pool.join()
和
p
当使用多处理池运行时,我发现工作进程一直运行在抛出异常的点上。
考虑以下代码:
import multiprocessing
def worker(x):
print("input: " + x)
y = x + "_output"
raise Exception("foobar")
print("output: " + y)
return(y)
def main():
data = [str(x) for x in range(4)]
pool = multipr
我有一个多处理器程序。每个进程从data获取一个数字,然后将其插入到__queue_out中。
但是有一个问题:当最后一个过程开始的时候,一个无尽的循环开始了,所有的过程都消失了。
import time
import threading
import random
from queue import Queue, PriorityQueue
from multiprocessing import Pool, Process
data = range(1, 1001)
start = time.time()
end_date = time.time() - start
class
我想节省时间,并使用多处理来发出10个get请求。到目前为止我有这样的想法:
# get one text from the url
def get_one_request_text(url, multiprocessing_queue):
response = requests.get(url)
assert response.status_code == 200
multiprocessing_queue.put(response.text)
# urls is a list of links
def get_many_request_texts(urls):
我对在python中使用Asyncio模块很陌生。假设有两个函数我想异步运行。function_A()基本上是在运行循环,这需要在特定条件下用"if“语句停止。
我不确定事件循环到底是如何工作的,只是知道当我运行loop.stop()时,它会杀死内核并重新启动它,这样,在中断事件循环之后要保留的"lst"将在内核被杀死后自动删除。
以下是我想做的事:
global lst
lst = []
import asyncio
async def function_A():
for i in range(0,100):
我在python中使用多处理模块。下面是我使用的代码示例:
import multiprocessing as mp
def function(fun_var1, fun_var2):
b = fun_var1 + fun_var2
# and more computationally intensive stuff happens here
return b
# my program freezes after the return command
class Worker(mp.Process):
def __init__(self, que