
I have the following code that starts a Celery chain when a URL is visited. The chain arguments are passed as a query parameter, e.g. /process_pipeline/?pipeline=task_a|task_c|task_b. To avoid launching several similar chained tasks (for example when someone refreshes the page), I use a simple cache-based locking system, and I want to execute some code when the Celery chain completes.

I have a timeout on the cache, but what I am missing here is a way to release the cache lock when the chain finishes.

Any ideas?

tasks.py

from __future__ import absolute_import 
from celery import shared_task 


# Map task names to the task objects so the view can look them up by name.
registry = {}

def register(fn):
    registry[fn.__name__] = fn

@shared_task 
def task_a(*args, **kwargs): 
    print('task a') 

@shared_task 
def task_b(*args, **kwargs): 
    print('task b') 

@shared_task 
def task_c(*args, **kwargs): 
    print('task c') 

register(task_a) 
register(task_b) 
register(task_c) 

views.py

from __future__ import absolute_import 
from django.core.cache import cache as memcache 
from django.shortcuts import redirect 
from hashlib import md5  # django.utils.hashcompat was removed; hashlib.md5 is equivalent
from celery import chain 
from .tasks import registry 


LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes 


def process_pipeline(request): 
    pipeline = request.GET.get('pipeline') 
    hexdigest = md5(pipeline.encode('utf-8')).hexdigest()
    lock_id = 'lock-{0}'.format(hexdigest) 

    # cache.add fails if the key already exists
    acquire_lock = lambda: memcache.add(lock_id, None, LOCK_EXPIRE) 
    # memcache delete is very slow, but we have to use it to take 
    # advantage of using add() for atomic locking 
    release_lock = lambda: memcache.delete(lock_id) 

    if acquire_lock():
        args = [registry[p].s() for p in pipeline.split('|')]
        task = chain(*args).apply_async()
        memcache.set(lock_id, task.id)

        return redirect('celery-task_status', task_id=task.id)
    else:
        task_id = memcache.get(lock_id)
        return redirect('celery-task_status', task_id=task_id)



urls.py

from django.conf.urls import patterns, url

urlpatterns = patterns('aafilters.views', 
    url(r'^process_pipeline/$', 'process_pipeline', name="process_pipeline"), 
) 

On a task there is a method called 'after_return', but there does not seem to be anything similar on a chain. – Alex
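
One possible direction (a sketch only, not from the question; release_lock is a hypothetical task name added here for illustration): append one extra task to the end of the chain that deletes the cache key, so it runs once every task in the pipeline has finished. The LOCK_EXPIRE timeout then only remains as a fallback.

from __future__ import absolute_import
from celery import shared_task
from django.core.cache import cache as memcache


@shared_task
def release_lock(lock_id):
    # Remove the lock key so the same pipeline can be started again.
    memcache.delete(lock_id)

In the view, the cleanup task would be appended as an immutable signature (.si()), so it does not receive the previous task's result:

    args = [registry[p].s() for p in pipeline.split('|')]
    args.append(release_lock.si(lock_id))
    task = chain(*args).apply_async()
    memcache.set(lock_id, task.id)

If a task in the chain raises an exception, the cleanup task never runs, so the cache timeout still acts as a safety net.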

Answer