I'm working with Jira changelog history data, and given the volume of data and the fact that most of the processing time is I/O-bound, I figured an asynchronous approach might work well. How can I use a lock with concurrent.futures.ThreadPoolExecutor without causing a deadlock?
I have a list of issue_ids that I feed into a function which makes requests through the jira-python API, extracts the information into a dict, and then writes it out through a DictWriter. To make this thread-safe, I imported a Lock() from the threading module and passed it through as well. In testing, it seems to deadlock at some point and hang. I noticed the docs say that tasks may hang if they depend on each other, and I suspect mine do because of the locking I'm doing. How can I prevent this?
Here is my code for reference:
(At this point in the code there is a list called keys containing all of the issue_ids.)
import logging
from concurrent.futures import ThreadPoolExecutor
from csv import DictWriter
from threading import Lock

from jira import JIRA


def write_issue_history(
        jira_instance: JIRA,
        issue_id: str,
        writer: DictWriter,
        lock: Lock):
    logging.debug('Now processing data for issue {}'.format(issue_id))
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog
    for history in changelog.histories:
        created = history.created
        for item in history.items:
            to_write = dict(issue_id=issue_id)
            to_write['date'] = created
            to_write['field'] = item.field
            to_write['changed_from'] = item.fromString
            to_write['changed_to'] = item.toString
            clean_data(to_write)
            add_etl_fields(to_write)
            print(to_write)
            with lock:
                print('Lock obtained')
                writer.writerow(to_write)


if __name__ == '__main__':
    with open('outfile.txt', 'w') as outf:
        writer = DictWriter(
            f=outf,
            fieldnames=fieldnames,
            delimiter='|',
            extrasaction='ignore'
        )
        writer_lock = Lock()
        with ThreadPoolExecutor(max_workers=5) as exec:
            for key in keys[:5]:
                exec.submit(
                    write_issue_history,
                    j,
                    key,
                    writer,
                    writer_lock
                )
Edit: it is also quite possible that I am being rate-limited by the Jira API.
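To rule out rate limiting, I could wrap the API call in a simple retry with exponential backoff. A sketch, assuming a generic zero-argument callable (the helper name and retry parameters here are made up, not part of jira-python):

```python
import time

def with_backoff(fn, attempts=4, base_delay=1.0):
    # Call fn(), retrying with exponentially growing sleeps between failures;
    # re-raise the exception if the final attempt still fails.
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```

Inside the worker this would be used as, e.g., with_backoff(lambda: jira_instance.issue(issue_id, expand='changelog')).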