简单地说,在Python3.5和3.6中,出于理智的原因,max_workers
参数不允许为0
。所以我的解决方案是创建一个更“模拟”的ThreadPoolExecutor版本,以记录ThreadPool的活动,以防将某些内容添加到队列中,然后对此进行断言。我会在这里分享代码,以防有人想要将其用于他们的目的。
import threading
from concurrent import futures
class RecordingThreadPool(futures.Executor):
"""A thread pool that records if used."""
def __init__(self, max_workers):
self._tp_executor = futures.ThreadPoolExecutor(max_workers=max_workers)
self._lock = threading.Lock()
self._was_used = False
def submit(self, fn, *args, **kwargs):
with self._lock:
self._was_used = True
self._tp_executor.submit(fn, *args, **kwargs)
def was_used(self):
with self._lock:
return self._was_used