2016-11-11 35 views
8

我遵循the Celery Django教程和我在示例中看到的任务(add, mul)完美地为我工作。当我做res = add.delay(1,2); res.get()时,我得到正确的答案。一些芹菜任务工作,其他人是NotRegistered

但是当我尝试执行另一个我的任务res = sayhello.delay('trex')时,我得到*** NotRegistered: u'pipeline.tasks.sayhello'

如果我做res = sayhello('trex')那么只要输入res即可得到结果。但是用这种方式,我可以在不使用芹菜的情况下执行函数。

任务工作只有当我运行它在Django的壳./manage shell

>>> res = sayhello.delay('trex') 
>>> res.get() 
u'Hello trex' 

所以,问题是,我不能从pipeline/views.py执行sayhello任务。但我可以从那里执行任务addmul

这是为什么?如何从views.py正确运行任务?

错误完整的消息:

[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'. 
The message has been ignored and discarded. 

Did you remember to import the module containing this task? 
Or maybe you're using relative imports? 

Please see 
http://docs.celeryq.org/en/latest/internals/protocol.html 
for more information. 

The full contents of the message body was: 
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b) 
Traceback (most recent call last): 
    File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received 
    strategy = strategies[type_] 
KeyError: u'pipeline.tasks.sayhello' 

Django的版本

1.9.7 

芹菜版本:

celery==4.0.0 
django-celery==3.1.17 

Django项目目录树:

rocket 
├── etl 
│   ├── etl 
│   │   ├── celery.py 
│   │   ├── __init__.py 
│   │   ├── settings 
│   │   │   ├── base.py 
│   │   │   ├── dev.py 
│   │   │   ├── __init__.py 
│   │   │   ├── production.py 
│   │   │   └── test.py 
│   │   ├── urls.py 
│   │   ├── wsgi.py 
│   ├── manage.py 
│   ├── pipeline 
│   │   ├── __init__.py 
│   │   ├── models.py 
│   │   ├── tasks.py 
│   │   ├── tests.py 
│   │   ├── urls.py 
│   │   ├── views.py 

ETL /管道/ views.py

from .tasks import * 

def get_response(request): 
    result = add.delay(1, 2) 
    result.get() 
    result = sayhello.delay('tiger') 
    result.get() 

ETL /管道/ tasks.py

from __future__ import absolute_import, unicode_literals 
from celery import shared_task 

@shared_task 
def add(x, y): 
    return x + y 

@shared_task 
def mul(x, y): 
    return x * y 

@shared_task 
def sayhello(name): 
    return "Hello %s" % name 

而且我想这:

from celery.decorators import task 

@task(name="sayhello") 
def sayhello(name): 
    return "Hello {0}".format(name) 

ETL/celery.py

from __future__ import absolute_import, unicode_literals 
import os 
from celery import Celery 

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base') 
app = Celery('etl') 
app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks() 

@app.task(bind=True) 
def debug_task(self): 
    print('Request: {0!r}'.format(self.request)) 

ETL/__ init__py

from __future__ import absolute_import, unicode_literals 
from .celery import app as celery_app 
__all__ = ['celery_app'] 

ETL /设置/ base.py

... 
CELERY_BROKER_URL = 'redis://localhost:6379' 
CELERY_RESULT_BACKEND = 'redis://localhost:6379' 
CELERY_ACCEPT_CONTENT = ['application/json'] 
CELERY_TASK_SERIALIZER = 'json' 
CELERY_RESULT_SERIALIZER = 'json' 
CELERY_TIMEZONE = 'Europe/London' 
CELERY_IMPORTS = ('pipeline.tasks',) 

回答

4

Ť他错误是因为CELERY_IMPORTS设置无法正常工作到您的etl/settings/base.py文件中。 所以我的建议是:

CELERY_IMPORTS = ('pipeline.tasks' ,) 

删除逗号如果问题仍然存在,那么运行这个命令:

celery -A pipeline.tasks worker --loglevel=DEBUG 

一件事,你tasks.py文件的需求要在一个Django应用程序(这是在settings.py中注册)以导入。请检查这一点也。谢谢。

+0

是的,这是逗号。 – trex

+0

但是等一下,为什么它可以用于其他任务?不明白。 – trex

+0

最终没有逗号停止工作。任务'add'只能使用逗号。任务'sayhello'仍然不起作用。我不明白,它的工作,然后停止工作。 – trex