首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >补丁中的线程和队列

补丁中的线程和队列
EN

Stack Overflow用户
提问于 2018-11-06 23:42:02
回答 3查看 44关注 0票数 0

我有大约1000万个条目需要处理。目前,我逐个检查条目并运行子例程,当处理1000个条目时,我打开一个csv文件并将结果保存到其中。

代码语言:javascript
复制
for num, i in enumerate (some iterator)
       function(i)
       if num is multiple of 1000
              open file and save

如何在使用队列和线程的同时完成保存部分?现在,我将10个mill条目放入队列,并启动线程来运行子例程。它是有效的,但我不能理解节省的部分。

代码语言:javascript
复制
put all entries in queue
for i in number of threads
    run function
    start thread
EN

回答 3

Stack Overflow用户

发布于 2018-11-06 23:48:48

所以有几件事。您将希望每个线程写入一个单独的文件,然后在最后合并这些文件。使用锁定机制是可行的,但可能会迫使您的应用程序将性能返回到单个线程,这取决于您向CSV写入的数量。

这里有一个创建池和队列的很好的教程:

https://www.metachris.com/2016/04/python-threadpool/

和:

Threading pool similar to the multiprocessing Pool?

最后,您将希望合并您的文件(如果需要),最好在操作系统级别执行此操作,但在python中,您可以这样做:

代码语言:javascript
复制
filenames = ['file1.txt', 'file2.txt', ...]
with open('path/to/output/file', 'w') as outfile:
    for fname in filenames:
        with open(fname) as infile:
            for line in infile:
                outfile.write(line)
票数 0
EN

Stack Overflow用户

发布于 2018-11-06 23:49:19

这是假设您已经设置了所有其他线程

在初始化线程的位置,需要创建线程锁对象

代码语言:javascript
复制
threadLock = threading.Lock()

然后,在你要写的函数中,你可以做一些类似这样的事情:

代码语言:javascript
复制
for num, i in enumerate (some iterator)
    function(i)
    if num is multiple of 1000

        threadLock.acquire()
        #open file with an append
        #save
        #close file
        threadLock.release()

threadLock.acquire()可能需要放在if语句之前

锁定某部分代码的其他线程或访问共享变量(如文件)时,它们必须在那里轮流等待才能通过门,而另一个线程已经在使用它

票数 0
EN

Stack Overflow用户

发布于 2018-11-07 00:06:14

使用"secret sauce" of CPython threading -- Queues!

对文件的写入本质上是顺序的,因此您可能会让一个线程负责所有的写入。让所有工作线程将其结果推送到公共输出队列中。让单个写入器线程从该输出队列中读取,并每1000个条目或当所有工作线程完成时写入csv。

通过这种方式,您可以避免需要锁或在之后合并部分文件的麻烦。

这是我建议的基本结构。它创建2500个条目,用5个线程处理它们,并在每10个结果后输出:

代码语言:javascript
复制
import queue
import threading
SENTINEL = None

def worker(in_queue, out_queue):
    for n in iter(in_queue.get, SENTINEL):
        # print('task called: {n}'.format(n=n))
        out_queue.put(n*2)

def write(out_queue, chunksize=10):
    results = []
    for n in iter(out_queue.get, SENTINEL):
        results.append(n)
        if len(results) >= chunksize:
            print(results)
            results = []
    if len(results):
        # SENTINEL signals the worker threads are done.
        # print the remainder of the results
        print(results)

in_queue = queue.Queue()
out_queue = queue.Queue()
num_threads = 5

N = 2500
for i in range(N):
    in_queue.put(i)

for i in range(num_threads):
    # ad a SENTINEL to tell each worker to end
    in_queue.put(SENTINEL)

writer = threading.Thread(target=write, args=(out_queue,))
writer.start()
threads = [threading.Thread(target=worker, args=(in_queue, out_queue))
           for n in range(num_threads)]

for t in threads:
    t.start()

for t in threads:
    t.join()

# tell the writer to end
out_queue.put(SENTINEL)            
writer.join()

哪种打印

代码语言:javascript
复制
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58]
...
[4940, 4942, 4944, 4946, 4948, 4950, 4952, 4954, 4956, 4958]
[4960, 4962, 4964, 4966, 4968, 4970, 4972, 4974, 4976, 4978]
[4980, 4982, 4984, 4986, 4988, 4990, 4992, 4994, 4996, 4998]

请注意,打印的值可能不会按排序顺序显示。这取决于并发线程将结果推送到out_queue中的顺序。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53175207

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档