2015-04-23 23 views
1

我目前正在使用Flask和芹菜生物信息网络服务的模板。该模板显示用户如何创建任务并在芹菜工作人员上运行这些任务。可靠地发送邮件在芹菜任务成功/失败

这种服务的一个常见要求是在任务成功或失败时通知用户。我正在使用Flask-Mail发送邮件。我的第一次尝试是这样的:

@celery.task(name='app.expensive_greet', bind=True) 
def expensive_greet(self, person, total, email): 
    this_task_id = expensive_greet.request.id 

    try: 
     for i in range(total): 
      time.sleep(0.1) 
      self.update_state(state='PROGRESS', 
           meta={'current': i, 'total': total}) 

     if email: 
      send_success_mail(email, this_task_id) 
     return 'Greetings, {}!'.format(person) 
    except SoftTimeLimitExceeded: 
     if email: 
      send_failure_mail(email, this_task_id) 
     return 'Greetings, fallback!' 
    except Exception: 
     if email: 
      send_failure_mail(email, this_task_id) 

正如你看到的有很多重复的代码。我想知道是否有可能在邮件处理在自定义芹菜Task隔离并结束了与此:

class MailBase(celery.Task): 
    abstract = True 
    def on_success(self, res, task_id, args, kwargs): 
     _, _, email = args 
     if email: 
      send_success_mail(email, task_id) 

    def on_failure(self, exc, task_id, args, kwargs, einf): 
     _, _, email = args 
     if email: 
      send_failure_mail(email, task_id) 

和设置base=MailBase的任务。我不喜欢这个解决方案,因为:

  1. 它仍然有重复的if,但我可以忍受。
  2. 如果用户更改任务函数的参数,则还必须在两个不同的位置修改MailBase类。
  3. 解开函数参数是很难看的。
  4. 由于这是大多数非技术用户的模板,因此简单性很重要。

有没有更好的方法来做到这一点?理想情况下,我希望将邮件作为元数据与任务一起发送,而不是任务函数的参数,并且能够在不触及任务本身的情况下插入邮件功能。

在此先感谢!

回答

1

如果您只想在任务失败时得到通知,则可以使用芹菜built-in email mechanism

另外,如果你仍然想坚持自己的方式,你可以尝试使用装饰器来包装电子邮件相关的操作。

import functools 


def send_emails(func): 
    @functools.wraps(func) 
    def wrapper(self, *args, **kwargs): 
     this_task_id = self.request.id 
     email = kwargs.pop('email', False) # get email and remove it from kwargs 
     try: 
      ret = func(self, *args, **kwargs) 
     except NotifyException as ex: 
      if email: 
       send_failure_mail(email, this_task_id) 
      return ex.value 
     except Exception: 
      if email: 
       send_failure_mail(email, this_task_id) 
      # It would be better to raise again to allow celery knows the task has failed 
      raise 
     else: 
      if email: 
       send_success_mail(mail, this_task_id) 
      return ret 
    return wrapper 

NotifyException介绍了一种情况下,当出现错误,但用户并没有把它当作一个失败,只是想发送一封电子邮件来代替。

class NotifyException(Exception): 
    ''' 
    This exception would be handled by send_emails decorator, the decorator will 
    catch it and return its value to outer. 
    ''' 
    def __init__(self, value): 
     self.value = value 
     super(NotifyException, self).__init__(value) 

请注意,最好在配置文件中保存email参数。

而现在的任务方法是这样

@celery.task(name='app.expensive_greet', bind=True) 
@send_emails 
def expensive_greet(self, person, total): 
    try: 
     for i in range(total): 
      time.sleep(0.1) 
      self.update_state(state='PROGRESS', 
           meta={'current': i, 'total': total}) 
     return 'Greetings, {}!'.format(person) 
    except SoftTimeLimitExceeded: 
     # Note that raise this exception to allow the decorator catch it 
     # and return the value of the exception 
     raise NotifyException('Greetings, fallback!') 

改变希望它能帮助!

+0

谢谢您的建议。必须将电子邮件发送给任务(用户在服务器上启动作业时输入他的电子邮件,并在完成/失败时预期发送电子邮件),因此无法将其保存在配置中。因此,问题1-3仍然存在:) – Dan

+0

我不清楚明白**必须将电子邮件发送给任务,因为它会更改**。这是否意味着某些任务可能需要电子邮件,但其他任务不需要。如果是这样,我们可以从send_emails的wrapper()中的** kwargs **中得到它。以上的一切依然存在 – Jacky

+0

你是对的!我早些时候尝试过,由于某种原因它没有工作,所以我确信自己Celery不支持关键字参数。 你可以编辑你的答案,包括这个?然后我会将它标记为已接受:) – Dan