Source code for structum_lab.plugins.database.base

# Database Plugin - Base Class with Factory Pattern
# SPDX-License-Identifier: Apache-2.0

"""
Base class for database implementations with Factory Pattern support.

The factory pattern allows database backends to be configured via:
1. Direct instantiation: `SQLAlchemyDatabase(url="...")`
2. Config provider: `SQLAlchemyDatabase.from_config()`

The `.from_config()` method uses the active config provider (e.g., dynaconf)
to read database settings, enabling decoupled configuration.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, ClassVar

from structum_lab.database.interfaces import (
    ConnectionInterface,
    HealthCheckResult,
)

if TYPE_CHECKING:
    from structum_lab.config import ConfigInterface


[docs] class BaseDatabase(ABC): """Abstract base class for database implementations. Provides the Factory Pattern via `.from_config()` class method. Subclasses must implement the abstract methods for their specific backend. Example: >>> # Direct instantiation >>> db = SQLAlchemyDatabase(url="postgresql://...") >>> >>> # Factory pattern with config >>> db = SQLAlchemyDatabase.from_config() >>> db = SQLAlchemyDatabase.from_config(config_key="secondary_db") """ #: Default config key prefix for database settings DEFAULT_CONFIG_KEY: ClassVar[str] = "database"
[docs] def __init__( self, url: str, *, pool_size: int = 5, pool_timeout: int = 30, echo: bool = False, **kwargs: Any, ) -> None: """Initialize database with connection parameters. Args: url: Database connection URL (e.g., postgresql://...) pool_size: Connection pool size pool_timeout: Pool timeout in seconds echo: Log SQL queries (debug mode) **kwargs: Additional backend-specific options """ self._url = url self._pool_size = pool_size self._pool_timeout = pool_timeout self._echo = echo self._options = kwargs self._connected = False
[docs] @classmethod def from_config( cls, config_key: str | None = None, *, config: ConfigInterface | None = None, ) -> BaseDatabase: """Factory method to create database from config provider. This method reads database configuration from the active config provider (e.g., dynaconf), allowing decoupled configuration. Args: config_key: Config key prefix (default: "database") config: Explicit config provider (optional, uses global if None) Returns: Configured database instance Raises: RuntimeError: If no config provider is available KeyError: If required config keys are missing Example: >>> # Uses default config key "database" >>> db = SQLAlchemyDatabase.from_config() >>> >>> # Uses custom config key for secondary database >>> db = SQLAlchemyDatabase.from_config("replica_db") Config Structure: ```toml [database] url = "postgresql://user:pass@localhost/mydb" pool_size = 10 pool_timeout = 30 echo = false ``` """ key = config_key or cls.DEFAULT_CONFIG_KEY if config is None: from structum_lab.config import get_config config = get_config() # Required url = config.get(f"{key}.url") if not url: raise KeyError(f"Missing required config: {key}.url") # Optional with defaults pool_size = config.get(f"{key}.pool_size", 5) pool_timeout = config.get(f"{key}.pool_timeout", 30) echo = config.get(f"{key}.echo", False) return cls( url=url, pool_size=pool_size, pool_timeout=pool_timeout, echo=echo, )
@property def url(self) -> str: """Database URL (sanitized, password hidden).""" # Hide password in URL for logging import re return re.sub(r"://[^:]+:[^@]+@", "://***:***@", self._url) @property def is_connected(self) -> bool: """Check if database is connected.""" return self._connected
[docs] @abstractmethod def connect(self) -> None: """Establish connection to database.""" ...
[docs] @abstractmethod def close(self) -> None: """Close all connections.""" ...
[docs] @abstractmethod def get_connection(self) -> ConnectionInterface: """Get a connection from the pool.""" ...
[docs] @abstractmethod @contextmanager def transaction(self) -> Iterator[ConnectionInterface]: """Context manager for transactions.""" ...
[docs] @abstractmethod def health_check(self) -> HealthCheckResult: """Check database health.""" ...
[docs] def __enter__(self) -> BaseDatabase: """Context manager entry.""" self.connect() return self
[docs] def __exit__(self, *args: Any) -> None: """Context manager exit.""" self.close()
[docs] def __repr__(self) -> str: """Return a string representation of the database instance.""" return f"{self.__class__.__name__}(url={self.url!r}, pool_size={self._pool_size})"