2015-09-29 26 views

回答

1

您可以编写自己的任务来检查以确保在调用apply_async时传递有效的路由键。您也可以将其应用于队列。在您的配置中设置路由和队列:

import celery 
from kombu import Queue, Exchange 

app = celery.Celery('app') 
app.conf.CELERY_QUEUES = (
    Queue('add', Exchange('default'), routing_key='good'), 
) 
app.conf.CELERY_ROUTES = { 
    'app.add': { 
     'queue': 'add', 
     'routing_key': 'good' 
    } 
} 

现在,创建您自己的Task类,它将执行对路由键的检查。你需要重写apply_async:

class RouteCheckerTask(celery.Task): 
    abstract = True 

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, 
            link=None, link_error=None, **options): 

     app = self._get_app() 
     routing_key = options.get('routing_key', None) 
     if routing_key: 
      valid_routes = [v['routing_key'] for k, v in app.conf.CELERY_ROUTES.items()] 
      is_valid = routing_key in valid_routes 
      if not is_valid: 
       raise NotImplementedError('{} is not a valid routing key. Options are: {}'.format(routing_key, valid_routes)) 
     if app.conf.CELERY_ALWAYS_EAGER: 
      return self.apply(args, kwargs, task_id=task_id or uuid(), link=link, link_error=link_error, **options) 
      # add 'self' if this is a "task_method". 
     if self.__self__ is not None: 
      args = args if isinstance(args, tuple) else tuple(args or()) 
      args = (self.__self__,) + args 
     return app.send_task(
      self.name, args, kwargs, task_id=task_id, producer=producer, 
      link=link, link_error=link_error, result_cls=self.AsyncResult, 
      **dict(self._get_exec_options(), **options) 
     ) 

基地从这个您的任务,通常称之为apply_async:

@app.task(base=RouteCheckerTask) 
def add(x, y): 
    return x + y 

# Fails 
add.apply_async([1, 2], routing_key='bad') 
# Passes 
add.apply_async([1, 2], routing_key='good') 
相关问题