# Source code for structum.database.interfaces

# Database Interfaces - Core Protocols
# SPDX-License-Identifier: Apache-2.0

"""
Core database interfaces for Structum.

This module defines the Protocols that any database implementation must follow.
Implementations are provided by the `structum-database` plugin.

Example:
    >>> from structum.plugins.database import SQLAlchemyDatabase
    >>> db = SQLAlchemyDatabase.from_config()
    >>> with db.transaction() as conn:
    ...     conn.execute("SELECT 1")
"""

from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from typing import Any, Protocol, runtime_checkable


class HealthStatus(Enum):
    """
    Enumeration of possible database health states.

    Each member's value is the lowercase string form of its name, so the
    value can be emitted directly in JSON payloads or log records.

    Attributes:
        HEALTHY: Database is fully operational with normal latency.
        DEGRADED: Database is responsive but experiencing issues
            (e.g., high latency, connection warnings).
        UNHEALTHY: Database is unreachable or critically impaired.

    Example:
        Checking health status::

            result = db.health_check()
            if result.status == HealthStatus.HEALTHY:
                log.info("Database OK", latency_ms=result.latency_ms)
            elif result.status == HealthStatus.DEGRADED:
                log.warning("Database degraded", message=result.message)
            else:
                log.error("Database unhealthy", message=result.message)
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass(frozen=True)
class HealthCheckResult:
    """
    Data class representing database health check results.

    Attributes:
        status (HealthStatus): Overall health status
            (HEALTHY, DEGRADED, UNHEALTHY).
        message (str): Human-readable status description.
        latency_ms (float | None): Query latency in milliseconds, if
            available. Defaults to None.
        details (dict[str, Any] | None): Additional diagnostic information
            (e.g., active connections, pool statistics). Defaults to None.

    Example:
        Creating and using a health check result::

            result = HealthCheckResult(
                status=HealthStatus.HEALTHY,
                message="Database connection OK",
                latency_ms=2.5,
                details={"active_connections": 5, "pool_size": 10}
            )

            # Use in monitoring/alerts
            if result.latency_ms and result.latency_ms > 100:
                alert("High database latency", result.latency_ms)

    Note:
        This class is frozen, so its attributes cannot be reassigned after
        creation. Freezing is shallow: a ``details`` dict passed in can
        still be mutated through its own methods, so treat it as read-only.
    """

    status: HealthStatus
    message: str
    latency_ms: float | None = None
    details: dict[str, Any] | None = None
@runtime_checkable
class ConnectionInterface(Protocol):
    """
    Protocol for database connections in Structum.

    A connection represents an active link to the database that can execute
    queries and retrieve results. Connections are typically obtained from a
    connection pool and should be returned after use.

    Implementations:
        - :class:`~structum.plugins.database.sqlalchemy.SQLAlchemyConnection`
        - :class:`~structum.plugins.database.postgres.PostgresConnection`

    Example:
        Using a connection within a transaction::

            with db.transaction() as conn:
                # Execute query with named parameters
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"}
                )

                # Fetch results
                conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
                users = conn.fetchall()

                for user in users:
                    print(f"{user['name']} - {user['email']}")

    Note:
        Connections are not thread-safe. Use one connection per thread
        or protect access with locks.

    See Also:
        :class:`DatabaseInterface`: Database manager providing connections
        :meth:`DatabaseInterface.transaction`: Recommended way to get connections
    """

    def execute(
        self,
        query: str,
        params: dict[str, Any] | tuple[Any, ...] | None = None,
    ) -> Any:
        """
        Execute a SQL query with optional parameters.

        Supports both named parameters (dict) and positional parameters
        (tuple). Use ``:name`` syntax for named parameters, ``?`` for
        positional.

        Args:
            query (str): SQL query string. Use ``:param_name`` for named
                parameters or ``?`` for positional parameters.
            params (dict[str, Any] | tuple[Any, ...] | None): Query
                parameters. Use a dict for named parameters, a tuple for
                positional. Defaults to None.

        Returns:
            Any: Result object (implementation-specific). Use the fetch
                methods of this connection to retrieve rows.

        Raises:
            DatabaseError: If query execution fails.
            ParameterError: If parameters don't match query placeholders.

        Example:
            Different parameter styles::

                # Named parameters (recommended)
                conn.execute(
                    "SELECT * FROM users WHERE age > :min_age AND city = :city",
                    {"min_age": 18, "city": "NYC"}
                )

                # Positional parameters
                conn.execute(
                    "SELECT * FROM users WHERE age > ? AND city = ?",
                    (18, "NYC")
                )

                # No parameters
                conn.execute("SELECT COUNT(*) FROM users")

        Warning:
            Always use parameterized queries. Never use string interpolation
            for user input (SQL injection risk).

        See Also:
            :meth:`fetchone`: Retrieve single row
            :meth:`fetchall`: Retrieve all rows
        """
        ...

    def fetchone(self) -> dict[str, Any] | None:
        """
        Fetch the next row from the last executed query.

        Returns:
            dict[str, Any] | None: Row as dictionary with column names as
                keys, or None if no more rows available.

        Example:
            Fetching single row::

                conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
                user = conn.fetchone()

                if user:
                    print(f"Found user: {user['name']}")
                else:
                    print("User not found")

        Note:
            Call after :meth:`execute`. Returns None when cursor exhausted.

        See Also:
            :meth:`fetchall`: Fetch all remaining rows
            :meth:`fetchmany`: Fetch specific number of rows
        """
        ...

    def fetchall(self) -> list[dict[str, Any]]:
        """
        Fetch all remaining rows from the last executed query.

        Returns:
            list[dict[str, Any]]: List of rows as dictionaries. Empty list
                if no rows or cursor exhausted.

        Example:
            Fetching multiple rows::

                conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
                users = conn.fetchall()

                for user in users:
                    print(f"{user['id']}: {user['name']}")

                print(f"Total active users: {len(users)}")

        Warning:
            For large result sets, consider using :meth:`fetchmany` to avoid
            loading all rows into memory at once.

        See Also:
            :meth:`fetchone`: Fetch single row
            :meth:`fetchmany`: Fetch rows in batches
        """
        ...

    def fetchmany(self, size: int) -> list[dict[str, Any]]:
        """
        Fetch up to ``size`` rows from the last executed query.

        Useful for processing large result sets in batches to manage
        memory usage.

        Args:
            size (int): Maximum number of rows to fetch. Must be > 0.

        Returns:
            list[dict[str, Any]]: List of rows (up to ``size``). May return
                fewer rows if the result set is exhausted. Empty list if no
                rows remain.

        Example:
            Batch processing a large result set::

                conn.execute("SELECT * FROM large_table")

                batch_size = 100
                while True:
                    batch = conn.fetchmany(batch_size)
                    if not batch:
                        break

                    process_batch(batch)
                    print(f"Processed {len(batch)} rows")

        Note:
            Efficient for iterating large datasets without loading
            everything into memory.

        See Also:
            :meth:`fetchone`: Fetch single row
            :meth:`fetchall`: Fetch all rows
        """
        ...
@runtime_checkable
class TransactionInterface(Protocol):
    """
    Protocol for database transactions providing ACID guarantees.

    Transactions group multiple database operations into a single atomic
    unit. Either all operations succeed (commit) or all fail (rollback).

    Example:
        Manual transaction management::

            tx = db.begin_transaction()
            try:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
                tx.commit()
            except Exception:
                tx.rollback()
                raise

        Preferred context manager approach::

            # Automatic commit/rollback
            with db.transaction() as conn:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
                # Commits automatically if no exception

    Note:
        Most applications should use :meth:`DatabaseInterface.transaction`
        context manager instead of managing transactions manually.

        ``begin_transaction`` and the free ``conn`` object in the manual
        example are implementation-specific: neither is declared by the
        protocols in this module.

    See Also:
        :meth:`DatabaseInterface.transaction`: Recommended transaction API
    """

    def commit(self) -> None:
        """
        Commit the current transaction.

        Makes all changes since transaction start permanent in the database.
        After commit, the transaction is no longer active.

        Raises:
            DatabaseError: If commit fails (e.g., constraint violation).
            TransactionError: If transaction is not active.

        Example:
            Manual commit::

                tx = db.begin_transaction()
                try:
                    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
                    tx.commit()
                    log.info("Transaction committed successfully")
                except DatabaseError:
                    tx.rollback()
                    log.error("Commit failed", exc_info=True)

        Warning:
            After commit, the transaction cannot be reused. Start a new
            transaction for additional operations.

        See Also:
            :meth:`rollback`: Abort transaction
        """
        ...

    def rollback(self) -> None:
        """
        Rollback the current transaction.

        Discards all changes made since transaction start. The database
        state returns to what it was before the transaction began.

        Raises:
            TransactionError: If transaction is not active.

        Example:
            Manual rollback on error::

                tx = db.begin_transaction()
                try:
                    conn.execute("DELETE FROM important_data")

                    # Validation fails
                    if not validate_deletion():
                        tx.rollback()
                        log.warning("Deletion rolled back - validation failed")
                        return

                    tx.commit()
                except Exception:
                    tx.rollback()
                    raise

        Note:
            Rollback is safe to call multiple times. Subsequent calls are
            no-ops.

        See Also:
            :meth:`commit`: Persist transaction changes
        """
        # NOTE(review): the "Raises" and "Note" sections above disagree about
        # rolling back an inactive transaction (TransactionError vs. no-op).
        # Confirm the intended contract with the implementations.
        ...

    @property
    def is_active(self) -> bool:
        """
        Check if the transaction is still active.

        Returns:
            bool: True if transaction can accept operations, False if
                already committed or rolled back.

        Example:
            Checking transaction state::

                tx = db.begin_transaction()
                assert tx.is_active  # True

                tx.commit()
                assert not tx.is_active  # False

                # This would raise TransactionError
                # tx.commit()

        Note:
            Useful for conditional logic and error handling.
        """
        ...
@runtime_checkable
class DatabaseInterface(Protocol):
    """
    Protocol for database managers in Structum.

    This is the main entry point for all database operations.
    Implementations must provide connection pooling, transaction management,
    and health monitoring.

    **Architectural Role**:

    - **Abstraction Root**: Defines the "language" of persistence
      (Transactions, Connections).
    - **Stability Anchor**: Application code depends on this Protocol,
      never on ``sqlalchemy.*``.

    The protocol abstracts away database-specific details, allowing
    applications to work with different backends (PostgreSQL, MySQL, SQLite)
    through a unified interface.

    Implementations:
        - :class:`~structum.plugins.database.sqlalchemy.SQLAlchemyDatabase`
        - :class:`~structum.plugins.database.postgres.PostgresDatabase`

    Example:
        Basic usage with configuration::

            from structum.plugins.database import SQLAlchemyDatabase

            # Initialize from config
            db = SQLAlchemyDatabase.from_config()

            # Or with explicit URL
            db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

            # Use transaction context manager (recommended)
            with db.transaction() as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"}
                )
                conn.execute("SELECT last_insert_id() AS id")
                user_id = conn.fetchone()["id"]

                conn.execute(
                    "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
                    {"uid": user_id, "bio": "Software engineer"}
                )
                # Commits automatically on success

            # Check health
            health = db.health_check()
            if health.status != HealthStatus.HEALTHY:
                log.warning("Database issues detected", result=health)

    Note:
        Always use the :meth:`transaction` context manager for database
        operations. It ensures proper connection pooling, automatic
        commit/rollback, and resource cleanup.

    See Also:
        :class:`ConnectionInterface`: Connection protocol
        :class:`TransactionInterface`: Transaction protocol
        :class:`HealthCheckResult`: Health check result data class
    """

    @property
    def url(self) -> str:
        """
        Get the database connection URL (sanitized).

        Returns:
            str: Database URL with password redacted/masked for security.

            Example: ``"postgresql://user:***@localhost:5432/mydb"``

        Example:
            Logging database configuration::

                log.info("Connected to database", url=db.url)
                # Logs: postgresql://user:***@localhost/mydb

        Note:
            Passwords are automatically redacted to prevent accidental
            logging of credentials.
        """
        ...

    @property
    def is_connected(self) -> bool:
        """
        Check if the database connection pool is active.

        Returns:
            bool: True if connection pool is established, False otherwise.

        Example:
            Conditional connection::

                if not db.is_connected:
                    db.connect()
                    log.info("Database connection established")

        Note:
            Most operations call :meth:`connect` automatically if not
            connected. Explicit checking is mainly useful for health checks
            and diagnostics.
        """
        ...

    def connect(self) -> None:
        """
        Establish the database connection pool.

        Creates and initializes the connection pool. Called automatically on
        first database operation if not already connected.

        Raises:
            ConnectionError: If unable to connect to database.
            ConfigurationError: If database URL or settings are invalid.

        Example:
            Explicit connection during startup::

                db = SQLAlchemyDatabase.from_config()

                try:
                    db.connect()
                    log.info("Database pool created", url=db.url)
                except ConnectionError:
                    log.critical("Cannot connect to database", exc_info=True)
                    sys.exit(1)

        Note:
            Usually not needed - the database connects automatically on
            first use. Explicit connection is useful for fail-fast behavior
            during startup.

        See Also:
            :meth:`close`: Shutdown connection pool
        """
        ...

    def close(self) -> None:
        """
        Close all connections in the pool and release resources.

        Should be called during application shutdown to ensure clean
        termination. Any ongoing transactions should be completed before
        closing.

        Raises:
            RuntimeError: If called while transactions are active
                (implementation-specific).

        Example:
            Application shutdown::

                import atexit

                db = SQLAlchemyDatabase.from_config()
                atexit.register(db.close)  # Ensure cleanup on exit

                # Or in FastAPI lifespan
                @asynccontextmanager
                async def lifespan(app: FastAPI):
                    db.connect()
                    yield
                    db.close()  # Clean shutdown

        Warning:
            After calling close(), the database instance should not be
            reused. Create a new instance for additional operations.

        See Also:
            :meth:`connect`: Initialize connection pool
        """
        ...

    def get_connection(self) -> ConnectionInterface:
        """
        Acquire a connection from the pool.

        Returns a connection that must be explicitly returned to the pool
        after use. **Prefer using** :meth:`transaction` **instead.**

        Returns:
            ConnectionInterface: A database connection from the pool.

        Raises:
            PoolExhaustedError: If no connections are available and pool is
                at maximum.
            ConnectionError: If pool is not initialized.

        Example:
            Manual connection management (not recommended)::

                conn = db.get_connection()
                try:
                    conn.execute("SELECT * FROM users")
                    users = conn.fetchall()
                finally:
                    # Must return connection to pool manually. How this is
                    # done is implementation-specific: ConnectionInterface
                    # itself declares no close() method.
                    conn.close()

        Warning:
            Manual connection management is error-prone. Always prefer
            :meth:`transaction` context manager which handles connections
            automatically.

        See Also:
            :meth:`transaction`: Recommended connection API
        """
        ...

    @contextmanager
    def transaction(self) -> Iterator[ConnectionInterface]:
        """
        Context manager for database transactions.

        Provides automatic transaction management: commits on success,
        rolls back on exception. This is the recommended way to perform
        database operations.

        Yields:
            ConnectionInterface: Database connection with active transaction.

        Raises:
            DatabaseError: If transaction fails to start.
            ConnectionError: If pool is exhausted or not connected.

        Example:
            Recommended usage pattern::

                # Single transaction
                with db.transaction() as conn:
                    conn.execute(
                        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
                        {"amount": 100, "id": 1}
                    )
                    conn.execute(
                        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
                        {"amount": 100, "id": 2}
                    )
                    # Commits automatically if no exception

                # Exception triggers rollback
                try:
                    with db.transaction() as conn:
                        conn.execute("DELETE FROM important_data")
                        raise ValueError("Validation failed")
                except ValueError:
                    pass  # Transaction automatically rolled back

        Note:
            Transactions are isolated - changes are not visible to other
            connections until commit. Isolation level depends on database
            implementation (usually READ COMMITTED).

        See Also:
            :class:`TransactionInterface`: Transaction protocol
            :class:`ConnectionInterface`: Connection protocol
        """
        ...

    def health_check(self) -> HealthCheckResult:
        """
        Check database connectivity and health status.

        Performs a simple query to verify the database is responsive and
        measures latency. Useful for readiness probes and monitoring.

        Returns:
            HealthCheckResult: Health status with latency and diagnostic
                details.

        Example:
            Health check endpoint::

                @app.get("/health/database")
                def database_health():
                    result = db.health_check()

                    if result.status == HealthStatus.UNHEALTHY:
                        raise HTTPException(503, detail=result.message)

                    return {
                        "status": result.status.value,
                        "latency_ms": result.latency_ms,
                        "message": result.message
                    }

            Prometheus metrics::

                result = db.health_check()

                # Record health status
                db_health_gauge.labels(database="main").set(
                    1 if result.status == HealthStatus.HEALTHY else 0
                )

                # Record latency
                if result.latency_ms:
                    db_latency_histogram.observe(result.latency_ms)

        Note:
            Health checks execute a lightweight query (usually ``SELECT 1``).
            They should complete quickly (<100ms typically).

        See Also:
            :class:`HealthCheckResult`: Result data class
            :class:`HealthStatus`: Health status enumeration
        """
        ...
# Type aliases for clarity in type hints: shorter public names bound to the
# same Protocol objects, so either spelling can be used interchangeably.
Database = DatabaseInterface
Connection = ConnectionInterface
Transaction = TransactionInterface