2012-04-17 53 views
37

如果我有一个函数定义如下:如何动态地添加/删除定期任务,芹菜(celerybeat)

def add(x,y): 
    return x+y 

有没有办法动态地添加该功能作为芹菜PeriodicTask并开始它在运行?我希望能够做到像(伪):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) 
celery.beat.start(some_unique_task_id) 

我也想停止或动态地删除该任务有类似的信息(伪):

celery.beat.remove_task(some_unique_task_id) 

celery.beat.stop(some_unique_task_id) 

仅供参考我不使用djcelery,它允许您通过django管理员管理定期任务。

回答

18

不,我很抱歉,这对于常规的celerybeat是不可能的。

但它很容易扩展,可以做你想做的事情,例如, django-celery 调度器只是读取和写入数据库 (顶部有一些优化)的时间表的子类。

此外,即使对于非Django项目,您也可以使用django-celery调度程序。

事情是这样的:

  • 安装Django Django的+ - 芹菜:

    $ PIP安装-U的Django Django的芹菜

  • 添加以下设置到您的celeryconfig:

    DATABASES = { 
        'default': { 
         'NAME': 'celerybeat.db', 
         'ENGINE': 'django.db.backends.sqlite3', 
        }, 
    } 
    INSTALLED_APPS = ('djcelery',) 
    
  • 创建数据库表:

    与数据库调度
    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
    
  • 开始celerybeat:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ 
        -S djcelery.schedulers.DatabaseScheduler 
    

而且还有可以用于非的Django的djcelerymon命令项目 开始celerycam和Django管理Web服务器中同样的过程,你可以使用 也可以在一个漂亮的网页界面中编辑你的周期性任务:

$ djcelerymon 

(注意:由于某种原因djcelerymon不能使用Ctrl + C停止,你 必须用Ctrl + Z +杀%1)

+1

你能否提一下代码来添加任务并删除?对不起,我没有得到。 – 2013-10-28 16:26:02

+6

从2012年到2016年这个变化吗? – Tanay 2016-06-02 09:44:28

32

这个问题被回答了google groups

我不是作者,都归功于让马克

下面是这个妥善的解决办法。确定工作,在我的情况下, 我分类了周期性任务,并创建了一个模型,因为我可以将 添加到模型中,因为我需要其他字段,所以我可以添加 “终止”方法。您必须将定期任务的启用 属性设置为False并在删除之前将其保存。整个 子类是不是必须的,schedule_every方法是 真的做的工作。当你准备好终止你的任务时(如果你的 没有继承它),你可以使用 PeriodicTask.objects.filter(name = ...)来搜索你的任务,禁用 它,然后删除它。

希望这会有所帮助!

from djcelery.models import PeriodicTask, IntervalSchedule 
from datetime import datetime 

class TaskScheduler(models.Model): 

    periodic_task = models.ForeignKey(PeriodicTask) 

    @staticmethod 
    def schedule_every(task_name, period, every, args=None, kwargs=None): 
    """ schedules a task by name every "every" "period". So an example call would be: 
     TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
     that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """ 
     permissible_periods = ['days', 'hours', 'minutes', 'seconds'] 
     if period not in permissible_periods: 
      raise Exception('Invalid period specified') 
     # create the periodic task and the interval 
     ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task 
     interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) 
     if interval_schedules: # just check if interval schedules exist like that already and reuse em 
      interval_schedule = interval_schedules[0] 
     else: # create a brand new interval schedule 
      interval_schedule = IntervalSchedule() 
      interval_schedule.every = every # should check to make sure this is a positive int 
      interval_schedule.period = period 
      interval_schedule.save() 
     ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) 
     if args: 
      ptask.args = args 
     if kwargs: 
      ptask.kwargs = kwargs 
     ptask.save() 
     return TaskScheduler.objects.create(periodic_task=ptask) 

    def stop(self): 
     """pauses the task""" 
     ptask = self.periodic_task 
     ptask.enabled = False 
     ptask.save() 

    def start(self): 
     """starts the task""" 
     ptask = self.periodic_task 
     ptask.enabled = True 
     ptask.save() 

    def terminate(self): 
     self.stop() 
     ptask = self.periodic_task 
     self.delete() 
     ptask.delete() 
+1

这应该是公认的答案。 – kai 2015-06-07 16:59:53

+1

@kai'IntervalSchedule','PeriodicTask'等是'djcelery'类,OP说他没有使用'djcelery'。尽管如此,这绝对有用。 – Chris 2016-03-28 14:14:07

2

你可以看看这个flask-djcelery其配置瓶中,djcelery并且还提供了浏览的REST API

2

有一个叫Django的芹菜拍它提供了一个模型库的需求。为了使其动态加载新的周期性任务,必须创建自己的调度程序。

from django_celery_beat.schedulers import DatabaseScheduler 


class AutoUpdateScheduler(DatabaseScheduler): 

    def tick(self, *args, **kwargs): 
     if self.schedule_changed(): 
      print('resetting heap') 
      self.sync() 
      self._heap = None 
      new_schedule = self.all_as_schedule() 

      if new_schedule: 
       to_add = new_schedule.keys() - self.schedule.keys() 
       to_remove = self.schedule.keys() - new_schedule.keys() 
       for key in to_add: 
        self.schedule[key] = new_schedule[key] 
       for key in to_remove: 
        del self.schedule[key] 

     super(AutoUpdateScheduler, self).tick(*args, **kwargs) 

    @property 
    def schedule(self): 
     if not self._initial_read and not self._schedule: 
      self._initial_read = True 
      self._schedule = self.all_as_schedule() 

     return self._schedule 
+0

谢谢。如果key不在self.schedule.keys()中''to_add = [key在new_schedule.keys()中的键,'to_remove'和'to_remove'类似的做了这个诀窍,但没有马上工作。为什么这不是一个标准选项?直到现在,我必须让Celery任务调用其他Celery任务并倒计时。这听起来不太好。 – freethebees 2017-07-04 16:30:27