mirror of
https://github.com/invoke-ai/InvokeAI.git
synced 2026-01-14 15:57:59 -05:00
94 lines
3.7 KiB
Python
94 lines
3.7 KiB
Python
import sqlite3
|
|
import threading
|
|
from collections.abc import Generator
|
|
from contextlib import contextmanager
|
|
from logging import Logger
|
|
from pathlib import Path
|
|
|
|
from invokeai.app.services.shared.sqlite.sqlite_common import sqlite_memory
|
|
|
|
|
|
class SqliteDatabase:
|
|
"""
|
|
Manages a connection to an SQLite database.
|
|
|
|
:param db_path: Path to the database file. If None, an in-memory database is used.
|
|
:param logger: Logger to use for logging.
|
|
:param verbose: Whether to log SQL statements. Provides `logger.debug` as the SQLite trace callback.
|
|
|
|
This is a light wrapper around the `sqlite3` module, providing a few conveniences:
|
|
- The database file is written to disk if it does not exist.
|
|
- Foreign key constraints are enabled by default.
|
|
- The connection is configured to use the `sqlite3.Row` row factory.
|
|
|
|
In addition to the constructor args, the instance provides the following attributes and methods:
|
|
- `conn`: A `sqlite3.Connection` object. Note that the connection must never be closed if the database is in-memory.
|
|
- `lock`: A shared re-entrant lock, used to approximate thread safety.
|
|
- `clean()`: Runs the SQL `VACUUM;` command and reports on the freed space.
|
|
"""
|
|
|
|
def __init__(self, db_path: Path | None, logger: Logger, verbose: bool = False) -> None:
|
|
"""Initializes the database. This is used internally by the class constructor."""
|
|
self._logger = logger
|
|
self._db_path = db_path
|
|
self._verbose = verbose
|
|
self._lock = threading.RLock()
|
|
|
|
if not self._db_path:
|
|
logger.info("Initializing in-memory database")
|
|
else:
|
|
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._logger.info(f"Initializing database at {self._db_path}")
|
|
|
|
self._conn = sqlite3.connect(database=self._db_path or sqlite_memory, check_same_thread=False)
|
|
self._conn.row_factory = sqlite3.Row
|
|
|
|
if self._verbose:
|
|
self._conn.set_trace_callback(self._logger.debug)
|
|
|
|
# Enable foreign key constraints
|
|
self._conn.execute("PRAGMA foreign_keys = ON;")
|
|
|
|
# Enable Write-Ahead Logging (WAL) mode for better concurrency
|
|
self._conn.execute("PRAGMA journal_mode = WAL;")
|
|
|
|
# Set a busy timeout to prevent database lockups during writes
|
|
self._conn.execute("PRAGMA busy_timeout = 5000;") # 5 seconds
|
|
|
|
def clean(self) -> None:
|
|
"""
|
|
Cleans the database by running the VACUUM command, reporting on the freed space.
|
|
"""
|
|
# No need to clean in-memory database
|
|
if not self._db_path:
|
|
return
|
|
try:
|
|
with self._conn as conn:
|
|
initial_db_size = Path(self._db_path).stat().st_size
|
|
conn.execute("VACUUM;")
|
|
conn.commit()
|
|
final_db_size = Path(self._db_path).stat().st_size
|
|
freed_space_in_mb = round((initial_db_size - final_db_size) / 1024 / 1024, 2)
|
|
if freed_space_in_mb > 0:
|
|
self._logger.info(f"Cleaned database (freed {freed_space_in_mb}MB)")
|
|
except Exception as e:
|
|
self._logger.error(f"Error cleaning database: {e}")
|
|
raise
|
|
|
|
@contextmanager
|
|
def transaction(self) -> Generator[sqlite3.Cursor, None, None]:
|
|
"""
|
|
Thread-safe context manager for DB work.
|
|
Acquires the RLock, yields a Cursor, then commits or rolls back.
|
|
"""
|
|
with self._lock:
|
|
cursor = self._conn.cursor()
|
|
try:
|
|
yield cursor
|
|
self._conn.commit()
|
|
except:
|
|
self._conn.rollback()
|
|
raise
|
|
finally:
|
|
cursor.close()
|