2016-07-14 94 views
1

我正在处理Jira更新日志历史数据,并且由于大量的数据以及大多数处理时间都是基于I/O的事实,我认为异步方法可能运作良好。如何在concurrent.futures.ThreadPoolExecutor中使用不会导致死锁的锁?

我都issue_id的,这我送入一个功能,使通过jira-python API的请求的列表,提取信息为dict,然后通过在DictWriter一个通过写出来。为了使它成为线程安全的,我从threading模块导入了一个Lock(),我也通过了该模块。在测试中,它似乎在某个时间点处于死锁状态,并挂起。我在文档中注意到它说如果任务相互依赖,那么它们可能会挂起,我想它们是由于我正在执行的锁定。我怎样才能防止这种情况发生?

这里是我的参考代码:

(在代码中的这一点上是一个与所有的ISSUE_ID的所谓keys列表)

def write_issue_history(
     jira_instance: JIRA, 
     issue_id: str, 
     writer: DictWriter, 
     lock: Lock): 
    logging.debug('Now processing data for issue {}'.format(issue_id)) 
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog 

    for history in changelog.histories: 
     created = history.created 
     for item in history.items: 
      to_write = dict(issue_id=issue_id) 
      to_write['date'] = created 
      to_write['field'] = item.field 
      to_write['changed_from'] = item.fromString 
      to_write['changed_to'] = item.toString 
      clean_data(to_write) 
      add_etl_fields(to_write) 
      print(to_write) 
      with lock: 
       print('Lock obtained') 
       writer.writerow(to_write) 

if __name__ == '__main__': 
    with open('outfile.txt', 'w') as outf: 
       writer = DictWriter(
        f=outf, 
        fieldnames=fieldnames, 
        delimiter='|', 
        extrasaction='ignore' 
       ) 
       writer_lock = Lock() 
       with ThreadPoolExecutor(max_workers=5) as exec: 
        for key in keys[:5]: 
         exec.submit(
          write_issue_history, 
          j, 
          key, 
          writer, 
          writer_lock 
         ) 

编辑:这也是非常有可能的,我是受Jira API限制。

回答

0

您需要将exec的结果存储到一个名为futs的列表中,然后遍历该列表调用result()以获取它们的结果,以处理可能发生的任何错误。

(我也想碰碰运气execexecutor因为这是更为传统,它避免了重写内置)

from traceback import print_exc 

... 

with ThreadPoolExecutor(max_workers=5) as executor: 
    futs = [] 
    for key in keys[:5]: 
     futs.append(executor.submit(
      write_issue_history, 
      j, 
      key, 
      writer, 
      writer_lock) 
     ) 

for fut in futs: 
    try: 
     fut.result() 
    except Exception as e: 
     print_exc()