# Database Interfaces - Core Protocols
# SPDX-License-Identifier: Apache-2.0
"""
Core database interfaces for Structum.
This module defines the Protocols that any database implementation must follow.
Implementations are provided by the `structum-database` plugin.
Example:
>>> from structum.plugins.database import SQLAlchemyDatabase
>>> db = SQLAlchemyDatabase.from_config()
>>> with db.transaction() as conn:
... conn.execute("SELECT 1")
"""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from typing import Any, Protocol, runtime_checkable
class HealthStatus(Enum):
    """
    Enumeration of possible database health states.

    Attributes:
        HEALTHY: Database is fully operational with normal latency.
        DEGRADED: Database is responsive but experiencing issues
            (e.g., high latency, connection warnings).
        UNHEALTHY: Database is unreachable or critically impaired.

    Example:
        Checking health status::

            result = db.health_check()
            if result.status == HealthStatus.HEALTHY:
                log.info("Database OK", latency_ms=result.latency_ms)
            elif result.status == HealthStatus.DEGRADED:
                log.warning("Database degraded", message=result.message)
            else:
                log.error("Database unhealthy", message=result.message)
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass(frozen=True)
class HealthCheckResult:
    """
    Data class representing database health check results.

    Attributes:
        status (HealthStatus): Overall health status (HEALTHY, DEGRADED, UNHEALTHY).
        message (str): Human-readable status description.
        latency_ms (float | None): Query latency in milliseconds, if available.
        details (dict[str, Any] | None): Additional diagnostic information
            (e.g., active connections, pool statistics).

    Example:
        Creating and using a health check result::

            result = HealthCheckResult(
                status=HealthStatus.HEALTHY,
                message="Database connection OK",
                latency_ms=2.5,
                details={"active_connections": 5, "pool_size": 10},
            )

            # Use in monitoring/alerts
            if result.latency_ms and result.latency_ms > 100:
                alert("High database latency", result.latency_ms)

    Note:
        This class is frozen: attributes cannot be reassigned after
        creation. The ``details`` dict itself is still a regular mutable
        dict, so treat it as read-only by convention.
    """

    status: HealthStatus
    message: str
    latency_ms: float | None = None
    details: dict[str, Any] | None = None
@runtime_checkable
class ConnectionInterface(Protocol):
    """
    Protocol for database connections in Structum.

    A connection represents an active link to the database that can
    execute queries and retrieve results. Connections are typically
    obtained from a connection pool and should be returned after use.

    Implementations:
        - :class:`~structum.plugins.database.sqlalchemy.SQLAlchemyConnection`
        - :class:`~structum.plugins.database.postgres.PostgresConnection`

    Example:
        Using a connection within a transaction::

            with db.transaction() as conn:
                # Execute query with named parameters
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"},
                )

                # Fetch results
                conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
                users = conn.fetchall()
                for user in users:
                    print(f"{user['name']} - {user['email']}")

    Note:
        Connections are not thread-safe. Use one connection per thread
        or protect access with locks.

    See Also:
        :class:`DatabaseInterface`: Database manager providing connections
        :meth:`DatabaseInterface.transaction`: Recommended way to get connections
    """

    def execute(
        self,
        query: str,
        params: dict[str, Any] | tuple[Any, ...] | None = None,
    ) -> Any:
        """
        Execute a SQL query with optional parameters.

        Supports both named parameters (passed as a ``dict``) and positional
        parameters (passed as a ``tuple``). Use ``:name`` syntax for named
        parameters, ``?`` for positional.

        Args:
            query (str): SQL query string. Use ``:param_name`` for named
                parameters or ``?`` for positional parameters.
            params (dict[str, Any] | tuple[Any, ...] | None): Query parameters.
                Use a dict for named parameters, a tuple for positional.
                Defaults to None.

        Returns:
            Any: Result object (implementation-specific). Use fetch methods
            to retrieve rows.

        Raises:
            DatabaseError: If query execution fails.
            ParameterError: If parameters don't match query placeholders.

        Example:
            Different parameter styles::

                # Named parameters (recommended)
                conn.execute(
                    "SELECT * FROM users WHERE age > :min_age AND city = :city",
                    {"min_age": 18, "city": "NYC"},
                )

                # Positional parameters
                conn.execute(
                    "SELECT * FROM users WHERE age > ? AND city = ?",
                    (18, "NYC"),
                )

                # No parameters
                conn.execute("SELECT COUNT(*) FROM users")

        Warning:
            Always use parameterized queries. Never use string interpolation
            for user input (SQL injection risk).

        See Also:
            :meth:`fetchone`: Retrieve single row
            :meth:`fetchall`: Retrieve all rows
        """
        ...

    def fetchone(self) -> dict[str, Any] | None:
        """
        Fetch the next row from the last executed query.

        Returns:
            dict[str, Any] | None: Row as dictionary with column names as keys,
            or None if no more rows available.

        Example:
            Fetching single row::

                conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
                user = conn.fetchone()
                if user:
                    print(f"Found user: {user['name']}")
                else:
                    print("User not found")

        Note:
            Call after :meth:`execute`. Returns None when cursor exhausted.

        See Also:
            :meth:`fetchall`: Fetch all remaining rows
            :meth:`fetchmany`: Fetch specific number of rows
        """
        ...

    def fetchall(self) -> list[dict[str, Any]]:
        """
        Fetch all remaining rows from the last executed query.

        Returns:
            list[dict[str, Any]]: List of rows as dictionaries. Empty list
            if no rows or cursor exhausted.

        Example:
            Fetching multiple rows::

                conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
                users = conn.fetchall()
                for user in users:
                    print(f"{user['id']}: {user['name']}")
                print(f"Total active users: {len(users)}")

        Warning:
            For large result sets, consider using :meth:`fetchmany` to avoid
            loading all rows into memory at once.

        See Also:
            :meth:`fetchone`: Fetch single row
            :meth:`fetchmany`: Fetch rows in batches
        """
        ...

    def fetchmany(self, size: int) -> list[dict[str, Any]]:
        """
        Fetch up to ``size`` rows from the last executed query.

        Useful for processing large result sets in batches to manage memory usage.

        Args:
            size (int): Maximum number of rows to fetch. Must be > 0.

        Returns:
            list[dict[str, Any]]: List of rows (up to size). May return fewer
            rows if the result set is exhausted. Empty list if no rows remain.

        Example:
            Batch processing a large result set::

                conn.execute("SELECT * FROM large_table")
                batch_size = 100
                while True:
                    batch = conn.fetchmany(batch_size)
                    if not batch:
                        break
                    process_batch(batch)
                    print(f"Processed {len(batch)} rows")

        Note:
            Efficient for iterating large datasets without loading
            everything into memory.

        See Also:
            :meth:`fetchone`: Fetch single row
            :meth:`fetchall`: Fetch all rows
        """
        ...
@runtime_checkable
class TransactionInterface(Protocol):
    """
    Protocol for database transactions providing ACID guarantees.

    Transactions group multiple database operations into a single atomic unit.
    Either all operations succeed (commit) or all fail (rollback).

    Example:
        Manual transaction management::

            tx = db.begin_transaction()
            try:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
                tx.commit()
            except Exception:
                tx.rollback()
                raise

        Preferred context manager approach::

            # Automatic commit/rollback
            with db.transaction() as conn:
                conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
            # Commits automatically if no exception

    Note:
        Most applications should use :meth:`DatabaseInterface.transaction`
        context manager instead of managing transactions manually.

        ``db.begin_transaction()`` and ``conn`` in the manual examples are
        illustrative: how a transaction object and its connection are
        obtained is not defined by the protocols in this module and is
        presumably implementation-specific.

    See Also:
        :meth:`DatabaseInterface.transaction`: Recommended transaction API
    """

    def commit(self) -> None:
        """
        Commit the current transaction.

        Makes all changes since transaction start permanent in the database.
        After commit, the transaction is no longer active.

        Raises:
            DatabaseError: If commit fails (e.g., constraint violation).
            TransactionError: If transaction is not active.

        Example:
            Manual commit::

                tx = db.begin_transaction()
                try:
                    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
                    tx.commit()
                    log.info("Transaction committed successfully")
                except DatabaseError as e:
                    tx.rollback()
                    log.error("Commit failed", exc_info=True)

        Warning:
            After commit, the transaction cannot be reused. Start a new
            transaction for additional operations.

        See Also:
            :meth:`rollback`: Abort transaction
        """
        ...

    def rollback(self) -> None:
        """
        Rollback the current transaction.

        Discards all changes made since transaction start. The database
        state returns to what it was before the transaction began.

        Raises:
            TransactionError: If transaction is not active.

        Example:
            Manual rollback on error::

                tx = db.begin_transaction()
                try:
                    conn.execute("DELETE FROM important_data")

                    # Validation fails
                    if not validate_deletion():
                        tx.rollback()
                        log.warning("Deletion rolled back - validation failed")
                        return

                    tx.commit()
                except Exception:
                    tx.rollback()
                    raise

        Note:
            Rollback is safe to call multiple times. Subsequent calls are no-ops.

        See Also:
            :meth:`commit`: Persist transaction changes
        """
        ...

    @property
    def is_active(self) -> bool:
        """
        Check if the transaction is still active.

        Returns:
            bool: True if transaction can accept operations, False if
            already committed or rolled back.

        Example:
            Checking transaction state::

                tx = db.begin_transaction()
                assert tx.is_active  # True

                tx.commit()
                assert not tx.is_active  # False

                # This would raise TransactionError
                # tx.execute("SELECT 1")

        Note:
            Useful for conditional logic and error handling.
        """
        ...
@runtime_checkable
class DatabaseInterface(Protocol):
    """
    Protocol for database managers in Structum.

    This is the main entry point for all database operations. Implementations
    must provide connection pooling, transaction management, and health monitoring.

    **Architectural Role**:

    - **Abstraction Root**: Defines the "language" of persistence (Transactions, Connections).
    - **Stability Anchor**: Application code depends on this Protocol, never on `sqlalchemy.*`.

    The protocol abstracts away database-specific details, allowing applications
    to work with different backends (PostgreSQL, MySQL, SQLite) through a
    unified interface.

    Implementations:
        - :class:`~structum.plugins.database.sqlalchemy.SQLAlchemyDatabase`
        - :class:`~structum.plugins.database.postgres.PostgresDatabase`

    Example:
        Basic usage with configuration::

            from structum.plugins.database import SQLAlchemyDatabase

            # Initialize from config
            db = SQLAlchemyDatabase.from_config()

            # Or with explicit URL
            db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

            # Use transaction context manager (recommended)
            with db.transaction() as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (:name, :email)",
                    {"name": "John", "email": "john@example.com"},
                )
                # Rows are read back through the connection's fetch methods
                conn.execute("SELECT last_insert_id() AS id")
                user_id = conn.fetchone()["id"]
                conn.execute(
                    "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
                    {"uid": user_id, "bio": "Software engineer"},
                )
            # Commits automatically on success

            # Check health
            health = db.health_check()
            if health.status != HealthStatus.HEALTHY:
                log.warning("Database issues detected", result=health)

    Note:
        Always use the :meth:`transaction` context manager for database operations.
        It ensures proper connection pooling, automatic commit/rollback, and
        resource cleanup.

    See Also:
        :class:`ConnectionInterface`: Connection protocol
        :class:`TransactionInterface`: Transaction protocol
        :class:`HealthCheckResult`: Health check result data class
    """

    @property
    def url(self) -> str:
        """
        Get the database connection URL (sanitized).

        Returns:
            str: Database URL with password redacted/masked for security.
            Example: ``"postgresql://user:***@localhost:5432/mydb"``

        Example:
            Logging database configuration::

                log.info("Connected to database", url=db.url)
                # Logs: postgresql://user:***@localhost/mydb

        Note:
            Passwords are automatically redacted to prevent accidental logging
            of credentials.
        """
        ...

    @property
    def is_connected(self) -> bool:
        """
        Check if the database connection pool is active.

        Returns:
            bool: True if connection pool is established, False otherwise.

        Example:
            Conditional connection::

                if not db.is_connected:
                    db.connect()
                    log.info("Database connection established")

        Note:
            Most operations call :meth:`connect` automatically if not connected.
            Explicit checking is mainly useful for health checks and diagnostics.
        """
        ...

    def connect(self) -> None:
        """
        Establish the database connection pool.

        Creates and initializes the connection pool. Called automatically
        on first database operation if not already connected.

        Raises:
            ConnectionError: If unable to connect to database.
            ConfigurationError: If database URL or settings are invalid.

        Example:
            Explicit connection during startup::

                db = SQLAlchemyDatabase.from_config()
                try:
                    db.connect()
                    log.info("Database pool created", url=db.url)
                except ConnectionError as e:
                    log.critical("Cannot connect to database", exc_info=True)
                    sys.exit(1)

        Note:
            Usually not needed - the database connects automatically on first use.
            Explicit connection is useful for fail-fast behavior during startup.

        See Also:
            :meth:`close`: Shutdown connection pool
        """
        ...

    def close(self) -> None:
        """
        Close all connections in the pool and release resources.

        Should be called during application shutdown to ensure clean termination.
        Any ongoing transactions should be completed before closing.

        Raises:
            RuntimeError: If called while transactions are active (implementation-specific).

        Example:
            Application shutdown::

                import atexit

                db = SQLAlchemyDatabase.from_config()
                atexit.register(db.close)  # Ensure cleanup on exit

                # Or in FastAPI lifespan
                @asynccontextmanager
                async def lifespan(app: FastAPI):
                    db.connect()
                    yield
                    db.close()  # Clean shutdown

        Warning:
            After calling close(), the database instance should not be reused.
            Create a new instance for additional operations.

        See Also:
            :meth:`connect`: Initialize connection pool
        """
        ...

    def get_connection(self) -> ConnectionInterface:
        """
        Acquire a connection from the pool.

        Returns a connection that must be explicitly returned to the pool
        after use. **Prefer using** :meth:`transaction` **instead.**

        Returns:
            ConnectionInterface: A database connection from the pool.

        Raises:
            PoolExhaustedError: If no connections are available and pool is at maximum.
            ConnectionError: If pool is not initialized.

        Example:
            Manual connection management (not recommended)::

                conn = db.get_connection()
                try:
                    conn.execute("SELECT * FROM users")
                    users = conn.fetchall()
                finally:
                    # Must return connection to pool manually.
                    # NOTE: ``close()`` is not part of ConnectionInterface;
                    # the release call is implementation-specific.
                    conn.close()

        Warning:
            Manual connection management is error-prone. Always prefer
            :meth:`transaction` context manager which handles connections
            automatically.

        See Also:
            :meth:`transaction`: Recommended connection API
        """
        ...

    @contextmanager
    def transaction(self) -> Iterator[ConnectionInterface]:
        """
        Context manager for database transactions.

        Provides automatic transaction management: commits on success,
        rolls back on exception. This is the recommended way to perform
        database operations.

        Yields:
            ConnectionInterface: Database connection with active transaction.

        Raises:
            DatabaseError: If transaction fails to start.
            ConnectionError: If pool is exhausted or not connected.

        Example:
            Recommended usage pattern::

                # Single transaction
                with db.transaction() as conn:
                    conn.execute(
                        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
                        {"amount": 100, "id": 1},
                    )
                    conn.execute(
                        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
                        {"amount": 100, "id": 2},
                    )
                # Commits automatically if no exception

                # Exception triggers rollback
                try:
                    with db.transaction() as conn:
                        conn.execute("DELETE FROM important_data")
                        raise ValueError("Validation failed")
                except ValueError:
                    pass  # Transaction automatically rolled back

        Note:
            Transactions are isolated - changes are not visible to other
            connections until commit. Isolation level depends on database
            implementation (usually READ COMMITTED).

        See Also:
            :class:`TransactionInterface`: Transaction protocol
            :class:`ConnectionInterface`: Connection protocol
        """
        ...

    def health_check(self) -> HealthCheckResult:
        """
        Check database connectivity and health status.

        Performs a simple query to verify the database is responsive and
        measures latency. Useful for readiness probes and monitoring.

        Returns:
            HealthCheckResult: Health status with latency and diagnostic details.

        Example:
            Health check endpoint::

                @app.get("/health/database")
                def database_health():
                    result = db.health_check()
                    if result.status == HealthStatus.UNHEALTHY:
                        raise HTTPException(503, detail=result.message)
                    return {
                        "status": result.status.value,
                        "latency_ms": result.latency_ms,
                        "message": result.message,
                    }

            Prometheus metrics::

                result = db.health_check()

                # Record health status
                db_health_gauge.labels(database="main").set(
                    1 if result.status == HealthStatus.HEALTHY else 0
                )

                # Record latency (0.0 is a valid measurement, so test for None)
                if result.latency_ms is not None:
                    db_latency_histogram.observe(result.latency_ms)

        Note:
            Health checks execute a lightweight query (usually `SELECT 1`).
            They should complete quickly (<100ms typically).

        See Also:
            :class:`HealthCheckResult`: Result data class
            :class:`HealthStatus`: Health status enumeration
        """
        ...
# Short aliases for the three protocols, for brevity in type hints.
# Each name is bound to the very same Protocol class (no new type is created),
# so e.g. ``Database is DatabaseInterface`` holds. Kept as plain ``X = Y``
# assignments so static type checkers recognise them as type aliases.
Database = DatabaseInterface
Connection = ConnectionInterface
Transaction = TransactionInterface