2013-10-29 46 views
2

我正在尝试设置Celery任务。我们的主要应用程序是SQLAlchemy的金字塔。Celery任务中的金字塔/ SQLAlchemy模型绑定

所以我有一个任务定义为:

from celery.contrib.methods import task 
from apipython.celerytasks import celery 

class Email(): 
    def __init__(self, from_name, from_email, to_name, to_email, subject, html_body, 
       sendgrid_category=None): 
     self.from_name = from_name 
     self.from_email = from_email 
     self.to_name = to_name 
     self.to_email = to_email 
     self.subject = subject 
     self.body = None 
     self.html_body = html_body 
     self.sendgrid_category = sendgrid_category 

class EmailService(): 
    @task() 
    def task__send_smtp(self, email, from_user_id=None, to_user_id=None): 
     # send the email, not shown here 

     # EmailLog is a SQLAlchemy model 
     email_log = EmailLog(
         email.subject, 
         email.html_body, 
         from_user_id=from_user_id, 
         to_user_id=to_user_id, 
         action_type=email.sendgrid_category) 
     DBSession.add(email_log) 

     transaction.commit() 

而且celerytasks.py我:

from celery import Celery 

celery = Celery('apipython.celery', 
       broker='sqla+mysql+mysqldb://root:[email protected]/gs?charset=utf8', 
       backend=None, 
       include=['apipython.services.NotificationService']) 

if __name__ == '__main__': 
    celery.start() 

它的工作原理 - 任务被序列化和回升。

然而,当我尝试使用SQLAlchemy/DBSession里面的任务,我得到一个错误:

UnboundExecutionError: Could not locate a bind configured on mapper Mapper|EmailLog|emaillogs or this Session 

我了解工人的任务是在一个单独的进程中运行,需要有其设置,会话引擎等设置。所以我有这个:

@worker_init.connect 
def bootstrap_pyramid(signal, sender): 
    import os 
    from pyramid.paster import bootstrap 
    sender.app.settings = bootstrap('development.ini')['registry'].settings 

    customize_settings(sender.app.settings) 

    engine = sqlalchemy.create_engine('mysql+mysqldb://root:[email protected]/gs?charset=utf8') 
    DBSession.configure(bind=engine) 
    Base.metadata.bind = engine 

但是我仍然得到相同的错误。

DBSession和基地在models.py定义为

DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension())) 
Base = declarative_base() 

我缺少什么步骤使模型结合工作?

第二个问题,这个代码可以在芹菜的init,vs worker init中创建会话/绑定吗?

(顺便说一句我也尝试pyramid_celery但宁愿让普通芹菜的工作)

感谢,

+0

你不会在这里显示所有的代码,但是,你传递给你的任务的“email”是什么?那是一个SQLAlchemy对象吗?您不能在会话之间共享SQLAlchemy对象。 –

+0

@AntoineLeclair电子邮件是一个普通的Python对象,而不是SQLAlchemy对象。在这个任务中,我创建了一个新的EmailLog实例,它是一个SQLAlchemy模型,并尝试使用DBSession.add和transaction.commit。 – Jonathan

+0

重要的部分是你如何定义你的会话?你使用了'scoped_session'吗? – javex

回答

1

我的同事试图完全相同的代码和它的工作。奇怪的