4
我有兴趣确保我的Tornado处理程序不会阻塞,所以我想写一些单元测试作为完整性检查。我如何编写测试并发性的Tornado单元测试
我想到的是一个异步睡眠2秒的处理程序。在测试中,我想连续调用这个处理程序两次以模拟“同时”请求。
如果我没有弄错,这两个请求应该同时运行,并且因此在不到4秒的时间内完成。问题是我不知道如何通过AsyncHTTPTestCase
向我的应用程序发出2个同时请求。
这是我到目前为止有:
class SyncSleepHandler(tornado.web.RequestHandler):
def get(self):
time.sleep(2)
class AsyncSleepHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
yield gen.sleep(2)
class SleepTest(AsyncHTTPTestCase):
def get_app(self):
return Application([(r'/sync', SyncSleepHandler),
(r'/async', AsyncSleepHandler)], debug=True)
def test_async_sleep(self):
start = time.time()
resp1 = self.fetch(r'/async', method='GET')
resp2 = self.fetch(r'/async', method='GET')
diff = time.time() - start
self.assertTrue(2 < diff < 4, msg="Difference is {:}".format(diff))