2016-03-06 12 views
0

我目前正在使用一个具有多线程的Python 2.7脚本。其中一个线程是在长轮询模式下侦听JSON数据,并在接收后解析它,或者在一段时间后进入超时。我注意到它只在调试模式下才能正常工作(我使用Wing IDE)。在正常运行的情况下,在进入“for”循环之前,该脚本的特定线程似乎挂起第一个GET请求之后。循环条件不会影响结果。同时其他线程继续正常工作。具有多个线程的Python脚本通常只能在调试模式下工作

我相信这与多线程有关。如何正确排除故障并解决此问题?

下面我把负责长轮询工作的类的代码。

class Listener(threading.Thread): 

def __init__(self, router, *args, **kwargs): 
    self.stop = False 

    self._cid = kwargs.pop("cid", None) 
    self._auth = kwargs.pop("auth", None) 
    self._router = router 
    self._c = webclient.AAHWebClient() 

    threading.Thread.__init__(self, *args, **kwargs) 

def run(self): 
    while True: 
     try: 
      # Data items that should be routed to the device is retrieved by doing a 
      # long polling GET request on the "/tunnel" resource. This will block until 
      # there are data items available, or the request times out 
      log.info("LISTENER: Waiting for data...") 

      response = self._c.send_request("GET", self._cid, auth=self._auth) 

      # A timed out request will not contain any data    
      if len(response) == 0: 
       log.info("LISTENER: No data this time")    
      else: 
       items = response["resources"]["tunnel"] 
       undeliverable = [] 

       #print items # - reaching this point, able to return output 

       for item in items: 

        # The data items contains the data as a base64 encoded string and the 
        # external reference ID for the device that should receive it 
        extId = item["extId"] 
        data = base64.b64decode(item["data"]) 

        # Try to deliver the data to the device identified by "extId" 
        if not self._router.route(extId, data): 
         item["message"] = "Could not be routed" 
         undeliverable.append(item) 

       # Data items that for some reason could not be delivered to the device should 
       # be POST:ed back to the "/tunnel" resource as "undeliverable" 
       if len(undeliverable) > 0: 
        log.warning("LISTENER: Sending error report...") 
        response = self._c.send_request("POST", "/tunnel", body={"undeliverable": undeliverable}, auth=self._auth) 

      except webclient.RequestError as e: 
       log.error("LISTENER: ERROR %d - %s", e.status, e.response) 

UPD:

class Router: 
def route(self, extId, data): 

    log.info("ROUTER: Received data for %s: %s", extId, repr(data)) 
    # nothing special 
    return True 
+0

[转储所有活动线程的堆栈跟踪](http://stackoverflow.com/questions/1032813/dump-stacktraces-of-all-active-threads)有一些很好的提示,找出你的线程卡住的地方。我没有看到你怎么知道你的代码是否卡在第一个GET和for循环之间。你可以在get之后立即登录,然后添加一个'continue'来短路循环并返回GET。如果循环有效,你可以在函数的后面进一步了解你所做的一个调用是否存在死锁。 – tdelaney

+0

我用简单的打印语句确定了“卡住”位置(在上面的源代码中进行了评论)。试过你的方法,发现线程挂在'self._router.route(extId,data)'语句上。除了来自pyodbc的SQL查询之外,类“Router”不包含任何特殊内容。但是它不包含'__init__'方法。可以这是悬挂问题的原因。但我不明白它为什么在调试模式下工作。 – pahanela

+0

我不是'pyodbc'的专家,但我依稀记得,访问它的连接和游标必须被序列化。您可以在连接时保留'self._my_connection_lock = threading.Lock()',并在访问时使用'self._my_connection_lock:'进行操作。用调试器调试竞态条件是非常困难的,因为调试器会改变执行环境。 – tdelaney

回答

0

如果您使用CPython的解释you're not actually system threading

CPython的实现细节:在CPython的,由于全球 解释器锁,只有一个线程可以立即执行Python代码 (即使某些面向性能的库可能会克服此限制)。如果您希望您的应用程序更好地使用多核机器的计算资源,建议您使用 使用多处理器。但是,如果要同时运行多个I/O限制的任务,线程仍然是合适的模型 。

因此,您的过程可能会在第一次请求时进行锁定,因为您是长时间轮询。

多处理可能是一个更好的选择。我没有用长时间轮询尝试它,但the Twisted framework也可能适用于您的情况。

+0

不太可能。 GIL在阻塞操作期间释放,让其他线程运行。在执行I/O绑定任务时,Python线程是非常好的选择。 – tdelaney

相关问题