2012-05-23

I've integrated Celery with Dropbox in my application, so that users who have connected their Dropbox account can have their photos saved for them. The Celery tasks live in tasks.py.

I've written some code, but I'm worried it might lead to an infinite loop and grind the system to a halt.

The API I'm using only serves 60 photos at a time, and then gives you pagination.

Here's a copy of my tasks.py file. It actually works fine, but I want to check that I'm doing the right thing and not putting too much strain on the system.

class DropboxUsers(PeriodicTask):
    run_every = timedelta(hours=4)

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Collecting Dropbox users")

        dropbox_users = UserSocialAuth.objects.filter(provider='dropbox')
        for db in dropbox_users:
            ...
            ...
            ...
            sync_images.delay(first, second, third_argument)
        return True


@task(ignore_result=True)
def sync_images(token, secret, username):
    """docstring for sync_images"""
    logger = sync_images.get_logger()
    logger.info("Syncing images for %s" % username)
    ...
    ...
    ...
    ...
    feed = api.user_recent_media(user_id='self', count=60)
    images = feed[0]
    pagination = feed[1]
    for obj in images:
        ### STORE TO DROPBOX
        ...
        ...
        ...
        response = dropbox.put_file(f, my_picture, overwrite=True)
    ### CLOSE DB SESSION
    sess.unlink()
    if pagination:
        store_images.delay(first, second, third, fourth_argument)


@task(ignore_result=True)
def store_images(token, secret, username, max_id):
    """docstring for store_images"""
    logger = store_images.get_logger()
    logger.info("Storing images for %s" % username)
    ...
    ...
    ...
    ...
    feed = api.user_recent_media(user_id='self', count=60, max_id=max_id)
    images = feed[0]
    try:
        pagination = feed[1]
    except IndexError:
        pagination = None
    for obj in images:
        ### STORE TO DROPBOX
        ...
        ...
        ...
        response = dropbox.put_file(f, my_picture, overwrite=True)
    ### CLOSE DB SESSION
    sess.unlink()
    if pagination:
        ### BASICALLY RESTART THE TASK WITH NEW ARGS
        store_images.delay(first, second, third, fourth_argument)

    return True
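One way to hedge against the chain never terminating (say, if the API kept handing back a pagination token) is to thread a page counter through the task arguments and stop after a hard cap. A minimal sketch of that pattern, using a hypothetical `fetch_page` stub in place of the real `api.user_recent_media` call (`MAX_PAGES`, `fetch_page`, and the counter argument are all illustrative, not from the code above):

```python
MAX_PAGES = 50  # hard upper bound on chained fetches per user


def fetch_page(max_id=None):
    """Stub standing in for api.user_recent_media():
    returns (images, next_max_id), with next_max_id=None on the last page."""
    next_id = None if max_id == 2 else (max_id or 0) + 1
    return ([object()] * 60, next_id)


def store_images(max_id=None, page=0):
    """Fetch one page, then chain to the next page only while
    pagination continues AND the hard cap has not been reached."""
    images, next_max_id = fetch_page(max_id)
    pages_fetched = 1
    if next_max_id is not None and page + 1 < MAX_PAGES:
        # in the Celery version this would be store_images.delay(..., page + 1)
        pages_fetched += store_images(next_max_id, page + 1)
    return pages_fetched
```

Even if the API misbehaves, the chain stops after `MAX_PAGES` hops, so a single user can never keep a worker busy forever.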

Your expertise would be much appreciated.

Answer

I don't see any major problems here. I've also implemented a system where one task kicks off another task.

For a while I had problems with Celery duplicating tasks on server restarts. I wrote a decorator that wraps tasks with a cache-backend check to make sure the same task with the same arguments doesn't run too often. It might be useful as a hedge against your infinite loop.

from hashlib import sha1

from django.core.cache import cache as _djcache
from django.utils.functional import wraps

class cache_task(object):

    """ Makes sure that a task is only run once over the course of a configurable
    number of seconds. Useful for tasks that get queued multiple times by accident,
    or on service restart, etc. Uses django's cache (memcache) to keep track."""

    def __init__(self, seconds=120, minutes=0, hours=0):
        self.cache_timeout_seconds = seconds + 60 * minutes + 60 * 60 * hours

    def __call__(self, task):
        task.unsynchronized_run = task.run
        @wraps(task.unsynchronized_run)
        def wrapper(*args, **kwargs):
            key = sha1(str(task.__module__) + str(task.__name__) + str(args) + str(kwargs)).hexdigest()
            is_cached = _djcache.get(key)
            if not is_cached:
                # store the cache BEFORE to cut down on race conditions caused by long tasks
                if self.cache_timeout_seconds:
                    _djcache.set(key, True, self.cache_timeout_seconds)
                task.unsynchronized_run(*args, **kwargs)
        task.run = wrapper
        return task

Usage:

@cache_task(hours=2) 
@task(ignore_result=True) 
def store_images(token, secret, username, max_id): 
    ...
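The effect of the decorator can be demonstrated with a plain dict standing in for Django's cache. This is a sketch of the same keyed-lock idea only (`run_once`, `sync`, and the in-memory `_cache` are made up for the demo; there is no expiry here, unlike the real cache backend):

```python
from hashlib import sha1

_cache = {}  # stands in for django.core.cache; real entries would expire


def run_once(func):
    """Skip the call entirely if an identical (func, args) pair has already run."""
    def wrapper(*args):
        key = sha1((func.__name__ + repr(args)).encode()).hexdigest()
        if key in _cache:
            return None  # duplicate invocation: do nothing
        _cache[key] = True  # mark BEFORE running, as in the decorator above
        return func(*args)
    return wrapper


@run_once
def sync(username):
    return "synced %s" % username
```

Calling `sync("alice")` twice runs the body only once; a different argument (`sync("bob")`) produces a different key and runs normally.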