2012-03-08 103 views
21

我无法正确理解如何正确打开和关闭数据库会话,正如我通过sqlalchemy文档理解的那样,如果我使用scoped_session构造我的Session对象,然后使用返回的Session对象创建会话,它是线程安全的,所以基本上每个线程都会得到它自己的会话,并且不会有问题。现在下面的例子工作,我把它放在一个无限循环中,看它是否正确地关闭了会话,并且如果我正确地监视它(在mysql中执行“SHOW PROCESSLIST;”),连接只会继续增长,它不会关闭它们,尽管我使用了session.close(),甚至在每次运行结束时删除了scoped_session对象。我究竟做错了什么?我在更大的应用程序中的目标是使用所需的最少数量的数据库连接,因为我目前的工作实现在每个需要的方法中创建一个新的会话,并在返回之前关闭它,这似乎效率低下。SQLAlchemy在多线程应用程序中正确处理会话

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

回答

36

你应该只调用create_enginescoped_session每 进程(每个数据库)一次。每个人都将获得自己的连接池或会话 (分别),因此您要确保只创建一个池。只需将其设置为全局模块级别即可。如果你需要比这更preciesly管理您的会话,你可能不应该使用scoped_session

另一个变化要提出的是直接使用DBSession,就好像是一个 会话。调用scoped_session上的会话方法将透明地 创建线程本地会话(如果需要),并将方法调用转发到 会话。

另一件事要注意的是连接池,这是 5默认的 pool_size 。对于许多应用程序,这很好,但如果你正在创建 大量的线程,则可能需要调整这个参数

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

谢谢你的信息,这是非常有益的确实。国王问候! – andrean 2012-03-09 09:01:15