我知道这将被视为重复,但我有环顾四周问这个问题之前,但所有的问题似乎是过期或不符合我的问题在所有帮助。这是我写这个问题之前看了:单元测试芹菜任务直接
-
- How do you unit test a Celery task?(5岁,全死链接)
- How to unit test code that runs celery tasks?(2岁)
- How do I capture Celery tasks during unit testing?(3岁)
我目前正在对大量使用芹菜h的项目andle异步任务;使整个代码库稳定我写单元测试,整个项目,但是我一直没能写芹菜单测试工作至今。
我的大部分代码需要跟踪的是被以确定羯羊或不是所有的结果都准备好要查询运行的任务。这是在我的代码实现如下:
@app.task(bind=True)
def some_task(self, record_id):
associate(self.request.id, record_id) # Not the actual DB code, but you get the idea
# Somewhere else in my code, eg: Flask endpoint
record = some_db_record()
some_task.apply_async(args=[record.id])
由于我没有* nix中基于机器上,我试图通过设置总是渴望选项设置为true解决这个运行我的代码,但是这会导致问题每当有子任务尝试查询结果:
@app.task(bind=True)
def foo(self):
task = bar.apply_async()
foo_poll.apply_async(args=[task.id])
@app.task(bind=True, max_retries=None):
def foo_poll(self, celery_id)
task = AsyncResult(celery_id)
if not task.ready(): # RuntimeError: Cannot retrieve result with task_always_eager enabled
return self.retry(countdown=5)
else:
pass # Do something with the result
@app.task
def bar():
time.sleep(10)
我试图修补AsyncResult
方法解决这个,但是这导致的问题是self.request.id
将None
:
with patch.object(AsyncResult, "_get_task_meta", side_effect=lambda: {"status": SUCCESS, "result": None}) as method:
foo()
@app.task(bind=True)
def foo(self):
pass # self.request.id is now None, which I need to track sub-tasks
有谁知道我该怎么做?或者如果芹菜甚至值得使用?我处于找到文档和任何与测试有关的问题的地步,因此非常复杂,我只是想将它们放在一起,然后回到多线程。
芹菜是绝对值得使用的:P – dm03514