我的问题是,当我尝试调用test()时,它工作得很好,但是当我调用test.delay()时,它返回“received unregistered task”。芹菜Redis返回“接收未注册的任务类型”
运行试验(+)和test.delay()(你可以看到,第一个作品。)从test.delay
结果()
Settings.py
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS=("tasks")
INSTALLED_APPS = (
"systech_account",
#...
)
tasks.py
from __future__ import absolute_import
from celery import shared_task
@shared_task
def test():
return "Just a Test"
celeryconfig.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'root.settings')
app = Celery()
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
。
我使用
- 的Django 1.9
- 芹菜4.0
- Redis的服务器2.8.4
- 的Python 2.7
- 的Ubuntu 14.04
感谢您的回复。当你说“重启Celery”时,你的意思是重新运行* celery --app = root.celery:app worker --loglevel = INFO *?如果是这样,我已经做到了,但没有运气。至于Tasks.py,它只是一种类型。感谢您指出了这一点。 – aldesabido