2012-06-04 61 views
1

我使用Celery和Redis作为代理,我可以看到队列实际上是一个以序列化任务作为项目的重做列表。在芹菜如何获得队列中的任务位置?

我的问题是,如果我有一个AsyncResult对象作为调用<task>.delay()的结果,是否有方法来确定项目在队列中的位置?

UPDATE:

我终于能够得到使用位置:

from celery.task.control import inspect 
i = inspect() 
i.reserved() 

但它有点慢,因为它需要与所有的工作人员进行交流。

回答

9

您提到的inspect.reserved()/scheduled()可能工作,但不是 总是准确的,因为它只考虑工作人员预取的任务 。

Celery不允许队列上的带外操作,如从队列中删除消息 或重新排序它们,因为它不会在分布式系统中扩展。 消息可能尚未到达队列,这可能导致竞态条件下的 ,实际上它不是具有事务性的 操作的顺序队列,而是源自多个位置的消息流。即,Celery API基于严格的消息传递语义。

它可以直接访问一些券商 芹菜支持(如Redis的或数据库)的队列中,而不是公共API, 的一部分,你这样做劝阻,但当然,如果你没有计划在 支持规模的操作,你应该做任何最方便的,你 并放弃我的建议。

如果这只是为了给用户一些想法,他的工作将完成,然后 我相信你可以想出一个算法来预测任务将被执行, ,如果你只是有长度队列的时间和每个任务插入的时间。

第一只是一个redis.len("celery"),而后者,你可以 通过听取task_sent信号添加自己:

from celery.signals import task_sent 

@task_sent.connect 
def record_insertion_time(id, **kwargs): 
    redis.zadd("celery.insertion_times", id) 

使用排序在此处设置:http://redis.io/commands/zadd

对于纯消息传递解决方案您可以使用专用显示器 ,该显示器消耗Celery事件流并预测任务何时完成。 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#event-reference

(只注意到了任务发送的文档中缺少 时间戳字段,但时间戳与事件发送,所以我会解决它)。

事件还包含一个“时钟”的字段,它是一个逻辑时钟 (见http://en.wikipedia.org/wiki/Lamport_timestamps) 这可以被用于检测在分布式 系统的事件的顺序,而不依赖于系统时间每台机器 上同步(这是不可能实现的)。