2016-06-08 18 views

Python multiprocessing - AssertionError: can only join a child process

I'm getting into Python's multiprocessing module and I've run into some problems. I'm very familiar with the threading module, but I need to make sure the processes I spawn are actually running in parallel.

Below is an outline of what I'm trying to do. Please ignore things like undeclared variables and functions, since I can't paste my code in full.

import multiprocessing 
import time 

def wrap_func_to_run(host, args, output): 
    output.append(do_something(host, args)) 
    return 

def func_to_run(host, args): 
    return do_something(host, args) 

def do_work(server, client, server_args, client_args): 
    server_output = func_to_run(server, server_args) 
    client_output = func_to_run(client, client_args) 
    #handle this output and return a result 
    return result 

def run_server_client(server, client, server_args, client_args, server_output, client_output): 
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output)) 
    server_process.start() 
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output)) 
    client_process.start() 
    server_process.join() 
    client_process.join() 
    #handle the output and return some result  

def run_in_parallel(server, client): 
    #set up commands for first process 
    server_output = client_output = [] 
    server_cmd = "cmd" 
    client_cmd = "cmd" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output)) 
    process_one.start() 
    #set up second process to run - but this one can run here 
    result = do_work(server, client, "some server args", "some client args") 
    process_one.join() 
    #use outputs above and the result to determine result 
    return final_result 

def main(): 
    #grab client 
    client = client() 
    #grab server 
    server = server() 
    return run_in_parallel(server, client) 

if __name__ == "__main__": 
    main() 

Here is the error I'm getting:

Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process

I've tried a lot of different things to fix this, but my feeling is that something is wrong with the way I'm using the module.

EDIT:

So I created a file that reproduces this with mocked-out clients/servers and the work they do. I also missed an important point, which is that I'm running this on UNIX. Another important piece of information is that do_work in my actual case involves a call to os.fork(). I was unable to reproduce the error without os.fork(), so I assume the problem lies there. In my real-world case that part of the code isn't mine, so I was treating it as a black box (likely my mistake). Anyway, here is the code to reproduce:

#!/usr/bin/python 

import multiprocessing 
import time 
import os 
import signal 
import sys 

class Host():
    def __init__(self):
        self.name = "host"

    def work(self):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self):
        x = 0
        for i in range(10000):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self):
        x = 0
        for i in range(5000):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args): 
    print host.name + " is working" 
    host.work() 
    print host.name + ": " + args 
    return "done" 

def do_work(server, client, server_args, client_args): 
    print "in do_work" 
    server_output = client_output = "" 
    child_pid = os.fork() 
    if child_pid == 0: 
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1) 

    client_output = func_to_run(client, client_args) 
    # kill and wait for server to finish 
    os.kill(child_pid, signal.SIGTERM) 
    (pid, status) = os.waitpid(child_pid, 0) 

    return (server_output == "done" and client_output =="done") 

def run_server_client(server, client, server_args, client_args): 
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args)) 
    print "Starting server process" 
    server_process.start() 
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args)) 
    print "Starting client process" 
    client_process.start() 
    print "joining processes" 
    server_process.join() 
    client_process.join() 
    print "processes joined and done" 

def run_in_parallel(server, client): 
    #set up commands for first process 
    server_cmd = "server command for run_server_client" 
    client_cmd = "client command for run_server_client" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd)) 
    print "Starting process one" 
    process_one.start() 
    #set up second process to run - but this one can run here 
    print "About to do work" 
    result = do_work(server, client, "server args from do work", "client args from do work") 
    print "Joining process one" 
    process_one.join() 
    #use outputs above and the result to determine result 
    print "Process one has joined" 
    return result 

def main(): 
    #grab client 
    client = Client() 
    #grab server 
    server = Server() 
    return run_in_parallel(server, client) 

if __name__ == "__main__": 
    main() 

If I remove the use of os.fork() from do_work, I don't get the error and the code behaves as I originally expected (aside from the passing of outputs, which I've accepted as my mistake/misunderstanding). I can change the old code to not use os.fork(), but I'd also like to know why it causes this problem and whether there is a workable solution.
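The fork()/join() interaction can be stripped down to a few lines (this sketch is mine, not the project code, and assumes Linux with the default fork start method): the forked copy inherits the Process object, but the worker's OS process is a child of the original parent, so join() trips the same assertion.

```python
import multiprocessing
import os
import time

def worker():
    time.sleep(0.2)

# Linux-only sketch; assumes multiprocessing's default fork start method
p = multiprocessing.Process(target=worker)
p.start()

pid = os.fork()
if pid == 0:
    # forked copy: it inherited the Process object, but the worker
    # belongs to the original parent, not to this copy
    try:
        p.join()
        os._exit(0)          # would mean the join was allowed
    except AssertionError:   # 'can only join a child process'
        os._exit(42)

_, status = os.waitpid(pid, 0)
print("fork child exit code:", os.WEXITSTATUS(status))  # 42
p.join()                     # fine here: we are the real parent
print("joined in parent")
```

The error in the question is the same assertion, except there it fires inside the atexit handler that multiprocessing registers, rather than from an explicit join().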

编辑2:

I had started working on a solution that omits os.fork() before the accepted answer was posted. Here is what I have, with some tweaks that let me adjust the amount of simulated work:

#!/usr/bin/python 

import multiprocessing 
import time 
import os 
import signal 
import sys 
from Queue import Empty 

class Host():
    def __init__(self):
        self.name = "host"

    def work(self, w):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args, w, q): 
    print host.name + " is working" 
    host.work(w) 
    print host.name + ": " + args 
    q.put("ZERO") 
    return "done" 

def handle_queue(queue): 
    done = False 
    results = [] 
    return_val = 0 
    while not done: 
        #try to grab item from Queue
        tr = None
        try:
            tr = queue.get_nowait()
            print "found element in queue"
            print tr
        except Empty:
            done = True
        if tr is not None:
            results.append(tr)
    for el in results:
        if el != "ZERO":
            return_val = 1
    return return_val 

def do_work(server, client, server_args, client_args): 
    print "in do_work" 
    server_output = client_output = "" 
    child_pid = os.fork() 
    if child_pid == 0: 
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1) 

    client_output = func_to_run(client, client_args) 
    # kill and wait for server to finish 
    os.kill(child_pid, signal.SIGTERM) 
    (pid, status) = os.waitpid(child_pid, 0) 

    return (server_output == "done" and client_output =="done") 



def run_server_client(server, client, server_args, client_args, w, mq): 
    local_queue = multiprocessing.Queue() 
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue)) 
    print "Starting server process" 
    server_process.start() 
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue)) 
    print "Starting client process" 
    client_process.start() 
    print "joining processes" 
    server_process.join() 
    client_process.join() 
    print "processes joined and done" 
    if handle_queue(local_queue) == 0: 
        mq.put("ZERO")

def run_in_parallel(server, client): 
    #set up commands for first process 
    master_queue = multiprocessing.Queue() 
    server_cmd = "server command for run_server_client" 
    client_cmd = "client command for run_server_client" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue)) 
    print "Starting process one" 
    process_one.start() 
    #set up second process to run - but this one can run here 
    print "About to do work" 
    #result = do_work(server, client, "server args from do work", "client args from do work") 
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue) 
    print "Joining process one" 
    process_one.join() 
    #use outputs above and the result to determine result 
    print "Process one has joined" 
    return_val = handle_queue(master_queue) 
    print return_val 
    return return_val 

def main(): 
    #grab client 
    client = Client() 
    #grab server 
    server = Server() 
    val = run_in_parallel(server, client) 
    if val:
        print "failed"
    else:
        print "passed"
    return val 

if __name__ == "__main__": 
    main() 

This code has some tweaked prints just to see exactly what is happening. I used a multiprocessing.Queue to store and share the outputs across the processes and back into my main thread to be handled. I think this solves the Python part of my problem, but there are still some issues in the code I'm working on. The only other thing I can say is that the equivalent of func_to_run involves sending a command over ssh and grabbing any errors along with the output. For some reason this works perfectly fine for commands with a short execution time, but not well for commands with much larger execution time/output. I tried simulating this with drastically different work values in my code here, but haven't been able to reproduce similar results.

EDIT 3: The library code I'm using (again, not mine) uses Popen.wait() for the ssh commands, and I just read this:

Popen.wait() - Wait for child process to terminate. Set and return returncode attribute.

Warning: This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.

I adjusted the code so that it doesn't buffer and just prints output as it is received, and everything works now.
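For reference, a minimal sketch of the communicate() fix (the child command here is a stand-in of my own invention for the ssh call): communicate() drains the pipes while it waits, so the child can never stall on a full OS pipe buffer the way it can under wait().

```python
import subprocess
import sys

# Stand-in child that produces more output than a pipe buffer holds
# (typically 64 KiB on Linux); a chatty ssh command behaves the same way.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('x' * 1000000)"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
# proc.wait() here could deadlock: the child blocks writing once the
# pipe fills, while the parent blocks waiting for the child to exit.
out, err = proc.communicate()
print(proc.returncode, len(out))
```

With proc.wait() instead of communicate(), the same setup hangs as soon as the output exceeds the pipe buffer, which matches the short-commands-work/long-commands-hang symptom above.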


There are multiple problems here. First: 'output.append()' probably won't do what you want when using the 'multiprocessing' module, since each process has its own copy of the list. Second, the problem you're reporting is as stated: you cannot call '.join()' on a 'Process' that is not a child of the currently executing process. You admit your example is synthetic, so it's hard to say where the real problem lies. Are you sure you assign the result of 'Process()' to a short-lived local variable as in this example? Or do you use a global or an instance variable (e.g. 'self.process_one')?
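To make the first point concrete, here is a minimal sketch (the names are mine): a list appended to inside a child Process only changes the child's copy of memory, while a multiprocessing.Queue actually carries data back to the parent.

```python
import multiprocessing

def append_item(out):
    # runs in the child process: mutates the child's copy of the list
    out.append("from child")

def put_item(q):
    # a Queue, by contrast, is built for crossing the process boundary
    q.put("from child")

if __name__ == "__main__":
    output = []
    p = multiprocessing.Process(target=append_item, args=(output,))
    p.start()
    p.join()
    print(output)  # prints [] -- the parent's list is untouched

    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=put_item, args=(q,))
    p.start()
    p.join()
    print(q.get())  # prints "from child"
```

This is the same reason the Queue-based approach in EDIT 2 works where output.append() silently did nothing.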


On the first point you are right. It worked when I used the threading module, but that was probably because of the shared memory space; with processes the variables are local. As for the second point, aren't these processes owned by the calling process? I only join processes inside the function that created them, so I would assume ownership is fine unless there is some scheduling problem.


Aha! Now I see it: you are calling 'join' in an 'atexit' handler, but only because 'multiprocessing' is trying to clean up after itself. I would speculate this is the result of passing a 'Process' instance to another process. If that is the case, this is IMO a subtle bug in CPython: 'Process' should refuse to pickle and raise an exception when you try to pass it along. I know it isn't easy to create a minimal reproducer, but I think you'll find it worth your time; it will help you and us identify the key ingredient of the bug.

Answers


I can change the old code to not use os.fork() but I'd also like to know why this caused this problem and if there's a workable solution.

The key to understanding the problem is knowing exactly what fork() does. The CPython docs state "Fork a child process." but this presumes you understand the C library call fork().

Here is what glibc's manpage says about it:

fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...

Basically, it is as if you took your program and made a copy of its program state (heap, stack, instruction pointer, etc.), with only small differences, and let it execute independently of the original. When this child process exits naturally, it will use exit(), and that will trigger the atexit() handlers registered by the multiprocessing module.
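A minimal UNIX-only sketch of that duplication (the variable x is my own): the child operates on its own copy of the parent's state, and its exit is a full, separate process exit.

```python
import os

x = 1
pid = os.fork()          # UNIX only: duplicates the whole process
if pid == 0:
    # child: works on its own copy-on-write duplicate of x
    x += 1
    os._exit(x)          # exit status 2; skips atexit handlers
pid, status = os.waitpid(pid, 0)
print("child exit code:", os.WEXITSTATUS(status))  # 2
print("parent still sees x =", x)                  # 1
```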

What can you do to avoid it?

  • Omit os.fork(): use multiprocessing instead, as you are now exploring.
  • Possibly effective: perform the import multiprocessing only after the fork(), and only in the child or the parent, whichever needs it.
  • Use _exit() in the child (the CPython docs state: "Note The standard way to exit is sys.exit(n). _exit() should normally only be used in the child process after a fork().")

https://docs.python.org/2/library/os.html#os._exit


I'm going to accept this answer. Thank you so much for the help! I made one last edit to the question with some additional notes and the implementation I started based on your first suggestion.


It seems to me that you are spawning too much at once. I would not spawn a process in run_in_parallel; instead, just call run_server_client with the right parameters, since it already spawns processes internally.


But wouldn't that block until it finishes? I need run_server_client and do_work to run at the same time; that's why I create a separate process here.