我有一个很大的文本文件,我想在其中处理每一行(执行一些操作),并将它们存储在数据库中。由于单个简单的程序花费的时间太长,我希望通过多个进程或线程来完成。每个线程/进程都应该从单个文件中读取不同的数据(不同的行),并对其数据段(行)执行一些操作,然后将它们放入数据库中,以便最终处理完整个数据,并将我的数据库中包含所需的数据转储。
但是我不知道如何处理这个问题。
发布于 2012-06-26 04:46:25
下面是我编造的一个非常愚蠢的例子:
import os.path
import multiprocessing
def newlinebefore(f,n):
f.seek(n)
c=f.read(1)
while c!='\n' and n > 0:
n-=1
f.seek(n)
c=f.read(1)
f.seek(n)
return n
filename='gpdata.dat' #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)
#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)
#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.
with open(filename,'r') as f:
start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))
end_byte=[i-1 for i in start_byte] [1:] + [None]
def process_piece(filename,start,end):
with open(filename,'r') as f:
f.seek(start+1)
if(end is None):
text=f.read()
else:
nbytes=end-start+1
text=f.read(nbytes)
# process text here. createing some object to be returned
# You could wrap text into a StringIO object if you want to be able to
# read from it the way you would a file.
returnobj=text
return returnobj
def wrapper(args):
return process_piece(*args)
filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)
pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)
#Now take your results and write them to the database.
print "".join(result) #I just print it to make sure I get my file back ...
这里的棘手之处在于确保我们将文件拆分为换行符,这样您就不会遗漏任何行(或者只读取部分行)。然后,每个进程读取它在文件中的一部分,并返回一个可由主线程放入数据库的对象。当然,您甚至可能需要分块完成这一部分,这样就不必一次将所有信息都保存在内存中。(这很容易实现--只需将"args“列表拆分为X个块,然后调用pool.map(wrapper,chunk)
--参见here)
发布于 2012-06-26 04:09:10
我们将单个大文件分解为多个小文件,并在单独的线程中处理每个文件。
https://stackoverflow.com/questions/11196367
复制相似问题