2014-02-17 128 views
5

我正在调用Django-Celery中任务的任务Python芹菜 - 如何在其他任务中调用芹菜任务

这是我的任务。

@shared_task 
def post_notification(data,url): 
    url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line. 
    headers = {'content-type': 'application/json'} 
    requests.post(url, data=json.dumps(data), headers=headers) 


@shared_task 
def shipment_server(data,notification_type): 
    notification_obj = Notification.objects.get(name = notification_type) 
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj) 

    for server in server_list: 
     task = post_notification.delay(data,server.server_id.url) 
     print task.status # it prints 'Nonetype' has no attribute id 

如何在任务中调用任务? 我在某处读取它可以使用group,但我无法形成正确的语法。我该怎么做?

我想这

for server in server_list: 
    task = group(post_notification.s(data, server.server_id.url))().get() 
    print task.status 

抛出警告说

TxIsolationWarning: Polling results w│                   
ith transaction isolation level repeatable-read within the same transacti│                   
on may give outdated results. Be sure to commit the transaction for each │                   
poll iteration.               │                   
    'Polling results with transaction isolation level ' 

不知道它是什么!

我该如何解决我的问题?

+0

'结果= task.delay' /'task.apply_async'给出了'AsyncResult'对象。这支持轮询'.status'属性,每次访问它将检查任务的状态。在发送任务后立即调用.state是没有意义的,因为工作人员没有开始执行它。在你以后的例子中,你调用'task = ..... get()。status',这将不起作用,因为你在任务的返回值上调用状态,而不是结果(result.status vs result.get() 。状态)。 – asksol

+0

最后,您不应该等待子任务的结果,因为这可能会导致死锁,您应该使用回调任务:'(post_notification.s()| do_sometihing_after_posted.s())。delay()'。请参阅http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks和http://docs.celeryproject.org/en/latest/userguide/canvas.html – asksol

回答

5

这应该工作:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3]) 
+0

什么是my_model和current_app? – PythonEnthusiast

+0

'current_app'是芹菜模块的属性。 'mymodel.tasks'是你的'tasks.py'的路径。如有必要,更改它。 –

+0

所以,我应该这样做'task = celery.current_app.send_task('mymodel.tasks.mytask',args = [arg1,arg2,arg3])' – PythonEnthusiast

0

您可以使用延时功能

from app.tasks import celery_add_task 
    celery_add_task.apply_async(args=[task_name]) 

调用从任务的任务......它会工作

+0

I已经试过 – PythonEnthusiast

+0

@ user1162512尝试使用这个工作4 me –

2

你是对的,因为在每一个任务你for循环将被覆盖task变量。

你可以尝试celery.group

from celery import group 

@shared_task 
def shipment_server(data,notification_type): 
    notification_obj = Notification.objects.get(name = notification_type) 
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj) 


    tasks = [post_notification.s(data, server.server_id.url) for server in server_list] 
    results = group(tasks)() 
    print results.get() # results.status() what ever you want