我有大约1000万个条目需要处理。目前,我逐个检查条目并运行子例程,当处理1000个条目时,我打开一个csv文件并将结果保存到其中。
for num, i in enumerate (some iterator)
function(i)
if num is multiple of 1000
open file and save如何在使用队列和线程的同时完成保存部分?现在,我将10个mill条目放入队列,并启动线程来运行子例程。它是有效的,但我不能理解节省的部分。
put all entries in queue
for i in number of threads
run function
start thread发布于 2018-11-06 23:48:48
所以有几件事。您将希望每个线程写入一个单独的文件,然后在最后合并这些文件。使用锁定机制是可行的,但可能会迫使您的应用程序将性能返回到单个线程,这取决于您向CSV写入的数量。
这里有一个创建池和队列的很好的教程:
https://www.metachris.com/2016/04/python-threadpool/
和:
Threading pool similar to the multiprocessing Pool?
最后,您将希望合并您的文件(如果需要),最好在操作系统级别执行此操作,但在python中,您可以这样做:
filenames = ['file1.txt', 'file2.txt', ...]
with open('path/to/output/file', 'w') as outfile:
for fname in filenames:
with open(fname) as infile:
for line in infile:
outfile.write(line)发布于 2018-11-06 23:49:19
这是假设您已经设置了所有其他线程
在初始化线程的位置,需要创建线程锁对象
threadLock = threading.Lock()然后,在你要写的函数中,你可以做一些类似这样的事情:
for num, i in enumerate (some iterator)
function(i)
if num is multiple of 1000
threadLock.acquire()
#open file with an append
#save
#close file
threadLock.release()threadLock.acquire()可能需要放在if语句之前
锁定某部分代码的其他线程或访问共享变量(如文件)时,它们必须在那里轮流等待才能通过门,而另一个线程已经在使用它
发布于 2018-11-07 00:06:14
使用"secret sauce" of CPython threading -- Queues!
对文件的写入本质上是顺序的,因此您可能会让一个线程负责所有的写入。让所有工作线程将其结果推送到公共输出队列中。让单个写入器线程从该输出队列中读取,并每1000个条目或当所有工作线程完成时写入csv。
通过这种方式,您可以避免需要锁或在之后合并部分文件的麻烦。
这是我建议的基本结构。它创建2500个条目,用5个线程处理它们,并在每10个结果后输出:
import queue
import threading
SENTINEL = None
def worker(in_queue, out_queue):
for n in iter(in_queue.get, SENTINEL):
# print('task called: {n}'.format(n=n))
out_queue.put(n*2)
def write(out_queue, chunksize=10):
results = []
for n in iter(out_queue.get, SENTINEL):
results.append(n)
if len(results) >= chunksize:
print(results)
results = []
if len(results):
# SENTINEL signals the worker threads are done.
# print the remainder of the results
print(results)
in_queue = queue.Queue()
out_queue = queue.Queue()
num_threads = 5
N = 2500
for i in range(N):
in_queue.put(i)
for i in range(num_threads):
# ad a SENTINEL to tell each worker to end
in_queue.put(SENTINEL)
writer = threading.Thread(target=write, args=(out_queue,))
writer.start()
threads = [threading.Thread(target=worker, args=(in_queue, out_queue))
for n in range(num_threads)]
for t in threads:
t.start()
for t in threads:
t.join()
# tell the writer to end
out_queue.put(SENTINEL)
writer.join()哪种打印
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58]
...
[4940, 4942, 4944, 4946, 4948, 4950, 4952, 4954, 4956, 4958]
[4960, 4962, 4964, 4966, 4968, 4970, 4972, 4974, 4976, 4978]
[4980, 4982, 4984, 4986, 4988, 4990, 4992, 4994, 4996, 4998]请注意,打印的值可能不会按排序顺序显示。这取决于并发线程将结果推送到out_queue中的顺序。
https://stackoverflow.com/questions/53175207
复制相似问题