2012-12-30 46 views
3

我最近张贴的问题Using multiprocessing for finding network paths,并高兴地已主动提出@unutbupython多处理挂起,潜在的队列内存错误?

很好地解决执行test_workers()(利用多处理)功能时,我却已经陷入困境。该代码运行,但挂起了大量的节点N在我的网络中G

运行使用Mac OS X Lion 10.7.5 - python 2.7时,它挂起时,N> 500。测井带来下述信息,之后将其挂起

[DEBUG/MainProcess] doing self._thread.start() 
[DEBUG/MainProcess] starting thread to feed data to pipe 
[DEBUG/MainProcess] ... done self._thread.start() 

运行在通过VMware融合视窗7有利于较大的网络,但最终与围绕其中N> 20000个节点(I将理想地要使用此网络上向上图表挂起到N = 500,000)。来自窗边悬挂点的消息:

[DEBUG/MainProcess] starting thread to feed data to pipe 
[DEBUG/MainProcess] ... done self._thread.start()[DEBUG/MainProcess] telling queue thread to quit 
Traceback (most recent call last): 
     File "C:\Users\Scott\Desktop\fp_test.py", line 75, in <module> 
    Traceback (most recent call last): 
      File "C:\Python27\lib\multiprocessing\queues.py", line 264, in _feed 
    test_workers() 
    MemoryError 

我想知道是否有人对此有何看法?并且如果有任何关于如何使这项工作适用于大型网络的建议?

非常感谢您的任何建议,您可能有。

@ unutbu代码:

import networkx as nx 
import multiprocessing as mp 
import random 
import sys 
import itertools as IT 
import logging 
logger = mp.log_to_stderr(logging.DEBUG) 


def worker(inqueue, output): 
    result = [] 
    count = 0 
    for pair in iter(inqueue.get, sentinel): 
     source, target = pair 
     for path in nx.all_simple_paths(G, source = source, target = target, 
             cutoff = None): 
      result.append(path) 
      count += 1 
      if count % 10 == 0: 
       logger.info('{c}'.format(c = count)) 
    output.put(result) 

def test_workers(): 
    result = [] 
    inqueue = mp.Queue() 
    for source, target in IT.product(sources, targets): 
     inqueue.put((source, target)) 
    procs = [mp.Process(target = worker, args = (inqueue, output)) 
      for i in range(mp.cpu_count())] 
    for proc in procs: 
     proc.daemon = True 
     proc.start() 
    for proc in procs:  
     inqueue.put(sentinel) 
    for proc in procs: 
     result.extend(output.get()) 
    for proc in procs: 
     proc.join() 
    return result 

def test_single_worker(): 
    result = [] 
    count = 0 
    for source, target in IT.product(sources, targets): 
     for path in nx.all_simple_paths(G, source = source, target = target, 
             cutoff = None): 
      result.append(path) 
      count += 1 
      if count % 10 == 0: 
       logger.info('{c}'.format(c = count)) 

    return result 

sentinel = None 

seed = 1 
m = 1 
N = 1340//m 
G = nx.gnm_random_graph(N, int(1.7*N), seed) 
random.seed(seed) 
sources = [random.randrange(N) for i in range(340//m)] 
targets = [random.randrange(N) for i in range(1000//m)] 
output = mp.Queue() 

if __name__ == '__main__': 
    test_workers() 
    # test_single_worker() 
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker())) 

回答

2

您在死锁附带的logging模块。

该模块保留了一些线程锁以允许跨线程进行安全日志记录,但是在当前进程分叉时它不能很好地运行。例如,请参阅here以了解正在发生的情况。

解决方法是取消logging调用或使用普通的print

无论如何,作为一般规则,避免使用线程+分叉。并且始终检查哪些模块在幕后使用线程。

请注意,在Windows上,它的工作原理很简单,因为windows没有fork,因此没有锁定克隆随后死锁的问题。 在这种情况下,MemoryError表明该进程消耗的RAM太多。 您可能不得不重新考虑使用较少RAM的算法,但它与您在OSX上遇到的问题完全不同。

+0

删除了日志记录调用后,我仍然发现代码挂在OSX上。你认为其他模块是否保持线程锁定? (以及我怎样才能找到答案?) - 另外;你知道我使用的队列大小是否有限制吗? –

+0

@scott_ouce搜索一下我在跟踪'multiprocessing'的问题中发现了这个[issue](http://bugs.python.org/issue7200)。它可能与你的问题有关,因为它涉及到MacOSX和'mp.Queue'。 无论如何,它似乎真的很奇怪,因为我没有在代码中看到可能的weakref问题。 – Bakuriu