2012-06-25 33 views
55

我有一个单独的大文本文件,我想在其中处理每一行(执行一​​些操作)并将它们存储在数据库中。由于一个简单的程序花费的时间太长,我希望通过多个进程或线程来完成。 每个线程/进程应该读取来自单个文件的不同数据(不同的行),并对它们的数据(行)进行一些操作并将它们放入数据库中,以便最终处理完整个数据并我的数据库被倾倒了我需要的数据。在python中处理多个进程中的单个文件

但我无法弄清楚如何解决这个问题。

+2

不错的问题。我也有这个疑问。虽然我选择将文件分成更小的文件:) –

回答

70

你所寻找的是一个生产者/消费者模式

基本线程例如

下面是一个使用基本例如(而不是多处理)

import threading 
import Queue 
import sys 

def do_work(in_queue, out_queue): 
    while True: 
     item = in_queue.get() 
     # process 
     result = item 
     out_queue.put(result) 
     in_queue.task_done() 

if __name__ == "__main__": 
    work = Queue.Queue() 
    results = Queue.Queue() 
    total = 20 

    # start for workers 
    for i in xrange(4): 
     t = threading.Thread(target=do_work, args=(work, results)) 
     t.daemon = True 
     t.start() 

    # produce data 
    for i in xrange(total): 
     work.put(i) 

    work.join() 

    # get the results 
    for i in xrange(total): 
     print results.get() 

    sys.exit() 

你止跌与线程共享文件对象。通过向queue提供数据行,您可以为他们生产。然后,每个线程都会选取并处理它,然后将其返回队列中。

multiprocessing module中内置了一些更高级的工具来共享数据,如列表和special kind of Queue。使用多处理vs线程有一定的权衡,它取决于你的工作是cpu绑定还是IO绑定。

基本multiprocessing.Pool例如

这里是一个多池

from multiprocessing import Pool 

def process_line(line): 
    return "FOO: %s" % line 

if __name__ == "__main__": 
    pool = Pool(4) 
    with open('file.txt') as source_file: 
     # chunk the work into batches of 4 lines at a time 
     results = pool.map(process_line, source_file, 4) 

    print results 

A Pool是管理自己的流程便利对象的一个​​很基本的例子。由于打开的文件可以遍历其行,因此可以将它传递给映射,映射将循环并将行传递给工作函数。 Map块完成后返回整个结果。请注意,在一个非常简单的例子中,map将在开始工作之前一次性使用您的文件。所以请注意它是否更大。有更高级的方法来设计制作者/消费者设置。

手册“池”与极限和线重新排序

这是Pool.map的手动的例子,但代替的消耗整个迭代,可以设置一个队列的大小,以便你只供给它可以尽可能快地处理它。我还添加了行号,以便您可以跟踪它们并在以后再使用它们。

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 
     line_no, line = item 

     # exit signal 
     if line == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = (line_no, line) 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    with open("source.txt") as f: 
     iters = itertools.chain(f, (None,)*num_workers) 
     for num_and_line in enumerate(iters): 
      work.put(num_and_line) 

    for p in pool: 
     p.join() 

    # get the results 
    # example: [(1, "foo"), (10, "bar"), (0, "start")] 
    print sorted(results) 
+0

是的,该文件较大,大约1 GB左右。我不知道你的意思是说更大,1 GB对我来说更大。 – pranavk

+0

这很好。我确定你可以采取这些例子并推断你的需求。线程一个就好了。多处理器只需要一个类似的队列供您使用。 – jdi

+1

这很好,但如果处理是I/O限制呢?在这种情况下,并行可能会减慢速度,而不是加快速度。在单个磁盘轨道内搜索比intertrack寻求的要快得多,并行I/O往往会引入intertrack寻求否则会是顺序I/O负载。为了从并行I/O中获得一些好处,有时使用RAID镜像有时会有所帮助。 – user1277476

-4

将单个大文件分解为多个较小的文件,并将它们中的每一个在单独的线程中处理。

+4

可以显示一些代码吗? – maq

+0

这不是OP想要的!但只是一个想法...不错。 – DRPK

5

下面是我做了一个非常愚蠢的例子:

import os.path 
import multiprocessing 

def newlinebefore(f,n): 
    f.seek(n) 
    c=f.read(1) 
    while c!='\n' and n > 0: 
     n-=1 
     f.seek(n) 
     c=f.read(1) 

    f.seek(n) 
    return n 

filename='gpdata.dat' #your filename goes here. 
fsize=os.path.getsize(filename) #size of file (in bytes) 

#break the file into 20 chunks for processing. 
nchunks=20 
initial_chunks=range(1,fsize,fsize/nchunks) 

#You could also do something like: 
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. 


with open(filename,'r') as f: 
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) 

end_byte=[i-1 for i in start_byte] [1:] + [None] 

def process_piece(filename,start,end): 
    with open(filename,'r') as f: 
     f.seek(start+1) 
     if(end is None): 
      text=f.read() 
     else: 
      nbytes=end-start+1 
      text=f.read(nbytes) 

    # process text here. createing some object to be returned 
    # You could wrap text into a StringIO object if you want to be able to 
    # read from it the way you would a file. 

    returnobj=text 
    return returnobj 

def wrapper(args): 
    return process_piece(*args) 

filename_repeated=[filename]*len(start_byte) 
args=zip(filename_repeated,start_byte,end_byte) 

pool=multiprocessing.Pool(4) 
result=pool.map(wrapper,args) 

#Now take your results and write them to the database. 
print "".join(result) #I just print it to make sure I get my file back ... 

这里最棘手的部分是确保我们分裂的换行符的文件,这样你就不会错过任何线(或只读取部分行)。然后,每个进程读取它是文件的一部分,并返回一个可由主线程放入数据库的对象。当然,你甚至可能需要以块的形式完成这部分,这样你就不必一次把所有的信息保存在内存中。 (这很容易实现 - 只需将“参数”列表拆分为X块并拨打pool.map(wrapper,chunk) - 请参阅here

相关问题