2015-07-06 33 views

回答

-2

你可以使用芹菜canvas objects

from celery import chain 
my_chain = chain(task1.s((arg1, arg2)), task2.s((arg3, arg4)) 
result = my_chain.apply_async() 
+0

任务在远程机器上运行,我是芹菜新手。我应该如何从远程机器创建任务对象? – mehdi

+0

就像所示 - 这应该在远程机器上运行任务 – scytale

0

我做了这样的: 诀窍是,你必须从你的任务名称创建新的签名。

app.send_task("{0}.{1}".format(app_name,task),args, link = [app.signature("{0}.{1}".format(app_name,task),next_args)]) 
+0

现在问题是链接任务将其结果返回到第一个任务,我无法阻止它。任何想法? – mehdi

+0

你应该不需要这样做 - 你应该可以调用'task.apply_async()'并让任务在远程机器上运行 – scytale

+1

@scytale我最初把我的头划伤了这个,但我认为这里的场景是一个在那里你有工人使用相同的经纪人,但有不同的代码库。所以这些任务并不存在于所有工人身上。因此,如果工作者A想要启动仅存在于工作者B上的任务X,则不能仅仅使用'task.apply_async()',因为不存在任务'对象。它必须按名称启动任务。 – Louis

1

最简单的方法是使用“签名”功能:

from celery import signature 
chain = signature(
    'app_name.task_1', 
    kwargs={..}, 
    queue='this_queue' 
) 
chain |= signature(
    'app_name2.task2', 
    kwargs={..}, 
    queue='another_queue' 
) 
chain.apply_async() 
1

我试图做同样的事情。我找不到任何内置的功能,它的名字纯粹是生成任务,但它不是很难添加这种工具:

from celery import Task as BaseTask 

class Task(BaseTask): 
    def __init__(self, name, *args, **kwargs): 
     super(BaseTask, self).__init__(*args, **kwargs) 
     self.name = name 

有了这个类,你就可以做这样的事情:

(
    Task('worker.hello').s('world') | 
    Task('messaging.email-results').s(email_address='[email protected]') 
)() 

,或者:

app.send_task(
    'worker.hello', ['world'], 
    chain=[ 
     Task('messaging.email-results').s(email_address='[email protected]') 
    ] 
) 

编辑:

无视上述,因为我已经意识到,这样做的正确方法是使用Signature类(如@Chrismit下面提到):

from celery import Signature 

(
    Signature('worker.hello', args=['world']) | 
    Signature('messaging.email-results', kwargs={'email_address':'[email protected]'}) 
)() 

,或者:

from celery import Signature 

app.send_task(
    'worker.hello', ['world'], 
    chain=[ 
     Signature('messaging.email-results', kwargs={'email_address': '[email protected]'}) 
    ] 
) 

重要提示:任何任务后在链中的第一个任务实际上并没有计划,直到工作进程面前的任务(这是有道理的,因为我们不知道以后任务的输入,直到前一个任务跑)。 后续任务安排在工作人员的代码库中。出于这个原因,你需要确保下面的一个为真:

  • 每个工人有task_routes的知识,以便它可以将后续任务在合适的队列中(例如,在我的例子,应该知道,开始messaging.*任务应在'messaging'队列中去)
  • 已编码的正确queue到每个Signature类当您创建链。芹菜已经有工具,以获得从任务名称的队列名可以上靠在:

    def get_queue_name(task_name): 
        return app.amqp.router.route({}, task_name)['queue'].name 
    
    (
        Signature('worker.hello', args=['world']) | 
        Signature(
         'messaging.email-results', 
         kwargs={'email_address':'[email protected]'}, 
         queue=get_queue_name('messaging.email-results') # If typing the task name twice annoys you, you could subclass Signature to do this automatically 
        ) 
    )() 
    

    (我认为这是最干净的解决方案,因为它允许工人不知道对方的存在)

  • 所有任务都在默认队列中执行。如果您没有在工作人员上声明task_routes,并且该任务的签名中没有指定queue,则Celery将在工作人员的default_queue中安排该任务。除非是定制的,那就是'celery'。我强烈建议这样做,因为它不是非常明确,并且不允许进行很多队列管理,但它是一个选项。