Core Database Protocols¶
Database interfaces for Structum.
Architectural Role: This module acts as the Persistence Facade (DP-5). It allows the application to interact with data storages through a high-level API (DatabaseInterface) without coupling to specific drivers (SQLAlchemy, Psycopg, etc.).
Key Principles: 1. Driver Agnosticism: Switch from SQLite to Postgres by changing config. 2. Connection Management: Pooling and lifecycle are handled by the plugin. 3. Transaction Boundaries: Explicit with db.transaction(): syntax.
- class structum.database.DatabaseInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for database managers in Structum.
This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.
Architectural Role: - Abstraction Root: Defines the “language” of persistence (Transactions, Connections). - Stability Anchor: Application code depends on this Protocol, never on sqlalchemy.*.
The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.
- Implementations:
SQLAlchemyDatabasePostgresDatabase
Example
Basic usage with configuration:
from structum.plugins.database import SQLAlchemyDatabase # Initialize from config db = SQLAlchemyDatabase.from_config() # Or with explicit URL db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb") # Use transaction context manager (recommended) with db.transaction() as conn: conn.execute( "INSERT INTO users (name, email) VALUES (:name, :email)", {"name": "John", "email": "john@example.com"} ) user_id = conn.execute("SELECT last_insert_id()").fetchone()["id"] conn.execute( "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)", {"uid": user_id, "bio": "Software engineer"} ) # Commits automatically on success # Check health health = db.health_check() if health.status != HealthStatus.HEALTHY: log.warning("Database issues detected", result=health)Note
Always use the
transaction()context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.See also
ConnectionInterface: Connection protocolTransactionInterface: Transaction protocolHealthCheckResult: Health check result data class- close() None[source]¶
Close all connections in the pool and release resources.
Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.
- Raises:¶
RuntimeError – If called while transactions are active (implementation-specific).
Example
Application shutdown:
import atexit db = SQLAlchemyDatabase.from_config () atexit.register(db.close) # Ensure cleanup on exit # Or in FastAPI lifespan @asynccontextmanager async def lifespan(app: FastAPI): db.connect() yield db.close() # Clean shutdownWarning
After calling close(), the database instance should not be reused. Create a new instance for additional operations.
See also
connect(): Initialize connection pool
- connect() None[source]¶
Establish the database connection pool.
Creates and initializes the connection pool. Called automatically on first database operation if not already connected.
- Raises:¶
ConnectionError – If unable to connect to database.
ConfigurationError – If database URL or settings are invalid.
Example
Explicit connection during startup:
db = SQLAlchemyDatabase.from_config() try: db.connect() log.info("Database pool created", url=db.url) except ConnectionError as e: log.critical("Cannot connect to database", exc_info=True) sys.exit(1)Note
Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.
See also
close(): Shutdown connection pool
- get_connection() ConnectionInterface[source]¶
Acquire a connection from the pool.
Returns a connection that must be explicitly returned to the pool after use. Prefer using
transaction()instead.- Returns:¶
A database connection from the pool.
- Return type:¶
- Raises:¶
PoolExhaustedError – If no connections are available and pool is at maximum.
ConnectionError – If pool is not initialized.
Example
Manual connection management (not recommended):
conn = db.get_connection() try: result = conn.execute("SELECT * FROM users") users = result.fetchall() finally: # Must return connection to pool manually conn.close()Warning
Manual connection management is error-prone. Always prefer
transaction()context manager which handles connections automatically.See also
transaction(): Recommended connection API
- health_check() HealthCheckResult[source]¶
Check database connectivity and health status.
Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.
Example
Health check endpoint:
@app.get("/health/database") def database_health(): result = db.health_check() if result.status == HealthStatus.UNHEALTHY: raise HTTPException(503, detail=result.message) return { "status": result.status.value, "latency_ms": result.latency_ms, "message": result.message }Prometheus metrics:
result = db.health_check() # Record health status db_health_gauge.labels(database="main").set[Any]( 1 if result.status == HealthStatus.HEALTHY else 0 ) # Record latency if result.latency_ms: db_latency_histogram.observe(result.latency_ms)Note
Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).
See also
HealthCheckResult: Result data classHealthStatus: Health status enumeration
- property is_connected : bool¶
Check if the database connection pool is active.
Example
Conditional connection:
if not db.is_connected: db.connect() log.info("Database connection established")Note
Most operations call
connect()automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.
- transaction() Iterator[ConnectionInterface][source]¶
Context manager for database transactions.
Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.
- Yields:¶
ConnectionInterface – Database connection with active transaction.
- Raises:¶
DatabaseError – If transaction fails to start.
ConnectionError – If pool is exhausted or not connected.
Example
Recommended usage pattern:
# Single transaction with db.transaction() as conn: conn.execute( "UPDATE accounts SET balance = balance - :amount WHERE id = :id", {"amount": 100, "id": 1} ) conn.execute( "UPDATE accounts SET balance = balance + :amount WHERE id = :id", {"amount": 100, "id": 2} ) # Commits automatically if no exception # Exception triggers rollback try: with db.transaction() as conn: conn.execute("DELETE FROM important_data") raise ValueError("Validation failed") except ValueError: pass # Transaction automatically rolled backNote
Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).
See also
TransactionInterface: Transaction protocolConnectionInterface: Connection protocol
- property url : str¶
Get the database connection URL (sanitized).
- Returns:¶
- Database URL with password redacted/masked for security.
Example:
"postgresql://user:***@localhost:5432/mydb"
- Return type:¶
Example
Logging database configuration:
log.info("Connected to database", url=db.url) # Logs: postgresql://user:***@localhost/mydbNote
Passwords are automatically redacted to prevent accidental logging of credentials.
- class structum.database.ConnectionInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for database connections in Structum.
A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.
- Implementations:
SQLAlchemyConnectionPostgresConnection
Example
Using a connection within a transaction:
with db.transaction() as conn: # Execute query with named parameters conn.execute( "INSERT INTO users (name, email) VALUES (:name, :email)", {"name": "John", "email": "john@example.com"} ) # Fetch results conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100}) users = conn.fetchall() for user in users: print(f"{user['name']} - {user['email']}")Note
Connections are not thread-safe. Use one connection per thread or protect access with locks.
See also
DatabaseInterface: Database manager providing connectionsDatabaseInterface.transaction(): Recommended way to get connections-
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None =
None) Any[source]¶ Execute a SQL query with optional parameters.
Supports both named parameters (dict[str, Any]) and positional parameters (tuple[Any, …]). Use
:namesyntax for named parameters,?for positional.- Parameters:¶
- Returns:¶
- Result object (implementation-specific). Use fetch methods
to retrieve rows.
- Return type:¶
Any
- Raises:¶
DatabaseError – If query execution fails.
ParameterError – If parameters don’t match query placeholders.
Example
Different parameter styles:
# Named parameters (recommended) conn.execute( "SELECT * FROM users WHERE age > :min_age AND city = :city", {"min_age": 18, "city": "NYC"} ) # Positional parameters conn.execute( "SELECT * FROM users WHERE age > ? AND city = ?", (18, "NYC") ) # No parameters conn.execute("SELECT COUNT(*) FROM users")Warning
Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).
See also
fetchone(): Retrieve single rowfetchall(): Retrieve all rows
- fetchall() list[dict[str, Any]][source]¶
Fetch all remaining rows from the last executed query.
- Returns:¶
- List of rows as dictionaries. Empty list[Any]
if no rows or cursor exhausted.
- Return type:¶
Example
Fetching multiple rows:
conn.execute("SELECT * FROM users WHERE active = :active", {"active": True}) users = conn.fetchall() for user in users: print(f"{user['id']}: {user['name']}") print(f"Total active users: {len(users)}")Warning
For large result sets, consider using
fetchmany()to avoid loading all rows into memory at once.See also
fetchone(): Fetch single rowfetchmany(): Fetch rows in batches
- fetchmany(size: int) list[dict[str, Any]][source]¶
Fetch up to
sizerows from the last executed query.Useful for processing large result sets in batches to manage memory usage.
- Parameters:¶
- Returns:¶
- List of rows (up to size). May return fewer
rows if result set[Any] is exhausted. Empty list[Any] if no rows remain.
- Return type:¶
Example
Batch processing large result set[Any]:
conn.execute("SELECT * FROM large_table") batch_size = 100 while True: batch = conn.fetchmany(batch_size) if not batch: break process_batch(batch) print(f"Processed {len(batch)} rows")Note
Efficient for iterating large datasets without loading everything into memory.
See also
fetchone(): Fetch single rowfetchall(): Fetch all rows
- fetchone() dict[str, Any] | None[source]¶
Fetch the next row from the last executed query.
- Returns:¶
- Row as dictionary with column names as keys,
or None if no more rows available.
- Return type:¶
Example
Fetching single row:
conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1}) user = conn.fetchone() if user: print(f"Found user: {user['name']}") else: print("User not found")Note
Call after
execute(). Returns None when cursor exhausted.See also
fetchall(): Fetch all remaining rowsfetchmany(): Fetch specific number of rows
- class structum.database.TransactionInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for database transactions providing ACID guarantees.
Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).
Example
Manual transaction management:
tx = db.begin_transaction() try: conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") tx.commit() except Exception: tx.rollback() raisePreferred context manager approach:
# Automatic commit/rollback with db.transaction() as conn: conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") # Commits automatically if no exceptionNote
Most applications should use
DatabaseInterface.transaction()context manager instead of managing transactions manually.See also
DatabaseInterface.transaction(): Recommended transaction API- commit() None[source]¶
Commit the current transaction.
Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.
- Raises:¶
DatabaseError – If commit fails (e.g., constraint violation).
TransactionError – If transaction is not active.
Example
Manual commit:
tx = db.begin_transaction() try: conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"}) tx.commit() log.info("Transaction committed successfully") except DatabaseError as e: tx.rollback() log.error("Commit failed", exc_info=True)Warning
After commit, the transaction cannot be reused. Start a new transaction for additional operations.
See also
rollback(): Abort transaction
- property is_active : bool¶
Check if the transaction is still active.
- Returns:¶
- True if transaction can accept operations, False if
already committed or rolled back.
- Return type:¶
Example
Checking transaction state:
tx = db.begin_transaction() assert tx.is_active # True tx.commit() assert not tx.is_active # False # This would raise TransactionError # tx.execute("SELECT 1")Note
Useful for conditional logic and error handling.
- rollback() None[source]¶
Rollback the current transaction.
Discards all changes made since transaction start. The database state returns to what it was before the transaction began.
- Raises:¶
TransactionError – If transaction is not active.
Example
Manual rollback on error:
tx = db.begin_transaction() try: conn.execute("DELETE FROM important_data") # Validation fails if not validate_deletion(): tx.rollback() log.warning("Deletion rolled back - validation failed") return tx.commit() except Exception: tx.rollback() raiseNote
Rollback is safe to call multiple times. Subsequent calls are no-ops.
See also
commit(): Persist transaction changes
-
class structum.database.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None =
None, details: dict[str, Any] | None =None)[source]¶ Bases:
objectData class representing database health check results.
- details¶
Additional diagnostic information (e.g., active connections, pool statistics).
Example
Creating and using a health check result:
result = HealthCheckResult( status=HealthStatus.HEALTHY, message="Database connection OK", latency_ms=2.5, details={"active_connections": 5, "pool_size": 10} ) # Use in monitoring/alerts if result.latency_ms and result.latency_ms > 100: alert("High database latency", result.latency_ms)Note
This class is frozen (immutable) to ensure health check results cannot be modified after creation.
-
__init__(status: HealthStatus, message: str, latency_ms: float | None =
None, details: dict[str, Any] | None =None)¶
- status : HealthStatus¶
- class structum.database.HealthStatus(*values)[source]¶
Bases:
EnumEnumeration of possible database health states.
- HEALTHY¶
Database is fully operational with normal latency.
- DEGRADED¶
Database is responsive but experiencing issues (e.g., high latency, connection warnings).
- UNHEALTHY¶
Database is unreachable or critically impaired.
Example
Checking health status:
result = db.health_check() if result.status == HealthStatus.HEALTHY: log.info("Database OK", latency_ms=result.latency_ms) elif result.status == HealthStatus.DEGRADED: log.warning("Database degraded", message=result.message) else: log.error("Database unhealthy", message=result.message)-
HEALTHY =
'healthy'¶
-
DEGRADED =
'degraded'¶
-
UNHEALTHY =
'unhealthy'¶
- structum.database.Database¶
alias of
DatabaseInterface
- structum.database.Connection¶
alias of
ConnectionInterface
- structum.database.Transaction¶
alias of
TransactionInterface