I want to turn my single-threaded script into a multi-threaded one to improve performance by running tasks in parallel. The bottleneck is the latency of the requests to the registrar, so I'd like to have more than one request in flight at a time.
find_document = collection.find({"dns": "ERROR"}, {'domain': 1, '_id': 0})
for d in find_document:
    try:
        domaine = d['domain']
        print(domaine)
        w = whois.whois(domaine)
        date = w.expiration_date
        print(date)
        collection.update({"domain": domaine}, {"$set": {"expire": date}})
    except whois.parser.PywhoisError as err:
        print("AVAILABLE")
        collection.update({"domain": domaine}, {"$set": {"expire": "AVAILABLE"}})
What is the best approach? A pool with map? Something else?
Thanks in advance for your answers.
Posted on 2016-12-23 15:30:07
Since you're doing network I/O, you can see a real performance gain from threads without the overhead of multiprocessing, because threads can wait on several requests at once. However, whenever you execute things in parallel, there are potential problems when printing to stdout or writing to files. These are easily fixed with a thread lock.
In your case, I would simply create one thread for each d in find_document. Each thread takes a few args, including the locks. I also reordered your try-except to limit the number of lines inside the try block (good practice). To do that I added an else block, which is good to know about (it also works with for and while loops). That also let me group your print statements together so they can be held under a lock, preventing separate threads from printing at the same time and garbling the output. Finally, I don't know what your collection object is or whether its update method is thread-safe, so I wrapped it in a lock as well.
import threading

find_document = collection.find({"dns": "ERROR"}, {'domain': 1, '_id': 0})

def foo(d, printlock, updatelock):
    domaine = d['domain']
    try:
        w = whois.whois(domaine)  # try to keep only what's necessary in the try block
    except whois.parser.PywhoisError as err:
        with printlock:
            print(domaine)
            print("AVAILABLE")
        with updatelock:
            collection.update({"domain": domaine}, {"$set": {"expire": "AVAILABLE"}})
    else:
        date = w.expiration_date
        with printlock:
            print(domaine)  # group print statements so the lock isn't held for long
            print(date)
        with updatelock:
            collection.update({"domain": domaine}, {"$set": {"expire": date}})

updatelock = threading.Lock()  # not sure collection.update is thread-safe, so take the safe way out and lock it
printlock = threading.Lock()   # make sure only one thread prints at a time

threads = []
for d in find_document:  # create a list of threads and start them all
    t = threading.Thread(target=foo, args=(d, printlock, updatelock))
    threads.append(t)
    t.start()  # start each thread as we create it

for t in threads:  # wait for all threads to complete
    t.join()
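The try/except/else structure used above is worth seeing on its own. A minimal standalone sketch (parse_port is a hypothetical helper, not part of the original code): only the call that can fail sits in try, and the success path sits in else so its own errors aren't accidentally swallowed by the except clause.

```python
def parse_port(text):
    try:
        port = int(text)   # only the call that can raise lives in the try block
    except ValueError:
        return None        # handle the failure case
    else:
        return port        # runs only when no exception occurred

print(parse_port("8080"))  # 8080
print(parse_port("oops"))  # None
```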
Based on your comment, you have too many jobs to run them all at once, so we need something closer to a multiprocessing-style pool than the previous example. The way to do that is to start a fixed number of threads that loop over a given function with fresh arguments until there are none left. To reuse the code I've already written, I'll add this as a new function that calls foo, but you could write it all as one function.
import threading

find_document = collection.find({"dns": "ERROR"}, {'domain': 1, '_id': 0})

def foo(d, printlock, updatelock):
    domaine = d['domain']
    try:
        w = whois.whois(domaine)  # try to keep only what's necessary in the try block
    except whois.parser.PywhoisError as err:
        with printlock:
            print(domaine)
            print("AVAILABLE")
        with updatelock:
            collection.update({"domain": domaine}, {"$set": {"expire": "AVAILABLE"}})
    else:
        date = w.expiration_date
        with printlock:
            print(domaine)  # group print statements so the lock isn't held for long
            print(date)
        with updatelock:
            collection.update({"domain": domaine}, {"$set": {"expire": date}})

def consumer(producer):
    while True:
        try:
            with iterlock:  # no idea if find_document's iterator is thread-safe... assume not
                d = next(producer)  # unrolling a for loop into a while loop
        except StopIteration:
            return  # no work left, we're done
        else:
            foo(d, printlock, updatelock)  # call our function from before

iterlock = threading.Lock()    # lock for getting the next element from the iterator
updatelock = threading.Lock()  # not sure collection.update is thread-safe, so take the safe way out and lock it
printlock = threading.Lock()   # make sure only one thread prints at a time

producer = iter(find_document)  # create an iterator from find_document (what a for loop does under the hood)

threads = []
for _ in range(16):  # create a list of 16 threads and start them all
    t = threading.Thread(target=consumer, args=(producer,))
    threads.append(t)
    t.start()  # start each thread as we create it

for t in threads:  # wait for all threads to complete
    t.join()
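For completeness, the fixed-size pool above can also be written with the standard library's concurrent.futures, which is the "pool and map" approach the question mentions. A minimal sketch, where fetch_expiration and domains are placeholders standing in for the whois lookup plus database update and for find_document (neither name is from the original code):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_expiration(domain):
    # placeholder for whois.whois(domain) + collection.update(...)
    return domain, len(domain)  # dummy result for illustration

domains = ["example.com", "example.org", "example.net"]  # stands in for find_document

with ThreadPoolExecutor(max_workers=16) as pool:  # same 16-thread cap as above
    # map distributes the calls over the pool and preserves input order
    results = list(pool.map(fetch_expiration, domains))

print(results)
```

The executor handles thread creation, handing out work items, and joining for you, so no explicit iterator lock is needed.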
https://stackoverflow.com/questions/41303524