2012-06-07 52 views
0

我对下面的代码有一个很奇怪的问题。当numrows = 10 Process循环自行完成并继续完成。如果越来越大的清单变得越来越大,就会陷入僵局。为什么是这样的,我该如何解决这个问题?在大循环中多处理,写入文件和死锁

import multiprocessing, time, sys 

# ----------------- Calculation Engine ------------------- 
def feed(queue, parlist): 
    for par in parlist: 
     queue.put(par) 

def calc(queueIn, queueOut): 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      print "Project ID: %s started. " % par 
      res = doCalculation(par) 
      queueOut.put(res) 

     except: 
      break 

def write(queue, fname): 
    print 'Started to write to file' 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      res = queue.get(block = False) 
      for m in res: 
       print >>fhandle, m 
     except: 
      break 
    fhandle.close() 
    print 'Complete writing to the file' 


def doCalculation(project_ID): 
    numrows = 100 
    toFileRowList = [] 

    for i in range(numrows): 
     toFileRowList.append([project_ID]*100) 
     print "%s %s" % (multiprocessing.current_process().name, i) 

    return toFileRowList 


def main(): 
    parlist  = [276, 266] 

    nthreads = multiprocessing.cpu_count() 
    workerQueue = multiprocessing.Queue() 
    writerQueue = multiprocessing.Queue() 

    feedProc = multiprocessing.Process(target = feed , args = (workerQueue, parlist)) 
    calcProc = [multiprocessing.Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] 
    writProc = multiprocessing.Process(target = write, args = (writerQueue, 'somefile.csv')) 

    feedProc.start() 
    feedProc.join() 

    for p in calcProc: 
     p.start() 
    for p in calcProc: 
     p.join() 

    writProc.start() 
    writProc.join() 

if __name__=='__main__': 
    sys.exit(main()) 

回答

1

我认为问题是队列缓冲区被填充,所以你需要从队列中读取,然后才可以添加其他的东西。 例如,在你的feed线程您有:

queue.put(par) 

如果你一直把太多的东西不读这将导致它,直到缓冲区块被释放,但问题是,你只释放缓冲区你calc线程,在加入阻塞线程之前,线程又不会启动。

因此,为了您的feed线程完成,缓冲区应该被释放,但缓冲区将不会在线程结束:)

之前被释放,尝试举办队列访问更多。

+0

的确如此。看来缓冲区是问题所在。必须找到解决方法。谢谢。 –

1

feedProc和writeProc实际上并没有与其他程序并行运行。当你有

proc.start() 
proc.join() 

启动过程,然后,在join()你立刻等待它完成。在这种情况下,多处理没有收益,只有开销。尝试在加入它们之前立即启动所有流程。这也会影响你的队列清空regularyl,你不会陷入僵局。

+0

不幸的是,在连接之上加入'writProc.start()'并没有帮助。 –

+0

第一启动所有的处理在calcProc加入其中的任何 'feedProc.start() 对于p之前: p.start() writProc.start() feedProc.join() 对于p在calcProc: p .join() writProc.join()' –