2014-10-09 211 views
0

语境:芹菜链任务

tasks.py

def chunkify(lst,n): 
    return [ lst[i::n] for i in xrange(n) ] 

@task 
def swarm_restart(procs): 
    chunks = chunkify(procs, 4) 
    res = chain(
     group_restart.s(([ (proc.name, proc.host.name) for proc in chunks[0] ])), 
     group_restart.s(([ (proc.name, proc.host.name) for proc in chunks[1] ])), 
     group_restart.s(([ (proc.name, proc.host.name) for proc in chunks[2] ])), 
     group_restart.s(([ (proc.name, proc.host.name) for proc in chunks[3] ])), 
    )() 

@ task 
def group_restart(procs): 
    # this task runs only once, seems to be called just 1 time 
    res = group(proc_restart.s(proc[0], proc[1]) for proc in procs).apply_async() 

@ task 
def proc_restart(proc_name, hostname): 
    # This method works, tested several times 
    proc = Host.objects.get(name=hostname).get_proc(proc_name) 
    proc.restart() 

views.py

def call(): 
    procs = get_procs() 
    tasks.swarm_restart.delay(procs) 

我得到的错误: TypeError: group_restart() takes exactly 1 argument (2 given)

我做错了什么,任何灯光?

顺便说一句。芹菜== 3.0.25,django-celery == 3.0.23

回答

1

如果你看看你的swarm_restart任务,你正在链接group_restart任务。在这里,链中的第一个任务将正常执行,但第二个任务将抛出错误。

TypeError: group_restart() takes exactly 1 argument (2 given)

因为第一个任务的结果作为参数传递给它。链中的下一个任务也会发生同样的情况。

例如,

from celery import task, chain 

@app.task 
def t1(): 
    return 't1' 

@app.task 
def t2(): 
    return 't2' 

wrong_chain = chain(t1.s(), t2.s()) 

如果执行wrong_chain它得来,即使你没有传递任何参数来t2

所以类似的错误,你有你的工作流程取决于你有什么改变去做。

+1

你可以用'task.si()'替换你的'task.s()',这使得签名不可变,一切工作正常,与当前的工作流本身。 – ChillarAnand 2014-10-11 14:04:03