2016-03-04 120 views
1

我修改了示例队列,生产者&消费者来自这个Tornado documentation,但它似乎并没有传递给get()的超时参数可以工作,因为消费者不会等待10抛出异常之前的几秒钟。理想情况下,生产者和消费者将同时运行。另外,我不知道是否在超时参数为秒或毫秒传:龙卷风异步队列不等待

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

q = Queue() 

@gen.coroutine 
def consumer(): 
    try: 
     while True: 
      item = yield q.get(timeout=10000) 
      try: 
       print('Doing work on %s' % item)  
      finally: 
       q.task_done() 
    except gen.TimeoutError: 
     print('timeout') 
     return 

@gen.coroutine 
def producer(): 
    for item in range(5): 
     yield q.put(item) 
     print('Put %s' % item) 
     yield gen.sleep(2) 

@gen.coroutine 
def main(): 
    # Start consumer without waiting (since it never finishes). 
    IOLoop.current().spawn_callback(consumer) 
    yield producer()  # Wait for producer to put all tasks. 
    yield q.join()  # Wait for consumer to finish all tasks. 
    print('Done') 

IOLoop.current().run_sync(main) 

,这里是它的执行:

Put 0 
Doing work on 0 
timeout 
Put 1 
Put 2 
Put 3 
Put 4 

回答

4

超时

,你可以读于Tornado' Queue.get docs

返回一个未来,它解决了o有一个项目可用,或在超时后引发tornado.gen.TimeoutError。

但它可能是相当误导,因为timeout实际上是一个deadline。因此,它必须要么datetime.timedelta object指定:

import datetime 
yield q.get(timeout=datetime.timedelta(seconds=1)) 

或绝对时间:

timeout = 1.5 # in seconds, floats acceptable 
deadline = IOLoop.current().time() + timeout 
# in most cases IOLoop time is just time.time() 
# I've used separate variables only for the notion 

yield q.get(timeout=deadline) 

Toro,这是合并到龙卷风,这种说法被称为deadline

在您的代码中,您指定了超时10000,即截止日期为Thu, 01 Jan 1970 02:46:40 GMT

消费者循环

既然你对整个功能try/except块,包括while环,当发生TimeoutError你消费停止工作。将异常处理移入while循环。

工作例如:

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

q = Queue() 

@gen.coroutine 
def consumer(): 
    i = 0 
    while True: 
     i += 1 
     print('get cycle %s' % i) 
     try: 
      item = yield q.get(IOLoop.instance().time() + 3) 
      try: 
       print('Doing work on %s' % item) 
      finally: 
       q.task_done() 
     except gen.TimeoutError: 
      print('timeout') 

@gen.coroutine 
def producer(): 
    for item in range(5): 
     yield q.put(item) 
     print('Put %s' % item) 
     yield gen.sleep(2) 

@gen.coroutine 
def main(): 
    # Start consumer without waiting (since it never finishes). 
    IOLoop.current().spawn_callback(consumer) 
    yield producer()  # Wait for producer to put all tasks. 
    yield q.join()  # Wait for consumer to finish all tasks. 
    print('Done')