如何使用每小时查询数据库的Celerybeat和Flask设置定期任务?安装芹菜周期性任务
的环境是这样的:
/
|-app
|-__init__.py
|-jobs
|-task.py
|-celery-beat.sh
|-celery-worker.sh
|-manage.py
我现在有一个名为run_query()
查询功能位于task.py
我想要调度程序踢一旦应用程序启动,所以我有下面几行我/app/__init__.py
文件夹:
celery = Celery()
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, app.jobs.task.run_query())
(为简单起见,我已经设定,使如果它运行,它会每分钟运行一次。没有这样的运气呢。)
当我启动celery-worker.sh
它确认我的功能在[tasks]
标题下。但计划的函数从不运行。我可以手动迫使功能通过发出运行在命令提示以下:
>> from app.jobs import task
>> task.run_query.delay()
EDIT:添加celerybeat.sh
作为后续:如果数据库是通过烧瓶上下文中访问,在我的异步函数调用期间,创建一个新的上下文来访问数据库是明智的吗?使用现有的烧瓶环境?或者完全忘记上下文,只是启动到数据库的连接?我担心的是,如果我只是启动一个新的连接,它可能会干扰现有的上下文的连接?
您还需要运行调度服务。 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#starting-the-scheduler。 **芹菜击败**是一个调度程序;它定期开始执行任务,然后由集群中可用的工作节点执行。 –