Правильная обработка сеансов SQLAlchemy в многопоточных приложениях

У меня возникли проблемы с пониманием того, как правильно открывать и эффективно закрывать сеансы базы данных, как я понял из документации sqlalchemy, если я использую scoped_session для создания моего объекта Session, а затем использую вернул объект Session для создания сеансов, он потокобезопасен, поэтому в основном каждый поток получит свой собственный сеанс, и с ним не будет проблем. Теперь приведенный ниже пример работает, я помещаю его в бесконечный цикл, чтобы увидеть, правильно ли он закрывает сеансы, и если я правильно его отслеживаю (в mysql, выполнив «SHOW PROCESSLIST;»), соединения просто продолжают расти, он не закрывает их , хотя я использовал session.close() и даже удалял объект scoped_session в конце каждого запуска. Что я делаю неправильно? Моя цель в более крупном приложении — использовать минимально необходимое количество подключений к базе данных, потому что моя текущая рабочая реализация создает новый сеанс в каждом методе, где это требуется, и закрывает его перед возвратом, что кажется неэффективным.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)
                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurrs we skip it
                    continue

                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)

            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()

            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()
21
задан Jonathan Leffler 20 January 2014 в 20:33
поделиться