2016-02-22 70 views
0

所以这就是我想要做的。我有一个计划任务,每X分钟运行一次。在任务中,我创建了一组我希望它们相互平行运行的任务。他们都完成后,我想记录,如果该组成功完成或没有。这是我的代码:得到芹菜组结果

@shared_task(base=HandlersImplTenantTask, acks_late=True) 
def my_scheduled_task(): 
    try: 
     needed_ids = MyModel.objects.filter(some_field=False)\ 
           .filter(some_other_field=True)\ 
           .values_list("id", flat=True) \ 
           .order_by("id") 
     if needed_ids: 
      tasks = [my_single_task.s(needed_id=id) for id in needed_ids] 
      job = group(tasks) 
      result = job.apply_async() 
      returned_values = result.get() 
      if result.ready(): 
       if result.successful(): 
        logger.info("SUCCESSFULLY FINISHED ALL THE SUBTASKS") 
       else: 
        returned_values = result.get() 
        logger.info("UNSUCCESSFULLY FINISHED ALL THE SUBTASKS WITH THE RESULTS %s" % returned_values) 
     else: 
      logger.info("no needed ids found") 
    except: 
     logger.exception("got an unexpected exception while running task") 

这是my_single_task代码:

@shared_task(base=HandlersImplTenantTask) 
def my_single_task(needed_id): 

    logger.info("starting task for tenant: [%s]. got id [%s]", connection.tenant, needed_id) 
    return 

这是怎么运行我的芹菜: manage.py芹菜工人-C 2 --broker = [我的RabbitMQ布洛克尔网址]

当我到达线result.get()它挂起。我看到单个任务的单个日志条目与第一个ID,但我没有看到其他人。当我杀掉我的芹菜过程并重新启动时 - 它重新执行计划的任务,并且我看到了第二个日志条目(从第一次运行任务开始)。有想法该怎么解决这个吗?

编辑 - 所以尝试和克服这一点 - 我创建了一个不同的队列,称为'new_queue'。我开了一个不同的芹菜工人去听新队列。我想让其他工作人员完成任务并对其进行处理。我认为这可以解决僵局问题。 我已经改变了我的代码看起来像这样:

job = group(tasks) 
job_result = job.apply_async(queue='new_queue') 
results = job_result.get() 

,但我仍然获得了僵局,如果我删除了results = job_result.get()线,我可以看到,任务由主工人的工作并没有被发布到new_queue队列。有什么想法吗? 这是我的芹菜配置: tenant_celery_app.conf.update(CELERY_RESULT_BACKEND='djcelery.backends.database.DatabaseBackend' CELERY_RESULT_DB_TABLENAMES = { 'task': 'tenantapp_taskmeta', 'group': 'tenantapp_groupmeta', }
这是我的运行工:
芹菜工人-c 1 -Q NEW_QUEUE --broker = [amqp_brocker_url]/[虚拟主机]
芹菜工人-c 1 --broker = [amqp_brocker_url]/[虚拟主机]

回答

0

您似乎在锁定您的队列。想想看。如果您有任务等待其他任务,并且队列填满,那么第一个任务将永远挂起。

你需要重构代码,以避免调用result.get()任务中(你可能已经对这个在日志中警告)

我会推荐这:

@shared_task(base=HandlersImplTenantTask, acks_late=True) 
def my_scheduled_task(): 

    needed_ids = MyModel.objects.filter(some_field=False)\ 
          .filter(some_other_field=True)\ 
          .values_list("id", flat=True) \ 
          .order_by("id") 
    if needed_ids: 
     tasks = [my_single_task.s(needed_id=id) for id in needed_ids] 
     job = group(tasks) 
     result = job.apply_async() 

这就是你需要的。

使用日志记录来跟踪任务是否失败。

如果您的应用程序中的其他代码需要跟踪作业是否失败,那么您可以使用celery's inspect api。

+0

感谢您的答复。我明白我做错了什么 - 谢谢。在你的代码示例中,你有'result = job.apply_async()'。如果任务将异步运行,那么结果变量的目的是什么?在任务运行或完成之前,它可以给我什么信息? –

+0

请查看编辑 –

0

所以我寻找的解决方案的确是创建一个新的队列,并开始一个新的工作,处理新的队列。我遇到的唯一问题是将组任务发送到新队列。这是为我工作的代码。

tasks = [my_single_task.s(needed_id=id).set(queue='new_queue') for id in needed_ids] 
job = group(tasks) 
job_result = job.apply_async() 
results = job_result.get() # this will block until the tasks finish but it wont deadlock 

这些都是我的芹菜工人

celery worker -c 1 -Q new_queue --broker=[amqp_brocker_url]/[vhost] 
celery worker -c 1 --broker=[amqp_brocker_url]/[vhost] 
+0

在等待组的结果的子任务在不同的队列上执行时,您是否遇到过任何问题?就我而言,我可以观察到子任务成功执行,但由于超时而导致组失败(我设置了超时)。我看到你在这里有相同的用例,但我很好奇我错过了什么。 –