2016-12-26 49 views
2

我是新来的异步编程和被混淆它。龙卷风请求响应时间太长,即使使用gen.coroutine

当我用gen.coroutine装饰RequestHandler但发现请求仍然被阻止时,我感到困惑。

这里是一个简短的代码,与Python 2.7.11和龙卷风4.4.1

@gen.coroutine 
def store_data(data): 
    try: 
     # parse_data 
     ... 
    except ParseError as e: 
     logger.warning(e) 
     return 
    yield motor.insert_many(parsed_data) # asynchronous mongo 
    print motor.count() 

class MainHandler(RequestHandler): 
    @gen.coroutine 
    def post(self): 
     try: 
      some_argument = int(self.get_argument("some", 0)) 
      data = self.request.body 
     except Exception: 
      self.write("Improper Argument") 
      self.finish() 
      return 
     IOLoop.current().spawn_callback(lambda: store_data(data)) 
     self.write("Request Done") 
     self.finish() 

而且我做了一个试验用10个线程。根据访问日志的响应时间,我想一些请求被封锁更新

追溯信息的set_blocking_log_threshold(0.5)

File "********", line 74, in <dictcomp> 
    data = [dict({"sid": sid}, **{key: value for key, value in i.iteritems() 

[I 161222 15:40:22 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 7.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 701.00ms # Seem blocked 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 696.00ms # Seem blocked 

这段代码的全线

data = [dict({"sid": sid}, **{key: value for key, value in i.iteritems() if key in need_cols}) for i in v_data] 

和解压的逻辑是这样的

data = [] 
# `v_data` is a huge dict which could be considered as a mongo collection, and `i` as a mongo document 
for i in v_data: 
    temp = {key: value for key, value in i.iteritems() if key in need_cols} # discard some keys 
    temp["sid"] = sid # add same `sid` to all items 
    data.append(temp) 

我改成发电机

def data_generator(v_data, need_cols, sid): 
    for i in v_data: 
     temp = {key: value for key, value in i.iteritems() if key in need_cols} # discard some keys 
     temp["sid"] = sid # add same `sid` to all items 
     yield temp 

@gen.coroutine 
def store_data(data): 
    try: 
     # parse_data 
     ... 
    except ParseError as e: 
     logger.warning(e) 
     return 
    ge = data_generator(v_data, need_cols, sid) 
    yield motor.insert_many(ge) # asynchronous mongo 
    print motor.count() 

没有门槛警告日志中报告了,而是响应时间似乎仍然受阻

[I 170109 17:26:32 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 2.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 4.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 2.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 354.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 443.00ms 

然后我将阈值设置为0.2s。得到这个消息

File "*******", line 76, in store_data 
    increment = json.load(fr) 
    File "/usr/local/python2.7/lib/python2.7/json/__init__.py", line 291, in load 
    **kw) 
    File "/usr/local/python2.7/lib/python2.7/json/__init__.py", line 339, in loads 
    return _default_decoder.decode(s) 
    File "/usr/local/python2.7/lib/python2.7/json/decoder.py", line 364, in decode 
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 
    File "/usr/local/python2.7/lib/python2.7/json/decoder.py", line 380, in raw_decode 
    obj, end = self.scan_once(s, idx) 

现在我已经不知道该怎么做此发言异步

回答

0

我认为这个问题可能是你是如何调用你store_data协程功能。你必须以正确的方式调用协程。因为它是提here

在几乎所有情况下,调用协同程序必须是 协程本身的任何功能,在通话使用yield关键字。

所以store_data必须像这样调用yield store_data()而不是store_data()。这里

IOLoop.current().spawn_callback(lambda: store_data(data)) 

我猜你正在使用lambda,因为你想给data给你的函数作为参数,但是你可以用spawn_callback自己做到这一点。你可以试试这个:

IOLoop.current().spawn_callback(store_data, data) 

希望这会有所帮助。

0

@gen.coroutine装饰功能如果该功能从不yields没有任何好处。

您的post()方法看起来是正确的:它不会阻止或做任何干扰IOLoop。然而,IOLoop是一个共享资源,任何阻止它的东西都可能导致您看到的时间。我怀疑你没有在store_data(或程序中的其他地方)显示我们的东西被阻止。要突出显示此阻塞的位置,请在程序开始时调用IOLoop.current().set_blocking_log_threshold(0.5),并在IOLoop被阻塞半秒钟后记录堆栈跟踪。

+0

谢谢,本!我已更新问题描述。你能否给我进一步的帮助? – Morry

+0

如果你有太多的数据,只是解析json阻塞太久,那么你可能只需要将这个工作移动到一个ThreadPoolExecutor。 'increment = yield executor.submit(json.load,fr)'。一旦你解析了json,你可能会将它分成更小的批次以供进一步处理。还有第三方json库可能会更快和/或支持流式接口。 –