4

我想设计一个可以很容易地做出数据处理管道的异步管道。管道由几个功能组成。输入数据进入管道的一端,并在另一端出来。如何在python中设计一个异步管道模式

我想设计在一个方法管道:

  1. 附加功能可以在管道
  2. 功能已经在流水线可以被弹出来插入。

这里是我想出了:

import asyncio 

@asyncio.coroutine 
def add(x): 
    return x + 1 

@asyncio.coroutine 
def prod(x): 
    return x * 2 

@asyncio.coroutine 
def power(x): 
    return x ** 3 

def connect(funcs): 
    def wrapper(*args, **kwargs): 
     data_out = yield from funcs[0](*args, **kwargs) 
     for func in funcs[1:]: 
      data_out = yield from func(data_out) 
     return data_out 
    return wrapper 

pipeline = connect([add, prod, power]) 
input = 1 
output = asyncio.get_event_loop().run_until_complete(pipeline(input)) 
print(output) 

这个工作,当然,但问题是,如果我想添加另一个函数到(或从弹出的功能)这条管道,我必须再次拆卸和重新连接每个功能。

我想知道是否有更好的方案或设计模式来创建这样的管道?

+0

我认为标准的事情就是重新创建管道,例如'connect([add,prod,somethingelse,power])'或者connect([add,power])'。有没有你不想这样做的原因?或者我不明白你的问题? – maxymoo

+0

我想你已经明白了我的观点,我不想重新创建整个事情,仅仅因为当你只需要改变一小部分,如果管道包含许多功能并且我需要经常更改一些功能,重新创建一切变得乏味和低效。 – shelper

+1

看起来您可以创建一个Pipeline类并使用您的函数列表维护一个实例var,然后实现从该列表中获取/删除函数的方法。然后只需实现'__call__',以便可以将Pipeline的实例发送到asyncio事件循环。 –

回答

1

我不知道这是否是最好的方法,但这里是我的解决方案。

虽然我认为可以使用列表或字典来控制管道,但我发现使用生成器更容易,更高效。

考虑以下发电机:

def controller(): 
    old = value = None 
    while True: 
     new = (yield value) 
     value = old 
     old = new 

这基本上是一个元素的队列,它存储你的send(或next)的下一次调用发送,并释放它的价值。

例子:

>>> c = controller() 
>>> next(c)   # prime the generator 
>>> c.send(8)   # send a value 
>>> next(c)   # pull the value from the generator 
8 

通过每个协程在其控制的关联管道,我们将有我们可以用它来推动每一个目标的外部手柄。我们只需要定义我们的协同程序,以便他们将每个周期从我们的控制器中拉出新的目标。

现在考虑下面的协程:

def source(controller): 
    while True: 
     target = next(controller) 
     print("source sending to", target.__name__) 
     yield (yield from target) 

def add(): 
    return (yield) + 1 

def prod(): 
    return (yield) * 2 

源是一个协同程序是不return,使得第一循环之后也不会自行终止。其他协程是“汇”,不需要控制器。 如下例所示,可以在管道中使用这些协同程序。我们最初设置了一条路线source --> add,在收到第一个结果后,我们将路线更改为source --> prod

# create a controller for the source and prime it 
cont_source = controller() 
next(cont_source) 

# create three coroutines 
# associate the source with its controller 
coro_source = source(cont_source) 
coro_add = add() 
coro_prod = prod() 

# create a pipeline 
cont_source.send(coro_add) 

# prime the source and send a value to it 
coro_source.send(None) 
print("add =", coro_source.send(4)) 

# change target of the source 
cont_source.send(coro_prod) 

# reset the source, send another value 
coro_source.send(None) 
print("prod =", coro_source.send(8)) 

输出:

source sending to add 
add = 5 
source sending to prod 
prod = 16 
+0

这是一个非常有趣的想法!谢谢! – shelper

1

我已经做了类似的事情之前,只用multiprocessing库。这是一个更手动的,但它让你能够轻松地创建和修改你的管道,就像你在你的问题中所要求的。

这个想法是创建可以存在于多处理池中的函数,并且它们唯一的参数是输入队列和输出队列。通过向他们传递不同的队列,将各个阶段联系在一起。每个阶段在其输入队列上接收一些工作,完成一些工作,并通过其输出队列将结果传递到下一个阶段。

工作人员试图从队列中获得某些东西,当他们得到某些东西时,他们会尽自己的努力并将结果传递到下一个阶段。所有的工作的结束通过传递一个“毒丸”通过管道,导致所有阶段退出:

这个例子只是建立在多个工作阶段的字符串:

import multiprocessing as mp            

POISON_PILL = "STOP"              

def stage1(q_in, q_out):             

    while True: 

     # get either work or a poison pill from the previous stage (or main) 
     val = q_in.get()             

     # check to see if we got the poison pill - pass it along if we did 
     if val == POISON_PILL:            
      q_out.put(val)             
      return               

     # do stage 1 work                 
     val = val + "Stage 1 did some work.\n" 

     # pass the result to the next stage 
     q_out.put(val)              

def stage2(q_in, q_out):             

    while True:               

     val = q_in.get()             
     if val == POISON_PILL:            
      q_out.put(val)             
      return               

     val = val + "Stage 2 did some work.\n"        
     q_out.put(val)              

def main():                

    pool = mp.Pool()              
    manager = mp.Manager()             

    # create managed queues            
    q_main_to_s1 = manager.Queue()           
    q_s1_to_s2 = manager.Queue()           
    q_s2_to_main = manager.Queue()           

    # launch workers, passing them the queues they need     
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))  
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))  

    # Send a message into the pipeline          
    q_main_to_s1.put("Main started the job.\n")       

    # Wait for work to complete           
    print(q_s2_to_main.get()+"Main finished the job.")      

    q_main_to_s1.put(POISON_PILL)           

    pool.close()               
    pool.join()               

    return                 

if __name__ == "__main__":             
    main() 

的代码产生这样的输出:

主要开始工作。
阶段1做了一些工作。
第2阶段做了一些工作。
主要完成这项工作。

你可以很容易地把更多的阶段放在流水线中,或者通过改变哪些函数获得哪些队列来重新排列它们。我对asyncio模块并不是很熟悉,所以我不能说使用多处理库会失去哪些功能,但是这种方法非常直接的实现和理解,所以我喜欢它的简单性。