2015-05-11 119 views
0

我写了一个应用程序,它从一个队列中取出一个作业并异步执行它。龙卷风异步作业龙卷风gen协同程序

def job(self): 
    print 'In job' 
    time.sleep(0.01) 

@gen.coroutine 
def start_jobs(self): 
    jobs = filter(lambda x: x['status'] == 0, self.queue) 
    for job in jobs: 
     yield self.job() 
    print 'exit from start job' 

但是,这段代码不起作用。

输出:

在工作

在工作

在工作等

我怎么做是正确的?

我如何使它与期货合作,并且是否有一种更简单的方式与Tornado合作?

回答

3

千万不要在Tornado中打电话time.sleep!改为使用yield gen.sleep

pip install toro安装Toro和使用JoinableQueue:

import random 
from tornado import ioloop, gen 
import toro 


class C(object): 
    def __init__(self): 
     self.queue = toro.JoinableQueue() 

    @gen.coroutine 
    def start_jobs(self): 
     while True: 
      job_id = yield self.queue.get() 
      self.job(job_id) 

    @gen.coroutine 
    def job(self, job_id): 
     print 'job_id', job_id 
     yield gen.sleep(random.random()) 
     print 'job_id', job_id, 'done' 
     self.queue.task_done() 


c = C() 
for i in range(5): 
    c.queue.put_nowait(i) 

c.start_jobs() 

io_loop = ioloop.IOLoop.instance() 

# block until all tasks are done 
c.queue.join().add_done_callback(lambda future: io_loop.stop()) 
io_loop.start() 

与旋风4.2开始,红牛是龙卷风的一部分,所以你可以做queue = tornado.queues.Queue()而是采用了托罗JoinableQueue:

http://tornado.readthedocs.org/en/latest/releases/v4.2.0.html#new-modules-tornado-locks-and-tornado-queues

+0

如果我想添加一些自定义函数而不是gen.sleep(random.random())会怎么样?我想添加使用job_id的函数并进行长时间的计算。 – cehmja

+0

当然,只需使用“yield”调用该函数即可。查看我的文章emptysqua.re/blog/refactoring-tornado-coroutines/ –