Core Database Protocols

Database interfaces for Structum.

Architectural Role: This module acts as the Persistence Facade (DP-5). It allows the application to interact with data storages through a high-level API (DatabaseInterface) without coupling to specific drivers (SQLAlchemy, Psycopg, etc.).

Key Principles: 1. Driver Agnosticism: Switch from SQLite to Postgres by changing config. 2. Connection Management: Pooling and lifecycle are handled by the plugin. 3. Transaction Boundaries: Explicit with db.transaction(): syntax.

class structum.database.DatabaseInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database managers in Structum.

This is the main entry point for all database operations. Implementations must provide connection pooling, transaction management, and health monitoring.

Architectural Role: - Abstraction Root: Defines the “language” of persistence (Transactions, Connections). - Stability Anchor: Application code depends on this Protocol, never on sqlalchemy.*.

The protocol abstracts away database-specific details, allowing applications to work with different backends (PostgreSQL, MySQL, SQLite) through a unified interface.

Implementations:
  • SQLAlchemyDatabase

  • PostgresDatabase

Example

Basic usage with configuration:

from structum.plugins.database import SQLAlchemyDatabase

# Initialize from config
db = SQLAlchemyDatabase.from_config()

# Or with explicit URL
db = SQLAlchemyDatabase(url="postgresql://user:pass@localhost/mydb")

# Use transaction context manager (recommended)
with db.transaction() as conn:
    conn.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        {"name": "John", "email": "john@example.com"}
    )

    user_id = conn.execute("SELECT last_insert_id()").fetchone()["id"]

    conn.execute(
        "INSERT INTO profiles (user_id, bio) VALUES (:uid, :bio)",
        {"uid": user_id, "bio": "Software engineer"}
    )
    # Commits automatically on success
# Check health
health = db.health_check()
if health.status != HealthStatus.HEALTHY:
    log.warning("Database issues detected", result=health)

Note

Always use the transaction() context manager for database operations. It ensures proper connection pooling, automatic commit/rollback, and resource cleanup.

See also

ConnectionInterface: Connection protocol TransactionInterface: Transaction protocol HealthCheckResult: Health check result data class

__init__(*args, **kwargs)
close() None[source]

Close all connections in the pool and release resources.

Should be called during application shutdown to ensure clean termination. Any ongoing transactions should be completed before closing.

Raises:

RuntimeError – If called while transactions are active (implementation-specific).

Example

Application shutdown:

import atexit

db = SQLAlchemyDatabase.from_config ()
atexit.register(db.close)  # Ensure cleanup on exit

# Or in FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    db.connect()
    yield
    db.close()  # Clean shutdown

Warning

After calling close(), the database instance should not be reused. Create a new instance for additional operations.

See also

connect(): Initialize connection pool

connect() None[source]

Establish the database connection pool.

Creates and initializes the connection pool. Called automatically on first database operation if not already connected.

Raises:
  • ConnectionError – If unable to connect to database.

  • ConfigurationError – If database URL or settings are invalid.

Example

Explicit connection during startup:

db = SQLAlchemyDatabase.from_config()

try:
    db.connect()
    log.info("Database pool created", url=db.url)
except ConnectionError as e:
    log.critical("Cannot connect to database", exc_info=True)
    sys.exit(1)

Note

Usually not needed - the database connects automatically on first use. Explicit connection is useful for fail-fast behavior during startup.

See also

close(): Shutdown connection pool

get_connection() ConnectionInterface[source]

Acquire a connection from the pool.

Returns a connection that must be explicitly returned to the pool after use. Prefer using transaction() instead.

Returns:

A database connection from the pool.

Return type:

ConnectionInterface

Raises:
  • PoolExhaustedError – If no connections are available and pool is at maximum.

  • ConnectionError – If pool is not initialized.

Example

Manual connection management (not recommended):

conn = db.get_connection()
try:
    result = conn.execute("SELECT * FROM users")
    users = result.fetchall()
finally:
    # Must return connection to pool manually
    conn.close()

Warning

Manual connection management is error-prone. Always prefer transaction() context manager which handles connections automatically.

See also

transaction(): Recommended connection API

health_check() HealthCheckResult[source]

Check database connectivity and health status.

Performs a simple query to verify the database is responsive and measures latency. Useful for readiness probes and monitoring.

Returns:

Health status with latency and diagnostic details.

Return type:

HealthCheckResult

Example

Health check endpoint:

@app.get("/health/database")
def database_health():
    result = db.health_check()

    if result.status == HealthStatus.UNHEALTHY:
        raise HTTPException(503, detail=result.message)

    return {
        "status": result.status.value,
        "latency_ms": result.latency_ms,
        "message": result.message
    }

Prometheus metrics:

result = db.health_check()

# Record health status
db_health_gauge.labels(database="main").set[Any](
    1 if result.status == HealthStatus.HEALTHY else 0
)

# Record latency
if result.latency_ms:
    db_latency_histogram.observe(result.latency_ms)

Note

Health checks execute a lightweight query (usually SELECT 1). They should complete quickly (<100ms typically).

See also

HealthCheckResult: Result data class HealthStatus: Health status enumeration

property is_connected : bool

Check if the database connection pool is active.

Returns:

True if connection pool is established, False otherwise.

Return type:

bool

Example

Conditional connection:

if not db.is_connected:
    db.connect()
    log.info("Database connection established")

Note

Most operations call connect() automatically if not connected. Explicit checking is mainly useful for health checks and diagnostics.

transaction() Iterator[ConnectionInterface][source]

Context manager for database transactions.

Provides automatic transaction management: commits on success, rolls back on exception. This is the recommended way to perform database operations.

Yields:

ConnectionInterface – Database connection with active transaction.

Raises:
  • DatabaseError – If transaction fails to start.

  • ConnectionError – If pool is exhausted or not connected.

Example

Recommended usage pattern:

# Single transaction
with db.transaction() as conn:
    conn.execute(
        "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
        {"amount": 100, "id": 1}
    )
    conn.execute(
        "UPDATE accounts SET balance = balance + :amount WHERE id = :id",
        {"amount": 100, "id": 2}
    )
    # Commits automatically if no exception

# Exception triggers rollback
try:
    with db.transaction() as conn:
        conn.execute("DELETE FROM important_data")
        raise ValueError("Validation failed")
except ValueError:
    pass  # Transaction automatically rolled back

Note

Transactions are isolated - changes are not visible to other connections until commit. Isolation level depends on database implementation (usually READ COMMITTED).

See also

TransactionInterface: Transaction protocol ConnectionInterface: Connection protocol

property url : str

Get the database connection URL (sanitized).

Returns:

Database URL with password redacted/masked for security.

Example: "postgresql://user:***@localhost:5432/mydb"

Return type:

str

Example

Logging database configuration:

log.info("Connected to database", url=db.url)
# Logs: postgresql://user:***@localhost/mydb

Note

Passwords are automatically redacted to prevent accidental logging of credentials.

class structum.database.ConnectionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database connections in Structum.

A connection represents an active link to the database that can execute queries and retrieve results. Connections are typically obtained from a connection pool and should be returned after use.

Implementations:
  • SQLAlchemyConnection

  • PostgresConnection

Example

Using a connection within a transaction:

with db.transaction() as conn:
    # Execute query with named parameters
    conn.execute(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        {"name": "John", "email": "john@example.com"}
    )

    # Fetch results
    conn.execute("SELECT * FROM users WHERE id > :id", {"id": 100})
    users = conn.fetchall()
    for user in users:
        print(f"{user['name']} - {user['email']}")

Note

Connections are not thread-safe. Use one connection per thread or protect access with locks.

See also

DatabaseInterface: Database manager providing connections DatabaseInterface.transaction(): Recommended way to get connections

__init__(*args, **kwargs)
execute(query: str, params: dict[str, Any] | tuple[Any, ...] | None = None) Any[source]

Execute a SQL query with optional parameters.

Supports both named parameters (dict[str, Any]) and positional parameters (tuple[Any, …]). Use :name syntax for named parameters, ? for positional.

Parameters:
query : str

SQL query string. Use :param_name for named parameters or ? for positional parameters.

params : dict[str, Any] | tuple[Any, ...] | None

Query parameters. Use dict[str, Any] for named parameters, tuple[Any, …] for positional. Defaults to None.

Returns:

Result object (implementation-specific). Use fetch methods

to retrieve rows.

Return type:

Any

Raises:
  • DatabaseError – If query execution fails.

  • ParameterError – If parameters don’t match query placeholders.

Example

Different parameter styles:

# Named parameters (recommended)
conn.execute(
    "SELECT * FROM users WHERE age > :min_age AND city = :city",
    {"min_age": 18, "city": "NYC"}
)

# Positional parameters
conn.execute(
    "SELECT * FROM users WHERE age > ? AND city = ?",
    (18, "NYC")
)

# No parameters
conn.execute("SELECT COUNT(*) FROM users")

Warning

Always use parameterized queries. Never use string interpolation for user input (SQL injection risk).

See also

fetchone(): Retrieve single row fetchall(): Retrieve all rows

fetchall() list[dict[str, Any]][source]

Fetch all remaining rows from the last executed query.

Returns:

List of rows as dictionaries. Empty list[Any]

if no rows or cursor exhausted.

Return type:

list[dict[str, Any]]

Example

Fetching multiple rows:

conn.execute("SELECT * FROM users WHERE active = :active", {"active": True})
users = conn.fetchall()

for user in users:
    print(f"{user['id']}: {user['name']}")

print(f"Total active users: {len(users)}")

Warning

For large result sets, consider using fetchmany() to avoid loading all rows into memory at once.

See also

fetchone(): Fetch single row fetchmany(): Fetch rows in batches

fetchmany(size: int) list[dict[str, Any]][source]

Fetch up to size rows from the last executed query.

Useful for processing large result sets in batches to manage memory usage.

Parameters:
size : int

Maximum number of rows to fetch. Must be > 0.

Returns:

List of rows (up to size). May return fewer

rows if result set[Any] is exhausted. Empty list[Any] if no rows remain.

Return type:

list[dict[str, Any]]

Example

Batch processing large result set[Any]:

conn.execute("SELECT * FROM large_table")

batch_size = 100
while True:
    batch = conn.fetchmany(batch_size)
    if not batch:
        break

    process_batch(batch)
    print(f"Processed {len(batch)} rows")

Note

Efficient for iterating large datasets without loading everything into memory.

See also

fetchone(): Fetch single row fetchall(): Fetch all rows

fetchone() dict[str, Any] | None[source]

Fetch the next row from the last executed query.

Returns:

Row as dictionary with column names as keys,

or None if no more rows available.

Return type:

dict[str, Any] | None

Example

Fetching single row:

conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
user = conn.fetchone()

if user:
    print(f"Found user: {user['name']}")
else:
    print("User not found")

Note

Call after execute(). Returns None when cursor exhausted.

See also

fetchall(): Fetch all remaining rows fetchmany(): Fetch specific number of rows

class structum.database.TransactionInterface(*args, **kwargs)[source]

Bases: Protocol

Protocol for database transactions providing ACID guarantees.

Transactions group multiple database operations into a single atomic unit. Either all operations succeed (commit) or all fail (rollback).

Example

Manual transaction management:

tx = db.begin_transaction()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    tx.commit()
except Exception:
    tx.rollback()
    raise

Preferred context manager approach:

# Automatic commit/rollback
with db.transaction() as conn:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    # Commits automatically if no exception

Note

Most applications should use DatabaseInterface.transaction() context manager instead of managing transactions manually.

See also

DatabaseInterface.transaction(): Recommended transaction API

__init__(*args, **kwargs)
commit() None[source]

Commit the current transaction.

Makes all changes since transaction start permanent in the database. After commit, the transaction is no longer active.

Raises:
  • DatabaseError – If commit fails (e.g., constraint violation).

  • TransactionError – If transaction is not active.

Example

Manual commit:

tx = db.begin_transaction()
try:
    conn.execute("INSERT INTO logs VALUES (:msg)", {"msg": "test"})
    tx.commit()
    log.info("Transaction committed successfully")
except DatabaseError as e:
    tx.rollback()
    log.error("Commit failed", exc_info=True)

Warning

After commit, the transaction cannot be reused. Start a new transaction for additional operations.

See also

rollback(): Abort transaction

property is_active : bool

Check if the transaction is still active.

Returns:

True if transaction can accept operations, False if

already committed or rolled back.

Return type:

bool

Example

Checking transaction state:

tx = db.begin_transaction()
assert tx.is_active  # True

tx.commit()
assert not tx.is_active  # False

# This would raise TransactionError
# tx.execute("SELECT 1")

Note

Useful for conditional logic and error handling.

rollback() None[source]

Rollback the current transaction.

Discards all changes made since transaction start. The database state returns to what it was before the transaction began.

Raises:

TransactionError – If transaction is not active.

Example

Manual rollback on error:

tx = db.begin_transaction()
try:
    conn.execute("DELETE FROM important_data")
    # Validation fails
    if not validate_deletion():
        tx.rollback()
        log.warning("Deletion rolled back - validation failed")
        return
    tx.commit()
except Exception:
    tx.rollback()
    raise

Note

Rollback is safe to call multiple times. Subsequent calls are no-ops.

See also

commit(): Persist transaction changes

class structum.database.HealthCheckResult(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)[source]

Bases: object

Data class representing database health check results.

status

Overall health status (HEALTHY, DEGRADED, UNHEALTHY).

Type:

HealthStatus

message

Human-readable status description.

Type:

str

latency_ms

Query latency in milliseconds, if available.

Type:

float | None

details

Additional diagnostic information (e.g., active connections, pool statistics).

Type:

dict[str, Any] | None

Example

Creating and using a health check result:

result = HealthCheckResult(
    status=HealthStatus.HEALTHY,
    message="Database connection OK",
    latency_ms=2.5,
    details={"active_connections": 5, "pool_size": 10}
)

# Use in monitoring/alerts
if result.latency_ms and result.latency_ms > 100:
    alert("High database latency", result.latency_ms)

Note

This class is frozen (immutable) to ensure health check results cannot be modified after creation.

__init__(status: HealthStatus, message: str, latency_ms: float | None = None, details: dict[str, Any] | None = None)
details : dict[str, Any] | None = None
latency_ms : float | None = None
status : HealthStatus
message : str
class structum.database.HealthStatus(*values)[source]

Bases: Enum

Enumeration of possible database health states.

HEALTHY

Database is fully operational with normal latency.

DEGRADED

Database is responsive but experiencing issues (e.g., high latency, connection warnings).

UNHEALTHY

Database is unreachable or critically impaired.

Example

Checking health status:

result = db.health_check()
if result.status == HealthStatus.HEALTHY:
    log.info("Database OK", latency_ms=result.latency_ms)
elif result.status == HealthStatus.DEGRADED:
    log.warning("Database degraded", message=result.message)
else:
    log.error("Database unhealthy", message=result.message)
HEALTHY = 'healthy'
DEGRADED = 'degraded'
UNHEALTHY = 'unhealthy'
structum.database.Database

alias of DatabaseInterface

structum.database.Connection

alias of ConnectionInterface

structum.database.Transaction

alias of TransactionInterface

Interfaces