2017-07-10 77 views
2

我知道这将被视为重复,但我环顾四周问这个问题之前,但所有的问题似乎是过期或不符合我的问题在所有帮助。这是我写这个问题之前看了:单元测试芹菜任务直接


我目前正在对大量使用芹菜h的项目andle异步任务;使整个代码库稳定我写单元测试,整个项目,但是我一直没能写芹菜单测试工作至今。

我的大部分代码需要跟踪的是被以确定羯羊或不是所有的结果都准备好要查询运行的任务。这是在我的代码实现如下:

@app.task(bind=True) 
def some_task(self, record_id): 
    associate(self.request.id, record_id) # Not the actual DB code, but you get the idea 

# Somewhere else in my code, eg: Flask endpoint 
record = some_db_record() 
some_task.apply_async(args=[record.id]) 

由于我没有* nix中基于机器上,我试图通过设置总是渴望选项设置为true解决这个运行我的代码,但是这会导致问题每当有子任务尝试查询结果:

@app.task(bind=True) 
def foo(self): 
    task = bar.apply_async() 
    foo_poll.apply_async(args=[task.id]) 

@app.task(bind=True, max_retries=None): 
def foo_poll(self, celery_id) 
    task = AsyncResult(celery_id) 
    if not task.ready(): # RuntimeError: Cannot retrieve result with task_always_eager enabled 
     return self.retry(countdown=5) 
    else: 
     pass # Do something with the result 

@app.task 
def bar(): 
    time.sleep(10) 

我试图修补AsyncResult方法解决这个,但是这导致的问题是self.request.idNone

with patch.object(AsyncResult, "_get_task_meta", side_effect=lambda: {"status": SUCCESS, "result": None}) as method: 
    foo() 

@app.task(bind=True) 
def foo(self): 
    pass # self.request.id is now None, which I need to track sub-tasks 

有谁知道我该怎么做?或者如果芹菜甚至值得使用?我处于找到文档和任何与测试有关的问题的地步,因此非常复杂,我只是想将它们放在一起,然后回到多线程。

+0

芹菜是绝对值得使用的:P – dm03514

回答

1

我有同一个问题,用两种可能的方法上来:在直接测试

  1. 呼叫任务和包装所有内部芹菜 互动与if self.request.called_directly和直接,如果真或apply_async如果假运行任务 。
  2. task.ready()和其他状态检查与在那里我检查ALWAYS_EAGER和任务准备的功能。

最终我想出了两种规则混合,以尽可能避免嵌套任务。并且尽可能少的代码放在@app.task中,以便能够尽可能多地隔离任务函数。

它看起来相当令人沮丧和可怕的,但实际上它不是。你

还可以检查大家伙像Sentry如何做到这一点(扰流板:嘲笑和一些漂亮的助手)。

所以这绝对是可能的,它不是一个简单的方法来找到一些最佳做法。

+0

“self.request.called_directly”的问题是芹菜代码在整个项目中都被使用,而不仅仅是在任务本身中(例如:'AsyncResult'被查询为其中一个我的模型,它有一个方法'is_ready',用于检查是否所有的数据都被插入了,它使用'AsyncResult'确定是否所有的任务都插入了他们的数据)意思是我并不总是可以访问它 – Paradoxis

+0

是的,这就是为什么还有第二种方法可以在任务之外使用。它也有问题吗? – valignatev

0

我已经有一段时间没有使用过芹菜了,除非事情发生了变化,你应该可以直接调用你的方法来作为单元测试。

@app.task(bind=True, max_retries=None): 
def foo_poll(self, celery_id) 
    task = AsyncResult(celery_id) 
    if not task.ready(): # RuntimeError: Cannot retrieve result with task_always_eager enabled 
     return self.retry(countdown=5) 
    else: 
     pass # Do something with the result 

因为你可以在你的单元测试:

  • 补丁AsyncResult,触发正确的分支
  • 实例化类
  • 补丁retry方法,并断言这就是所谓的
  • 锻炼你的方法直接

这当然只是练习你的方法的逻辑,而不是芹菜。我通常会放一个或两个集成(协作)测试,指定ALWAYS_EAGER,以便它通过芹菜代码,即使芹菜将在没有队列的内存中执行。

+0

我试过直接调用我的测试并修补了一大堆东西,但是'app.task'修饰符太复杂了,我一直无法找到它的解决方案 – Paradoxis

+0

您可以发布一个最小工作示例你想要测试的芹菜代码,完整的类定义和完整的单元测试? – dm03514