I saw somewhere a hint on how to process a large dataset (lines of text, say) faster with the multiprocessing module, using multiprocessing.Process something like this:
... (form batch_set = nump batches [= lists of lines to process];
batch_set is a list of lists of strings (batches))
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
for p in processes:
    p.start()
for p in processes:
    p.join()
results = sorted([output.get() for p in processes])
... (do something with the processed outputs, e.g. print them in order,
given that each proc_lines call puts a pair (i, out_batch) on the queue)
However, when I run the code with a small number of lines/batches it works fine [e.g. './code.py -x 4:10' for nump = 4 and numb = 10 (lines/batch)], whereas at a certain number of lines/batches it hangs [e.g. './code.py -x 4:4000'], and when I interrupt it I see a traceback pointing at _wait_for_tstate_lock and the system threading library. It seems the code never reaches the last lines shown above...
I provide the full code below, in case anyone needs it to answer why this happens and how to fix it.
#!/usr/bin/env python3

import sys
import multiprocessing as mp


def fabl(numb, nump):
    '''
    Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
    '<idx> my line here' with indexes from 1 to (nump x numb).
    '''
    ret = []
    idx = 1
    for _ in range(nump):
        cb = []
        for _ in range(numb):
            cb.append('%07d my line here' % idx)
            idx += 1
        ret.append(cb)
    return ret


def proc_lines(i, output, rows_in):
    ret = []
    for row in rows_in:
        row = row[0:8] + 'some other stuff\n'  # replacement for the post-idx part
        ret.append(row)
    output.put((i, ret))
    return


def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    nump = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(nump)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('waiting for procs to complete...')
    results = sorted([output.get() for p in processes])
    return results


def write_set(proc_batch_set, fout):
    'write p[rocessed] batch_set'
    for _, out_batch in proc_batch_set:
        for row in out_batch:
            fout.write(row)
    return


def main():
    args = sys.argv
    if len(args) < 2:
        print('''
    run with args: -x [ NumProc:BatchSize ]
    (ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...))
        ''')
        sys.exit(0)

    numb = 10  # lines per batch            : BatchSize
    nump = 4   # number of processes to use : NumProcs
    if len(args) > 2 and ':' in args[2]:  # use another np:bs
        nump, numb = map(int, args[2].split(':'))

    batch_set = fabl(numb, nump)  # proc-batches made here: nump (groups) x numb (lines)
    proc_batch_set = mp_proc(batch_set)

    with open('out-min', 'wt') as fout:
        write_set(proc_batch_set, fout)
    return


if __name__ == '__main__':
    main()
So, basically, this would mean I'm abusing the queue by putting too much into it, thereby blocking the system. I found this post saying the same: http://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em, so that is probably the problem. Thanks! – vuvu
I replaced the p.join() statements with a slightly larger block, after p.start(), that keeps calling output.get() as long as items are stored there; by the end all the child processes have completed, and even for very large batches the run now succeeds. – vuvu
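A minimal sketch of the fix the comment above describes: drain the queue with one get() per process before calling join(). The function and helper names mirror the question's code, but the exact shape of the drain loop is my interpretation of the comment, not the asker's verbatim code:

```python
import multiprocessing as mp

def proc_lines(i, output, rows_in):
    # Same worker as in the question: keep the 7-digit index (plus the
    # trailing space) and replace the rest of each line.
    out_batch = [row[0:8] + 'some other stuff\n' for row in rows_in]
    output.put((i, out_batch))

def mp_proc(batch_set):
    # A child process cannot exit while its queue feeder thread still holds
    # unflushed data, so join()-ing before get()-ing deadlocks once the
    # batches outgrow the pipe buffer.  Draining the queue first (one get()
    # per process) lets every child flush its data and exit.
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch))
                 for i, batch in enumerate(batch_set)]
    for p in processes:
        p.start()
    results = sorted(output.get() for _ in processes)  # drain BEFORE join
    for p in processes:
        p.join()  # returns promptly now that the queue is empty
    return results

if __name__ == '__main__':
    # 4 processes x 4000 lines each: the size that hangs the original code.
    batch_set = [['%07d my line here' % (i * 4000 + j + 1)
                  for j in range(4000)] for i in range(4)]
    done = mp_proc(batch_set)
    print(len(done), repr(done[0][1][0]))
```

This is the same ordering recommended in the linked question: consume the queue while the children are still alive, and only then join them.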