2011-01-06 46 views
0

我已经创建了一个脚本,默认情况下会创建一个多处理进程;那么它工作正常。当启动多个进程时,它开始挂起,并不总是在同一个地方。该程序大约有700行代码,所以我会试着总结一下发生了什么。我想通过平行排列DNA序列的最慢任务来充分利用我的多核。为此,我使用子进程模块调用一个命令行程序:'hmmsearch',我可以通过/ dev/stdin按顺序输入,然后通过/ dev/stdout读出对齐的序列。我想象会发生挂起,因为这些多个subprocess实例从stdout/stdin读取/写入,而且我真的不知道最好的方式去做这件事... 我正在调查os.fdopen(...)& os.tmpfile()来创建临时文件句柄或管道,我可以在其中刷新数据。但是,我从来没有使用&之前,我无法想象如何用子进程模块做到这一点。理想情况下,我想完全绕过硬盘驱动器,因为高吞吐量数据处理的管道要好得多! 任何帮助,这将是超级棒!python多处理每个都有自己的子进程(Kubuntu,Mac)

import multiprocessing, subprocess 
from Bio import SeqIO 

class align_seq(multiprocessing.Process): 
    def __init__(self, inPipe, outPipe, semaphore, options): 
     multiprocessing.Process.__init__(self) 
     self.in_pipe = inPipe   ## Sequences in 
     self.out_pipe = outPipe  ## Alignment out 
     self.options = options.copy() ## Modifiable sub-environment 
     self.sem = semaphore 

    def run(self): 
     inp = self.in_pipe.recv() 
     while inp != 'STOP': 
      seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time. 
            # HMM is a file location. 
      align_process = subprocess.Popen(['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
      self.sem.acquire() 
      align_process.stdin.write(seq_record.format('fasta')) 
      align_process.stdin.close() 
      for seq in SeqIO.parse(align_process.stdout, 'stockholm'): # get the alignment output 
       self.out_pipe.send_bytes(seq.seq.tostring()) # send it to consumer 
      align_process.wait() # Don't know if there's any need for this?? 
      self.sem.release() 
      align_process.stdout.close() 
      inp = self.in_pipe.recv() 
     self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles. 
     self.out_pipe.close() 

在花了一段时间调试这一点,我发现总是在那里,而不是相当尚未解决的问题,但有固定的一些其他低效在这个过程中(调试)。 有两个初始进纸器功能,这个align_seq类和文件分析器parseHMM(),它将位置特定评分矩阵(PSM)加载到字典中。 然后,主父流程将比对与PSM进行比较,使用字典(词典)作为每个残留相关分数的指针。为了计算我想要的分数,我有两个独立的多进程。进程类,一个类logScore(),计算对数比(math.exp());我平行这一个;和它会将所计算出的分数来的最后的方法,sumScore()刚刚总结这些分数(与math.fsum),返回之和所有位置特定分数回作为字典父进程。 即 Queue.put(Σ{残渣位置:位置特异性得分,...}]) 我觉得这是非常混乱围绕我的头,所以我希望读者正在设法(太多队列的!)按照...完成上述所有计算后,我然后选择将累计分数保存为制表符分隔的输出。这是它现在(从昨夜开始)的地方,有时会中断,因为我确保它会为每个应该有得分的位置打印一个分数。我认为由于延迟(计算机时序不同步),有时首先将队列放入队列logScore首先没有达到sumScore。 为了sumScore知道何时返回计数并重新开始计算,我将'endSEQ'放入执行计算的最后一个logScore进程的队列中。我认为那也应该达到sumScore,但情况并非总是如此;只有时候才会突破。所以现在我不再发生死锁了,而是在打印或保存结果时发生KeyError。 我相信有时会得到KeyError的原因是因为我为每个logScore进程创建了一个队列,但是他们应该都使用相同的队列。现在,我有这样的东西: -

class logScore(multiprocessing.Process): 
    def __init__(self, inQ, outQ): 
     self.inQ = inQ 
     ... 

def scoreSequence(processes, HMMPSM, sequenceInPipe): 
    process_index = -1 
    sequence = sequenceInPipe.recv_bytes() 
    for residue in sequence: 
     .... ## Get the residue score. 
     process_index += 1 
     processes[process_index].inQ.put(residue_score) 
    ## End of sequence 
    processes[process_index].inQ.put('endSEQ') 


logScore_to_sumScoreQ = multiprocessing.Queue() 
logScoreProcesses = [ logScore(multiprocessing.Queue() , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ] 
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut) 

而我应该创建一个队列来共享所有logScore实例。即

logScore_to_sumScoreQ = multiprocessing.Queue() 
scoreSeq_to_logScore = multiprocessing.Queue() 
logScoreProcesses = [ logScore(scoreSeq_to_logScore , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ] 
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut) 

回答

2

这不是流水线的工作原理......但把你的心放心,我们这里是从subprocess documentation的摘录:

标准输入,输出和错误分别指定 执行的程序标准输入, 标准输出和标准错误 文件句柄。有效的 值是PIPE,现有文件 描述符(正整数), 现有文件对象和无。 PIPE 表示应该创建新管到子 。如果没有,则不会发生重定向 ;孩子的 文件句柄将从父母继承 。

故障可能性最大的区域将在与主进程或管理信号量的通信中。由于错误,可能状态转换/同步没有按预期进行?我建议在每次阻塞调用之后在&之前添加日志记录/打印语句进行调试 - 您正在与主进程通信的位置以及获取/释放信号量的位置,以缩小出现问题的地方。

另外我很好奇 - 信号量是绝对必要的吗?

+0

感谢您的回复!因此,即使我多次引用/ dev/stdin&dev/stdout,每个进程都会获得它们自己的实例,并且它们不会互相混淆?甜!在杀死(^ C)程序后进一步查看错误,看起来死锁是由多处理产生的.Queue。我有一系列我认为是级联的,或通过发送'endSEQ'产生连锁效应,最后是'停止'。花了足够长的时间用上一个月的调试过程。试着让我的头回来,:( 而我不认为信号量是必要的。去了:) –

+0

np!/dev/stdin&dev/stdout文件映射到特殊的stdin&stdout文件描述符,因此它们不代表中央设备。 ---我在align_seq子进程中没有看到使用Queue,是否仅在主进程中使用?如果是这样,那么你可以使用queue.Queue。另外你说你怀疑会陷入僵局 - 你是不是在双向消费(主要和孩子/人)? –

+0

再次感谢您澄清stdin/stdout。现在我考虑一下,我使用它们的方式很明显,但我从来没有见过它明确说过。你是绝对正确的;队列是在程序的一个单独的部分,这是我的僵局似乎出现的地方。我会更新主帖子,因为对于我原来没有声明的程序有更多的多处理...尽管如此,我会回复下面的帖子。我现在已经知道它有时会完成,有时会在最后结束,我认为这是由于延迟(??)。 time.sleep是我的临时工,肮脏的修补程序 –

1

我也想并行化简单的任务,为此我创建了一个小python脚本。你可以看看: http://bioinf.comav.upv.es/psubprocess/index.html

是比你想要的更一般,但对于简单的任务是很容易使用。这可能至少对你有一些看法。

何塞布兰卡

+0

漂亮的外观网站,以及令人印象深刻的出版物清单!是的,一些灵感和例子会很好(!),因为多处理模块是一个野兽,我只是在抓表面而已!当我有机会的时候,我一定会看看psubprocess;这个周末将会看到我写的不仅仅是编码,还包括截止日期和周一的会议......感谢您的链接! –

0

这可能是在子进程死锁,你试着用沟通方法,而不是等待? http://docs.python.org/library/subprocess.html

+0

我避免使用通信,因为如果缓冲区被填充,通信将会死锁,这对于巨大的序列比对来说非常容易(即使使用bufsize = -1选项),所以我通常会在生成输出时读取输出。我最喜欢的方法是将Popen实例传递给一个单独的函数,我致力于解析来自任何程序的.stdout。在父函数中,我将检查返回码,如果它不是零,那么我会读取stderr。 –

相关问题