2010-05-31 22 views
0

我已经在我为客户托管的站点上设置了此任务队列实现,它有一个cron job,它每天凌晨2点运行“/admin/tasks/queue”,这将排队发送电子邮件,“/admin/tasks/email”,并使用cursors至于做小块排队。由于某种原因,昨晚/admin/tasks/queue不断运行此代码,因此发送了我的整个电子邮件:/的配额。我用这段代码做错了什么?这个任务队列设置有什么问题?

class QueueUpEmail(webapp.RequestHandler): 
    def post(self): 
     subscribers = Subscriber.all() 
     subscribers.filter("verified =", True) 

     last_cursor = memcache.get('daily_email_cursor') 
     if last_cursor: 
      subscribers.with_cursor(last_cursor) 

     subs = subscribers.fetch(10) 
     logging.debug("POST - subs count = %i" % len(subs)) 
     if len(subs) < 10: 
      logging.debug("POST - Less than 10 subscribers in subs") 
      # Subscribers left is less than 10, don't reschedule the task 
      for sub in subs: 
       task = taskqueue.Task(url='/admin/tasks/email', params={'email': sub.emailaddress, 'day': sub.day_no}) 
       task.add("email") 
      memcache.delete('daily_email_cursor') 
     else: 
      logging.debug("POST - Greater than 10 subscibers left in subs - reschedule") 
      # Subscribers is 10 or greater, reschedule 
      for sub in subs: 
       task = taskqueue.Task(url='/admin/tasks/email', params={'email': sub.emailaddress, 'day': sub.day_no}) 
       task.add("email") 
      cursor = subscribers.cursor() 
      memcache.set('daily_email_cursor', cursor) 
      task = taskqueue.Task(url="/admin/tasks/queue", params={}) 
      task.add("queueup") 

回答

2

我可以看到一些潜在的问题。首先,您将光标存储在内存缓存中,但不保证任何内容。如果您在处理中途错过了缓存,则会再次发送每条消息。其次,如果任务失败,任务将被重新尝试;他们应该被设计成为这个原因的幂等性。当然,在发送电子邮件的情况下,这几乎是不可能的,因为一旦发送了消息,如果您的任务在发送之后由于某种其他原因而死亡,则无法回滚。至少,我建议在发送消息后尝试更新每个Subscriber实体上的“最后通过电子邮件发送的日期”字段。当然,这本身并不是万无一失的,因为电子邮件发送可能会成功,并且在此之后实体的更新可能会失败。这也会增加整个过程的开销,因为你会为每个用户进行写操作。

+0

感谢您的分析,我最初的想法是memcache可能是一个问题。 – 2010-05-31 20:12:13

+0

Memcache可能是您遇到问题背后的直接原因,是的。你最好的办法是将光标作为每个任务的参数传递给下一个任务。 – 2010-06-01 09:18:28