2016-02-29 41 views
0

我想在Celery框架之上构建一个应用程序。芹菜任务马上自动发现

我有一个模块settings/celery_settings.py与初始化像这样的芹菜应用程序的代码(我展开了一些变量):

from __future__ import absolute_import 
from celery import Celery 

pfiles = ['other_tasks.test123', 'balance_log.balance_log'] 
app = Celery('myapp') 
# here I just have some parameters defined like broker, result backend, etc 
# app.config_from_object(settings) 

# TRYING to discover tasks 
app.autodiscover_tasks(pfiles) 

文件other_tasks/test123.pybalance_log/balance_log.py包含任务定义这样的:

# file other_tasks/test123.py 
from celery import shared_task, Task 

@shared_task() 
def mytask(): 
    print("Test 1234!") 

class TestTask01(Task): 

    def run(self, client_id=None): 
     logger.debug("TestTask01: run") 
     return client_id 

我运行芹菜工:

python3 /usr/local/bin/celery -A settings.celery_settings worker 

而这种方式可以发现任务。我可以称这些任务。

但后来我尝试使用IPython的:

In [1]: from settings.celery_settings import app 

In [2]: app.tasks 
Out[2]: 
{'celery.backend_cleanup': <@task: celery.backend_cleanup of XExchange:0x7f9f50267ac8>, 
'celery.chain': <@task: celery.chain of XExchange:0x7f9f50267ac8>, 
'celery.chord': <@task: celery.chord of XExchange:0x7f9f50267ac8>, 
'celery.chord_unlock': <@task: celery.chord_unlock of XExchange:0x7f9f50267ac8>, 
'celery.chunks': <@task: celery.chunks of XExchange:0x7f9f50267ac8>, 
'celery.group': <@task: celery.group of XExchange:0x7f9f50267ac8>, 
'celery.map': <@task: celery.map of XExchange:0x7f9f50267ac8>, 
'celery.starmap': <@task: celery.starmap of XExchange:0x7f9f50267ac8>} 

而且很显然它发现任务。

看来,当我明确地调用任务时,我首先导入它们并指定芹菜的确切路径。这就是它工作的原因。

问:我如何让他们发现有已知任务的列表?

回答

4

最后我想通了,那里是autodiscover_tasks功能的附加参数:

def autodiscover_tasks(self, packages, related_name='tasks', force=False): 
    ... 

因此,设置force=True后它变成工作!

app.autodiscover_tasks(pfiles, force=True) 
1

这是我的示例配置:

的conf/celeryconfig

from conf import settings 

CELERYD_CHDIR='/usr/local/src/imbue/application/imbue' 
CELERY_IGNORE_RESULT = False 
CELERY_RESULT_BACKEND = "amqp" 
CELERY_TASK_RESULT_EXPIRES = 360000 
CELERY_RESULT_PERSISTENT = True 
BROKER_URL='amqp://<USERNAME>:<PASSWORD>@rabbitmq:5672' 
CELERY_ENABLE_UTC=True 
CELERY_TIMEZONE= "US/Eastern" 
CELERY_IMPORTS=("hypervisor.esxi.vm_operations", 
       "tools.deploy_tools",) 

管理程序/ ESXi的/ vm_operations.py

@task(bind=True, default_retry_delay=300, max_retries=5) 
def cancel_job(self, host_id=None, vm_id=None, job=None, get_job=False, **kwargs): 
    pass 

call_task.py

def call_task(): 
    log.info('api() | Sending task: ' + job_instance.reference)  
    celery = Celery() 
    celery.config_from_object('conf.celeryconfig') 
    celery.send_task("hypervisor.esxi.vm_operations.cancel_job", 
        kwargs={'job': job_instance, 
          'get_job': True}, 
        task_id=job_instance.reference) 

我用芹菜与导师,我从conf目录下启动:

source ~/.profile 
CELERY_LOGFILE=/usr/local/src/imbue/application/imbue/log/celeryd.log 
CELERYD_OPTS=" --loglevel=INFO --autoscale=10,5" 
cd /usr/local/src/imbue/application/imbue/conf 
exec celery worker -n [email protected]%h -f $CELERY_LOGFILE $CELERYD_OPTS 
+0

那么如何获得已知发现任务的列表? – baldr

+0

'from celery.task.control import inspect' 'i = inspect()。registered_tasks()' – felix