2016-09-25
0

I want to run a Popen command from subprocess asynchronously so that I can run other things in the background in the meantime. (Asynchronous subprocess Popen, Python 3.5)

import subprocess
import requests
import asyncio
import asyncio.subprocess


async def x(message):
    if len(message.content.split()) > 1:
        #output = asyncio.create_subprocess_shell(message.content[3:], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output = subprocess.Popen(message.content[3:], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        return output.communicate()[0].decode('utf-8')

I tried to make sense of https://docs.python.org/3/library/asyncio-subprocess.html, but I don't know what a protocol factory is.
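(For what it's worth, the "protocol factory" in those docs is just a zero-argument callable that the event loop calls to create a new protocol instance; passing the class itself, or a lambda wrapping it, is the usual form. A minimal sketch along the lines of the docs' subprocess-protocol example; the PrintProtocol name and the echo command are only illustrative:)

import asyncio

class PrintProtocol(asyncio.SubprocessProtocol):
    """Illustrative protocol: print whatever the child process writes."""
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def pipe_data_received(self, fd, data):
        # fd is 1 for stdout, 2 for stderr
        print(fd, data.decode(), end='')

    def process_exited(self):
        self.exit_future.set_result(True)

loop = asyncio.get_event_loop()
exit_future = asyncio.Future(loop=loop)
# The "protocol factory" is the first argument: a zero-argument callable
# returning a new protocol instance (here a lambda wrapping the class).
transport, protocol = loop.run_until_complete(
    loop.subprocess_exec(lambda: PrintProtocol(exit_future), 'echo', 'hello'))
loop.run_until_complete(exit_future)
transport.close()
loop.close()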

+0

What is the question? –

+0

This doesn't run asynchronously. It waits for the process to finish, but I want it to run other things in the meantime. So it's like I call this function first and then call another function; this one takes about 20 s and the other about 2 s. The second func has to wait 20 seconds before it runs. – init6

+0

Do you have a reason to avoid multithreading? –

Answers

0

Tested with Python 3.5. Just ask if you have problems.

import threading
import time
import subprocess
import shlex
from sys import stdout


# Only data within a class are actually shared by the threads.
# Let's use a class as communicator (there could be problems if you have more
# than a single thread).
class Communicator(object):
    counter = 0
    stop = False
    arg = None
    result = None


# Here we can define what you want to do. There are other methods to do that
# but this is the one I prefer.
class ThreadedFunction(threading.Thread):

    def run(self, *args, **kwargs):
        super().run()
        command = c.arg

        # Here what you want to do...
        command = shlex.split(command)
        print(time.time())  # this is just to check that the command (sleep 5) is executed
        output = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
        print('\n', time.time())
        c.result = output
        if c.stop: return None  # This is useful only within loops within threads


# Create a class instance
c = Communicator()
c.arg = 'time sleep 5'  # Here I used 'time' only to have some output

# Create the thread and start it
t = ThreadedFunction()
t.start()  # Start the thread and do something else...

# ...for example count the seconds in the meantime...
try:
    for j in range(100):
        c.counter += 1
        stdout.write('\r{:}'.format(c.counter))
        stdout.flush()
        time.sleep(1)
        if c.result is not None:
            print(c.result)
            break
except:
    c.stop = True
+0

How do I get the output of this? – init6

+0

The output is in 'c.result' –
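For example, a minimal sketch reusing the c and t objects from the answer above:

# Wait for the worker thread to finish, then read the shared result.
t.join()
out, err = c.result            # .communicate() stored a (stdout, stderr) tuple
print(out.decode())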

0

This one is simpler; I found it after the other answer, which may anyway be interesting... so I left it.

import time
import subprocess
import shlex
from sys import stdout


command = 'time sleep 5'  # Here I used 'time' only to have some output

def x(command):
    cmd = shlex.split(command)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return p

# Start the subprocess and do something else...
p = x(command)
# ...for example count the seconds in the meantime...

try:  # This takes care of killing the subprocess if problems occur
    for j in range(100):
        stdout.write('\r{:}'.format(j))
        stdout.flush()
        time.sleep(1)
        if p.poll() is not None:
            print(p.communicate())
            break
except:
    p.terminate()  # or p.kill()

The asynchrony is evident from the fact that the Python script prints the counter values to stdout while the background process runs the sleep command. Indeed, after ~5 seconds the script prints the output of the bash command alongside the counter, which is proof that it works.

+0

I tried this, but it doesn't seem asynchronous. I had to wait for the subprocess to finish. – init6

+0

@JinyangLiu Once you run 'p = x(command)', the script can continue executing while the subprocess runs in the background. As soon as you need the output of the background process you can run 'p.communicate()', and the Python script will wait for the command to finish if it hasn't already. At some point you need to rejoin the subprocess with the script; if you need to rejoin before the subprocess has finished, the script has to wait. –
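As a minimal sketch of that pattern, reusing x() from the answer above (do_other_work() is just a placeholder for whatever else needs to run):

p = x('time sleep 5')        # the child starts running in the background
do_other_work()              # placeholder: e.g. the ~2 s task from the question
out, err = p.communicate()   # rejoin: blocks only if the child has not finished yet
print(out.decode(), err.decode())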

+0

Yes, I see. The reason I found it wasn't running asynchronously was the .communicate() part. If I remove it, it prints out something like '<'. It doesn't show the output, but it allows other things to run asynchronously. – init6

1

When I came across this question, I was expecting the answer to really use asyncio for inter-process communication.

I found the following resource useful: https://github.com/python/asyncio/blob/master/examples/child_process.py

And below is my simplified example (using the 3.5+ async/await syntax), which reads lines and outputs them sorted:

import asyncio

from subprocess import Popen, PIPE


async def connect_write_pipe(file):
    """Return a write-only transport wrapping a writable pipe"""
    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_write_pipe(asyncio.Protocol, file)
    return transport


async def connect_read_pipe(file):
    """Wrap a readable pipe in a stream"""
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)

    def factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(factory, file)
    return stream_reader, transport


async def main(loop):
    # start subprocess and wrap stdin, stdout, stderr
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)

    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    # interact with subprocess
    name = {stdout: 'OUT', stderr: 'ERR'}
    registered = {
        asyncio.Task(stderr.read()): stderr,
        asyncio.Task(stdout.read()): stdout
    }

    to_sort = b"one\ntwo\nthree\n"
    stdin.write(to_sort)
    stdin.close()  # this way we tell we do not have anything else

    # get and print lines from stdout, stderr
    timeout = None
    while registered:
        done, pending = await asyncio.wait(
            registered, timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED)
        if not done:
            break
        for f in done:
            stream = registered.pop(f)
            res = f.result()
            if res != b'':
                print(name[stream], res.decode('ascii').rstrip())
                registered[asyncio.Task(stream.read())] = stream
        timeout = 0.0

    stdout_transport.close()
    stderr_transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

NB: without taking special measures, the amount of data that can be written to the pipe is limited. On my system it was possible to write a bit more than 700000 bytes before the pipe buffers were used up.

There are other examples there as well, using create_subprocess_shell.
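A rough sketch of what such an example might look like (the higher-level create_subprocess_shell API also avoids explicit transports and protocol factories; the sort command here just mirrors the example above):

import asyncio
import asyncio.subprocess

async def sort_lines(data):
    # create_subprocess_shell returns a Process whose stdin/stdout are
    # already wrapped in asyncio streams.
    proc = await asyncio.create_subprocess_shell(
        'sort',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate(data)
    return stdout

loop = asyncio.get_event_loop()
print(loop.run_until_complete(sort_lines(b"one\ntwo\nthree\n")).decode())
loop.close()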

I haven't used asyncio in real projects yet, so suggestions for improvement in the comments are welcome.