2015-11-02 105 views
4

我有兴趣确保我的Tornado处理程序不会阻塞,所以我想写一些单元测试作为完整性检查。我如何编写测试并发性的Tornado单元测试

我想到的是一个异步睡眠2秒的处理程序。在测试中,我想连续调用这个处理程序两次以模拟“同时”请求。

如果我没有弄错,这两个请求应该同时运行,并且因此在不到4秒的时间内完成。问题是我不知道如何通过AsyncHTTPTestCase向我的应用程序发出2个同时请求。

这是我到目前为止有:

class SyncSleepHandler(tornado.web.RequestHandler): 
    def get(self): 
     time.sleep(2) 


class AsyncSleepHandler(tornado.web.RequestHandler): 
    @gen.coroutine 
    def get(self): 
     yield gen.sleep(2) 


class SleepTest(AsyncHTTPTestCase): 
    def get_app(self): 
     return Application([(r'/sync', SyncSleepHandler), 
          (r'/async', AsyncSleepHandler)], debug=True) 

    def test_async_sleep(self): 
     start = time.time() 
     resp1 = self.fetch(r'/async', method='GET') 
     resp2 = self.fetch(r'/async', method='GET') 
     diff = time.time() - start 
     self.assertTrue(2 < diff < 4, msg="Difference is {:}".format(diff)) 

回答

4

AsyncHTTPTestCase.fetch控制IOLoop,使单一的获取,因此不能用于这种测试的,但你可以直接去底层self.http_client,并使用@tornado.testing.gen_test来控制IOLoop

@gen_test 
def test_async_sleep(self): 
    start = time.time() 
    resp1, resp2 = yield [ 
     self.http_client.fetch(self.get_url('/async')), 
     self.http_client.fetch(self.get_url('/async')), 
    ] 
    diff = time.time() - start