我已经创建了一个脚本,默认情况下会创建一个多处理进程;那么它工作正常。当启动多个进程时,它开始挂起,并不总是在同一个地方。该程序大约有700行代码,所以我会试着总结一下发生了什么。我想通过平行排列DNA序列的最慢任务来充分利用我的多核。为此,我使用子进程模块调用一个命令行程序:'hmmsearch',我可以通过/ dev/stdin按顺序输入,然后通过/ dev/stdout读出对齐的序列。我想象会发生挂起,因为这些多个subprocess实例从stdout/stdin读取/写入,而且我真的不知道最好的方式去做这件事... 我正在调查os.fdopen(...)& os.tmpfile()来创建临时文件句柄或管道,我可以在其中刷新数据。但是,我从来没有使用&之前,我无法想象如何用子进程模块做到这一点。理想情况下,我想完全绕过硬盘驱动器,因为高吞吐量数据处理的管道要好得多! 任何帮助,这将是超级棒!python多处理每个都有自己的子进程(Kubuntu,Mac)
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq(multiprocessing.Process):
def __init__(self, inPipe, outPipe, semaphore, options):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen(['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.sem.acquire()
align_process.stdin.write(seq_record.format('fasta'))
align_process.stdin.close()
for seq in SeqIO.parse(align_process.stdout, 'stockholm'): # get the alignment output
self.out_pipe.send_bytes(seq.seq.tostring()) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
在花了一段时间调试这一点,我发现总是在那里,而不是相当尚未解决的问题,但有固定的一些其他低效在这个过程中(调试)。 有两个初始进纸器功能,这个align_seq类和文件分析器parseHMM(),它将位置特定评分矩阵(PSM)加载到字典中。 然后,主父流程将比对与PSM进行比较,使用字典(词典)作为每个残留相关分数的指针。为了计算我想要的分数,我有两个独立的多进程。进程类,一个类logScore(),计算对数比(math.exp());我平行这一个;和它会将所计算出的分数来的最后的方法,sumScore()刚刚总结这些分数(与math.fsum),返回之和所有位置特定分数回作为字典父进程。 即 Queue.put(Σ{残渣位置:位置特异性得分,...}]) 我觉得这是非常混乱围绕我的头,所以我希望读者正在设法(太多队列的!)按照...完成上述所有计算后,我然后选择将累计分数保存为制表符分隔的输出。这是它现在(从昨夜开始)的地方,有时会中断,因为我确保它会为每个应该有得分的位置打印一个分数。我认为由于延迟(计算机时序不同步),有时首先将队列放入队列logScore首先没有达到sumScore。 为了sumScore知道何时返回计数并重新开始计算,我将'endSEQ'放入执行计算的最后一个logScore进程的队列中。我认为那也应该达到sumScore,但情况并非总是如此;只有时候才会突破。所以现在我不再发生死锁了,而是在打印或保存结果时发生KeyError。 我相信有时会得到KeyError的原因是因为我为每个logScore进程创建了一个队列,但是他们应该都使用相同的队列。现在,我有这样的东西: -
class logScore(multiprocessing.Process):
def __init__(self, inQ, outQ):
self.inQ = inQ
...
def scoreSequence(processes, HMMPSM, sequenceInPipe):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put(residue_score)
## End of sequence
processes[process_index].inQ.put('endSEQ')
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore(multiprocessing.Queue() , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ]
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut)
而我应该创建一个队列来共享所有logScore实例。即
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore(scoreSeq_to_logScore , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ]
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut)
感谢您的回复!因此,即使我多次引用/ dev/stdin&dev/stdout,每个进程都会获得它们自己的实例,并且它们不会互相混淆?甜!在杀死(^ C)程序后进一步查看错误,看起来死锁是由多处理产生的.Queue。我有一系列我认为是级联的,或通过发送'endSEQ'产生连锁效应,最后是'停止'。花了足够长的时间用上一个月的调试过程。试着让我的头回来,:( 而我不认为信号量是必要的。去了:) –
np!/dev/stdin&dev/stdout文件映射到特殊的stdin&stdout文件描述符,因此它们不代表中央设备。 ---我在align_seq子进程中没有看到使用Queue,是否仅在主进程中使用?如果是这样,那么你可以使用queue.Queue。另外你说你怀疑会陷入僵局 - 你是不是在双向消费(主要和孩子/人)? –
再次感谢您澄清stdin/stdout。现在我考虑一下,我使用它们的方式很明显,但我从来没有见过它明确说过。你是绝对正确的;队列是在程序的一个单独的部分,这是我的僵局似乎出现的地方。我会更新主帖子,因为对于我原来没有声明的程序有更多的多处理...尽管如此,我会回复下面的帖子。我现在已经知道它有时会完成,有时会在最后结束,我认为这是由于延迟(??)。 time.sleep是我的临时工,肮脏的修补程序 –