2016-05-17

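python multiprocessing TypeError: 'int' object is not iterable

The following code throws an error only some of the time. If I start just one process it works fine. As I increase the number of processes (to 11, for example), it starts throwing an error.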

import itertools
import sys
from multiprocessing import Manager, Process

try:
    num_workers = int(sys.argv[1])
except (IndexError, ValueError):
    num_workers = 1

someval = 10
def do_work(in_queue, x):
    i = 0
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            if i > 0:
                work.put(i,)
            # work.put(i)
            return
        else:
            print "value from work " + line.rstrip('\n')
            i = i + 1

if __name__ == "__main__":

    manager = Manager()
    work = manager.Queue(num_workers)
    someval = 20
    print " Number of workers is " + str(num_workers)
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, someval))
        p.start()
        pool.append(p)
    with open("/home/jay/scripts/a.txt") as f:
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    x = 0
    for p in pool:
        p.join()

The file /home/jay/scripts/a.txt has 10 lines.

If I run:

./x.py 7
Number of workers is 7
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
x is 0
all done

./x.py 11
Number of workers is 11
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
Process Process-11:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "./x.py", line 18, in do_work
    line_no, line = item
TypeError: 'int' object is not iterable
x is 0
all done

Add 'print(repr(item))' before line 18 so you can see what the value actually is.

Answers

The offending line is work.put(i,) in do_work. It puts an int on the queue, and another worker then reads that int and tries to unpack it.
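Here is a minimal sketch of what the receiving worker runs into, assuming nothing beyond the unpacking line in do_work. A (line_no, line) pair from enumerate() unpacks fine, but a bare int such as a finished worker's count cannot be unpacked. The helper unpack_item is hypothetical and only mimics that one line. The exact error message differs between Python 2 and 3.

```python
def unpack_item(item):
    """Mimic the statement `line_no, line = item` from do_work (hypothetical helper)."""
    line_no, line = item
    return line_no, line


# A normal work item produced by enumerate() over the file unpacks fine.
print(unpack_item((0, "first line\n")))  # (0, 'first line\n')

# A count put back on the shared queue by a finished worker is a bare int,
# so unpacking it raises TypeError, as in the traceback above.
try:
    unpack_item(3)
except TypeError as exc:
    print("TypeError:", exc)
```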

Also, I agree with dano that using multiprocessing.Pool is easier and shorter:

import multiprocessing

if __name__ == "__main__":
    pool = multiprocessing.Pool(num_workers)
    with open("/home/jay/scripts/a.txt") as f:
        # do_work must be changed to take a single line and return its result
        mapped = pool.map(do_work, f)

If you need i from the workers, return it from do_work. The return values are collected in mapped.
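This is a minimal Python 3 sketch of the Pool.map approach. It assumes the worker is rewritten to take one line and return a value. count_line and the in-memory lines list are hypothetical stand-ins for the real do_work and a.txt.

```python
import multiprocessing


def count_line(line):
    """Hypothetical per-line worker: print the line and return 1 as its count."""
    print("value from work " + line.rstrip("\n"))
    return 1


if __name__ == "__main__":
    lines = ["line %d\n" % n for n in range(1, 11)]  # stands in for a.txt
    with multiprocessing.Pool(4) as pool:
        # map returns one result per input line, in input order
        mapped = pool.map(count_line, lines)
    print("total lines processed:", sum(mapped))  # 10
```

Pool.map turns an input without a length into a list before dispatching any work. That means the whole input is read up front, which is the concern raised in the follow-up comment for very large files.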


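My real file is huge, larger than 100 GB. According to another thread, "map will consume your file all at once before it starts working", so I chose this approach instead: http://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes-in-python – Jayadevan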


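The problem is that work.put(1,) doesn't do what you think it does. You intend to put the 1-tuple (1,) on the queue, but you are actually putting the int 1 on it. Inside a function call, a trailing comma only separates arguments and does not create a tuple. Changing the line to work.put((1,)) enqueues the tuple you intended. Note, though, that a worker that dequeues it would still fail, this time with a ValueError, when unpacking it into line_no, line. Sending results back on a separate queue avoids the problem entirely.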
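The sketch below shows the trailing-comma behaviour directly. It uses a hypothetical show_args helper that returns its positional arguments exactly as received, the way Queue.put(item) would receive them. It also checks what happens when a 1-tuple reaches the two-name unpacking in do_work.

```python
def show_args(*args):
    """Hypothetical helper: return the positional arguments as the callee sees them."""
    return args


# In a call, the trailing comma only separates arguments: one int is passed.
print(show_args(1,))    # (1,)     -> a single argument, the int 1
# Extra parentheses build a tuple literal, passed as a single argument.
print(show_args((1,)))  # ((1,),)  -> a single argument, the tuple (1,)

# Unpacking a 1-tuple into two names, as do_work does, still fails.
try:
    line_no, line = (1,)
except ValueError as exc:
    print("ValueError:", exc)
```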

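There is also a race condition. With larger values of num_workers, one of your child processes puts its count on the queue before the main process's loop has finished loading the queue with the None sentinel values, and another worker then reads that int. With smaller values of num_workers, the main process gets through the for loop before any worker puts its count on the queue.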
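One way to remove the race is to keep input and results on separate queues, so a worker can never dequeue another worker's count. The following is a minimal Python 3 sketch under that assumption. It uses multiprocessing.Queue rather than the question's Manager queue. The run helper and the in-memory lines list are hypothetical stand-ins for the script and a.txt.

```python
import itertools
import multiprocessing


def do_work(in_queue, out_queue):
    """Consume (line_no, line) pairs until a None sentinel, then report a count."""
    count = 0
    while True:
        line_no, line = in_queue.get()
        if line is None:           # sentinel: no more input for this worker
            out_queue.put(count)   # results go to a separate queue
            return
        count += 1


def run(lines, num_workers):
    """Feed lines through a bounded queue and return the total processed count."""
    work = multiprocessing.Queue(num_workers)  # bounded: put() blocks when full
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=do_work, args=(work, results))
               for _ in range(num_workers)]
    for p in workers:
        p.start()
    # One None sentinel per worker after the real input, as in the question.
    for item in enumerate(itertools.chain(lines, [None] * num_workers)):
        work.put(item)
    # Drain the results before joining so no worker blocks on a full pipe.
    counts = [results.get() for _ in range(num_workers)]
    for p in workers:
        p.join()
    return sum(counts)


if __name__ == "__main__":
    lines = ["line %d\n" % n for n in range(1, 11)]  # stands in for a.txt
    print("lines processed:", run(lines, 11))  # 10
```

With a real file you would pass the open file object instead of the list. Because the work queue is bounded, the main process reads only a few lines ahead of the workers instead of loading the whole file.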

Also, have you considered using multiprocessing.Pool instead of building your own pool by hand with Process and Queue? It would simplify your code considerably.