3
A
回答
-2
你可以使用芹菜canvas objects。
from celery import chain
my_chain = chain(task1.s((arg1, arg2)), task2.s((arg3, arg4))
result = my_chain.apply_async()
0
我做了这样的: 诀窍是,你必须从你的任务名称创建新的签名。
app.send_task("{0}.{1}".format(app_name,task),args, link = [app.signature("{0}.{1}".format(app_name,task),next_args)])
1
最简单的方法是使用“签名”功能:
from celery import signature
chain = signature(
'app_name.task_1',
kwargs={..},
queue='this_queue'
)
chain |= signature(
'app_name2.task2',
kwargs={..},
queue='another_queue'
)
chain.apply_async()
1
我试图做同样的事情。我找不到任何内置的功能,它的名字纯粹是生成任务,但它不是很难添加这种工具:
from celery import Task as BaseTask
class Task(BaseTask):
def __init__(self, name, *args, **kwargs):
super(BaseTask, self).__init__(*args, **kwargs)
self.name = name
有了这个类,你就可以做这样的事情:
(
Task('worker.hello').s('world') |
Task('messaging.email-results').s(email_address='[email protected]')
)()
,或者:
app.send_task(
'worker.hello', ['world'],
chain=[
Task('messaging.email-results').s(email_address='[email protected]')
]
)
编辑:
无视上述,因为我已经意识到,这样做的正确方法是使用Signature
类(如@Chrismit下面提到):
from celery import Signature
(
Signature('worker.hello', args=['world']) |
Signature('messaging.email-results', kwargs={'email_address':'[email protected]'})
)()
,或者:
from celery import Signature
app.send_task(
'worker.hello', ['world'],
chain=[
Signature('messaging.email-results', kwargs={'email_address': '[email protected]'})
]
)
重要提示:任何任务后在链中的第一个任务实际上并没有计划,直到工作进程面前的任务(这是有道理的,因为我们不知道以后任务的输入,直到前一个任务跑)。 后续任务安排在工作人员的代码库中。出于这个原因,你需要确保下面的一个为真:
- 每个工人有
task_routes
的知识,以便它可以将后续任务在合适的队列中(例如,在我的例子,应该知道,开始messaging.*
任务应在'messaging'
队列中去) 已编码的正确
queue
到每个Signature
类当您创建链。芹菜已经有工具,以获得从任务名称的队列名可以上靠在:def get_queue_name(task_name): return app.amqp.router.route({}, task_name)['queue'].name ( Signature('worker.hello', args=['world']) | Signature( 'messaging.email-results', kwargs={'email_address':'[email protected]'}, queue=get_queue_name('messaging.email-results') # If typing the task name twice annoys you, you could subclass Signature to do this automatically ) )()
(我认为这是最干净的解决方案,因为它允许工人不知道对方的存在)
- 所有任务都在默认队列中执行。如果您没有在工作人员上声明
task_routes
,并且该任务的签名中没有指定queue
,则Celery将在工作人员的default_queue
中安排该任务。除非是定制的,那就是'celery'
。我强烈建议这样做,因为它不是非常明确,并且不允许进行很多队列管理,但它是一个选项。
相关问题
- 1. 重试芹菜'send_task'发送的任务
- 2. 芹菜链任务
- 3. 芹菜 - 链接远程回调
- 4. 芹菜:多重参数链接任务
- 5. 顺序连续芹菜链接任务
- 6. 芹菜任务链取消?
- 7. 如何链芹菜任务
- 8. 重试芹菜任务通过task_id
- 9. django通过当地人()芹菜任务
- 10. 协调两个芹菜任务
- 11. 芹菜send_task和异常时重试
- 12. 芹菜任务
- 13. 芹菜任务
- 14. 芹菜:连接错误中止任务
- 15. 芹菜任务链和访问** kwargs
- 16. 推芹菜任务
- 17. 芹菜:怎么'|'操作员在链接多任务时工作?
- 18. 芹菜:链接的任务抛出连接错误
- 19. 任务中的芹菜任务
- 20. Python芹菜 - 如何在其他任务中调用芹菜任务
- 21. 链接两个任务
- 22. 单元测试芹菜任务直接
- 23. 芹菜未能接受任务
- 24. 芹菜跳过一小时的任务
- 25. 芹菜 - 如何从远程机器发送任务?
- 26. 芹菜不给错误的远程任务名称,为什么?
- 27. Python芹菜在单个节点上的任务链
- 28. 芹菜 - 链接组和子任务。 - >乱序执行
- 29. 在DB-Django中保存芹菜任务
- 30. 在芹菜任务中运行postgres VACUUM
任务在远程机器上运行,我是芹菜新手。我应该如何从远程机器创建任务对象? – mehdi
就像所示 - 这应该在远程机器上运行任务 – scytale