所以这就是我想要做的。我有一个计划任务,每X分钟运行一次。在任务中,我创建了一组我希望它们相互平行运行的任务。他们都完成后,我想记录,如果该组成功完成或没有。这是我的代码:得到芹菜组结果
@shared_task(base=HandlersImplTenantTask, acks_late=True)
def my_scheduled_task():
try:
needed_ids = MyModel.objects.filter(some_field=False)\
.filter(some_other_field=True)\
.values_list("id", flat=True) \
.order_by("id")
if needed_ids:
tasks = [my_single_task.s(needed_id=id) for id in needed_ids]
job = group(tasks)
result = job.apply_async()
returned_values = result.get()
if result.ready():
if result.successful():
logger.info("SUCCESSFULLY FINISHED ALL THE SUBTASKS")
else:
returned_values = result.get()
logger.info("UNSUCCESSFULLY FINISHED ALL THE SUBTASKS WITH THE RESULTS %s" % returned_values)
else:
logger.info("no needed ids found")
except:
logger.exception("got an unexpected exception while running task")
这是my_single_task代码:
@shared_task(base=HandlersImplTenantTask)
def my_single_task(needed_id):
logger.info("starting task for tenant: [%s]. got id [%s]", connection.tenant, needed_id)
return
这是怎么运行我的芹菜: manage.py芹菜工人-C 2 --broker = [我的RabbitMQ布洛克尔网址]
当我到达线result.get()
它挂起。我看到单个任务的单个日志条目与第一个ID,但我没有看到其他人。当我杀掉我的芹菜过程并重新启动时 - 它重新执行计划的任务,并且我看到了第二个日志条目(从第一次运行任务开始)。有想法该怎么解决这个吗?
编辑 - 所以尝试和克服这一点 - 我创建了一个不同的队列,称为'new_queue'。我开了一个不同的芹菜工人去听新队列。我想让其他工作人员完成任务并对其进行处理。我认为这可以解决僵局问题。 我已经改变了我的代码看起来像这样:
job = group(tasks)
job_result = job.apply_async(queue='new_queue')
results = job_result.get()
,但我仍然获得了僵局,如果我删除了results = job_result.get()
线,我可以看到,任务由主工人的工作并没有被发布到new_queue队列。有什么想法吗? 这是我的芹菜配置: tenant_celery_app.conf.update(CELERY_RESULT_BACKEND='djcelery.backends.database.DatabaseBackend' CELERY_RESULT_DB_TABLENAMES = { 'task': 'tenantapp_taskmeta', 'group': 'tenantapp_groupmeta', }
这是我的运行工:
芹菜工人-c 1 -Q NEW_QUEUE --broker = [amqp_brocker_url]/[虚拟主机]
芹菜工人-c 1 --broker = [amqp_brocker_url]/[虚拟主机]
感谢您的答复。我明白我做错了什么 - 谢谢。在你的代码示例中,你有'result = job.apply_async()'。如果任务将异步运行,那么结果变量的目的是什么?在任务运行或完成之前,它可以给我什么信息? –
请查看编辑 –