2015-01-26
6

Is there a standard way to debounce Celery tasks?

For example, so that a task can be "started" any number of times, but only runs once, after some delay:

def debounce_task(task):
    if task_is_queued(task):
        return
    task.apply_async(countdown=30)
+0

You could use a cache. Many key-value stores support expiry: try to fetch the result from the store, and if there is none, run the task and store the result with an expiration time before returning it. Use only a single worker so tasks execute sequentially. Avoid locking schemes unless you want to deal with stale locks. – 2015-01-26 18:53:29

+0

Oh, absolutely. But I'd rather avoid the annoyance of implementing the introspection myself (inspecting arguments, tracking results, etc.), and I was wondering whether there's any standard way to do it. – 2015-01-26 18:55:40

+0

Writing a caching decorator in Python is simple (maybe 4 lines); I wish I had time to post a complete answer. – 2015-01-26 18:59:42

Answers

4

Here's how we do it with Redis counters. All of this could probably be generalized into a decorator, but we only use it for a specific task (webhooks).

Your public task is the one you call from other functions. It increments a key in Redis. The key is formed from your function's arguments, whatever they may be (this ensures the counter is unique per individual task):

@task
def your_public_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    get_redis().incr(cache_key)
    _your_task.apply_async(args=args, kwargs=kwargs, countdown=settings.QUEUE_DELAY)

Note that the cache key function is shared (you want the same cache key in each function), as is the countdown setting.
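For completeness, here is a minimal sketch of what that shared key helper might look like; the original answer never shows it, so the name prefix and hashing scheme here are assumptions:

import hashlib
import json

def make_public_task_cache_key(*args, **kwargs):
    # Serialize the arguments deterministically so any given combination
    # always maps to the same Redis key.
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    return 'debounce:your_task:' + hashlib.md5(payload.encode()).hexdigest()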

Then the task that actually executes the code does the following:

@task
def _your_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    counter = get_redis().getset(cache_key, 0)
    # redis makes the zero a string.
    if counter == '0':
        return

    # ... execute your actual task code.

This lets you hit your_public_task.delay(...) as many times as you want within QUEUE_DELAY, and it will only fire once.
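For instance, a hypothetical burst (assuming the setup above and an integer task argument):

# Three calls within QUEUE_DELAY seconds increment the counter to 3.
your_public_task.delay(42)
your_public_task.delay(42)
your_public_task.delay(42)

# When the first scheduled _your_task fires, getset returns '3' and resets
# the counter to 0, so it executes; the two that fire later read '0' and
# return early.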

1

Here's how you can do it with Mongo.

NOTE: I had to make the design a little more forgiving, since Celery tasks aren't guaranteed to execute at the exact moment the eta or countdown runs out.

Also, Mongo expiring indexes are only cleaned up about once a minute, so you can't design around the record being deleted at the exact moment the eta comes up.

Anyway, the flow goes something like this:

  1. Client code calls my_task.
  2. preflight increments the call counter and returns it as the flight_id.
  3. _my_task is set to execute TTL seconds later.
  4. When _my_task runs, it checks whether it still has the latest flight_id. If not, it aborts.
  5. ... some time later ... Mongo cleans up stale entries in the collection, via an expiring index.

@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
    _my_task.apply_async((my_arg,), {'flight_id': flight_id}, countdown=TTL)

@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
        return
    # ... actual work ... #

Library code:

import datetime

import pymongo

TTL = 5 * 60      # Run tasks after 5 minutes
EXPIRY = 6 * TTL  # This needs to be much larger than TTL.

# We need to store a list of task-executions currently pending
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING,),
                                  ('key', pymongo.ASCENDING,)])
inflight_collection.create_index('eta', expireAfterSeconds=EXPIRY)


def preflight(collection, fn, key, ttl):
    eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl)
    result = collection.find_one_and_update({
        'fn': fn,
        'key': key,
    }, {
        '$set': {
            'eta': eta
        },
        '$inc': {
            'flightId': 1
        }
    }, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
    print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId'])
    return result['flightId']


def check_for_takeoff(collection, fn, key, flight_id):
    result = collection.find_one({
        'fn': fn,
        'key': key
    })
    ready = result is None or result['flightId'] == flight_id
    # Guard against result being None (the entry may already have expired).
    print 'Check[{}][{}] = {}, {}'.format(
        fn, key, result['flightId'] if result else None, ready)
    return ready
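The answer leaves HASH and db undefined; here is a plausible sketch of both (these definitions are assumptions of mine, not part of the original):

import hashlib

import pymongo

# Assumed connection setup; adjust to your deployment.
db = pymongo.MongoClient()['myapp']

def HASH(value):
    # Stable digest so arbitrary task arguments form a usable 'key' field.
    return hashlib.sha1(repr(value).encode('utf-8')).hexdigest()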
0

Bartek has the right idea: use Redis counters, which are atomic (and should be readily available if your broker is Redis). Although his solution is throttling, not debouncing. The difference is minor, though (getset vs. decr).

Queuing up the task:

conn = get_redis() 
conn.incr(key) 
task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Then in the task:

conn = get_redis() 
counter = conn.decr(key) 
if counter > 0: 
    # task is still queued 
    return 
# continue on to rest of task 

It's hard to make this into a decorator, because you need to decorate both the task and the call to the task itself. So you would need one decorator that goes before the Celery @task decorator and one that goes after it.

For now I've just made some functions that help me invoke the task, plus a function that does the check at the start of the task, as sketched below.
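A minimal sketch of what those helpers could look like; the names queue_debounced and should_run are placeholders of mine, not from the answer:

import redis

conn = redis.Redis()

def queue_debounced(task, key, args=(), kwargs=None, countdown=30):
    # Call-site helper: bump the counter, then schedule the task.
    conn.incr(key)
    task.apply_async(args=args, kwargs=kwargs or {}, countdown=countdown)

def should_run(key):
    # Top-of-task helper: only the invocation that brings the counter
    # down to zero (the last one queued) should do the real work.
    return conn.decr(key) <= 0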

+0

http://stackoverflow.com/a/43625455/4391298 is the solution I eventually ended up with, including some key-expiry handling added after it melted down (not a problem with the original solution). – 2017-04-26 05:06:42

0

Here's a solution I came up with: https://gist.github.com/wolever/3cf2305613052f3810a271e09d42e35c

Copied here, for posterity:

import time 

import redis 


def get_redis_connection():
    return redis.Redis()

class TaskDebouncer(object):
    """ A simple Celery task debouncer.

        Usage::

            def debounce_process_corpus(corpus):
                # Only one task with ``key`` will be allowed to execute at a
                # time. For example, if the task was resizing an image, the key
                # might be the image's URL.
                key = "process_corpus:%s" %(corpus.id, )
                TaskDebouncer.delay(
                    key, process_corpus, args=[corpus.id], countdown=0,
                )

            @task(bind=True)
            def process_corpus(self, corpus_id, debounce_key=None):
                debounce = TaskDebouncer(debounce_key, keepalive=30)

                corpus = Corpus.load(corpus_id)

                try:
                    for item in corpus:
                        item.process()

                        # If ``debounce.keepalive()`` isn't called every
                        # ``keepalive`` interval (the ``keepalive=30`` in the
                        # call to ``TaskDebouncer(...)``) the task will be
                        # considered dead and another one will be allowed to
                        # start.
                        debounce.keepalive()

                finally:
                    # ``finalize()`` will mark the task as complete and allow
                    # subsequent tasks to execute. If it returns true, there
                    # was another attempt to start a task with the same key
                    # while this task was running. Depending on your business
                    # logic, this might indicate that the task should be
                    # retried.
                    needs_retry = debounce.finalize()

                if needs_retry:
                    raise self.retry(max_retries=None)

    """

    def __init__(self, key, keepalive=60):
        if key:
            self.key = key.partition("!")[0]
            self.run_key = key
        else:
            self.key = None
            self.run_key = None
        self._keepalive = keepalive
        self.cxn = get_redis_connection()
        self.init()
        self.keepalive()

    @classmethod
    def delay(cls, key, task, args=None, kwargs=None, countdown=30):
        cxn = get_redis_connection()
        now = int(time.time())
        first = cxn.set(key, now, nx=True, ex=countdown + 10)
        if not first:
            now = cxn.get(key)

        run_key = "%s!%s" %(key, now)
        if first:
            kwargs = dict(kwargs or {})
            kwargs["debounce_key"] = run_key
            task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

        return (first, run_key)

    def init(self):
        self.initial = self.key and self.cxn.get(self.key)

    def keepalive(self, expire=None):
        if self.key is None:
            return
        expire = expire if expire is not None else self._keepalive
        self.cxn.expire(self.key, expire)

    def is_out_of_date(self):
        if self.key is None:
            return False
        return self.cxn.get(self.key) != self.initial

    def finalize(self):
        if self.key is None:
            return False
        with self.cxn.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(self.key)
                    if pipe.get(self.key) != self.initial:
                        return True
                    pipe.multi()
                    pipe.delete(self.key)
                    pipe.execute()
                    break
                except redis.WatchError:
                    continue
        return False
0

Here is a more fleshed-out solution, based on https://stackoverflow.com/a/28157498/4391298 but turned into a decorator, and reaching into the Kombu connection pool to reuse your Redis counter.

import logging
from functools import wraps

# Not strictly required
from django.core.exceptions import ImproperlyConfigured
from django.core.cache.utils import make_template_fragment_key

from celery.utils import gen_task_name

# NOTE: assumes ``app`` (your Celery application instance) is in scope
# at module level.

LOGGER = logging.getLogger(__name__)


def debounced_task(**options):
    """Debounced task decorator."""

    try:
        countdown = options.pop('countdown')
    except KeyError:
        raise ImproperlyConfigured("Debounced tasks require a countdown")

    def factory(func):
        """Decorator factory."""
        try:
            name = options.pop('name')
        except KeyError:
            name = gen_task_name(app, func.__name__, func.__module__)

        @wraps(func)
        def inner(*args, **kwargs):
            """Decorated function."""

            key = make_template_fragment_key(name, [args, kwargs])
            with app.pool.acquire_channel(block=True) as (_, channel):
                depth = channel.client.decr(key)

                if depth <= 0:
                    try:
                        func(*args, **kwargs)
                    except:
                        # The task failed (or is going to retry), set the
                        # count back to where it was
                        channel.client.set(key, depth)
                        raise
                else:
                    LOGGER.debug("%s calls pending to %s",
                                 depth, name)

        task = app._task_from_fun(inner, **options, name=name + '__debounced')

        @wraps(func)
        def debouncer(*args, **kwargs):
            """
            Debouncer that calls the real task.
            This is the task we are scheduling.
            """

            key = make_template_fragment_key(name, [args, kwargs])
            with app.pool.acquire_channel(block=True) as (_, channel):
                # Increment first, then set the expiry: EXPIRE on a missing
                # key is a no-op, so the key has to exist before a TTL can
                # attach. The TTL cleans Redis up if the task never runs or
                # runs too many times, so the issue eventually resolves.
                depth = channel.client.incr(key)
                channel.client.expire(key, countdown + 10)

            LOGGER.debug("Requesting %s in %i seconds (depth=%s)",
                         name, countdown, depth)
            task.si(*args, **kwargs).apply_async(countdown=countdown)

        return app._task_from_fun(debouncer, **options, name=name)

    return factory
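A hypothetical usage sketch (send_webhook and post_json are made-up names, and app is assumed to be your Celery application in scope):

@debounced_task(countdown=30)
def send_webhook(url, payload):
    # Real work; within any 30-second burst of identical calls, the body
    # only executes once, when the pending-call counter reaches zero.
    post_json(url, payload)

# Call it like a normal task; repeated calls within the countdown collapse
# into a single execution of the body.
send_webhook.delay('https://example.com/hook', {'event': 'updated'})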