Structum Database (structum-database)

Python 3.11+ | License: Apache-2.0

Structum Database offers a robust abstraction layer for SQLAlchemy (Sync/Async) and PostgreSQL (psycopg3).

  • Status: Alpha

  • Version: 0.1.0

  • Namespace: structum_lab.plugins.database

  • Backend: SQLAlchemy, PostgreSQL


Index

  1. What is Database Plugin

  2. Core Concepts

  3. Quick Start (5 Minutes)

  4. SQLAlchemy Backend

  5. PostgreSQL Backend

  6. Connection Pooling

  7. Transactions

  8. Health Checks

  9. Configuration

  10. FastAPI Integration

  11. Testing

  12. Production Best Practices

  13. API Reference


1. What is Database Plugin

structum-database provides production-ready database connectivity with built-in connection pooling, health monitoring, and transaction management.

1.1 The Problem

Before (Without Plugin):

# ❌ Manual connection management
import psycopg2

conn = psycopg2.connect("postgresql://...")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# No pooling
# No health check
# Manual transaction management

After (With Plugin):

# ✅ Automatic connection pooling
# ✅ Integrated health checks
# ✅ Transaction context manager
# ✅ Type-safe queries
from structum_lab.plugins.database import SQLAlchemyDatabase

db = SQLAlchemyDatabase.from_config()

with db.transaction() as conn:
    # execute() returns a result object; rows are fetched from it
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()

1.2 Key Features

  • Multiple Backends: SQLAlchemy (universal) or psycopg3 (PostgreSQL-only)

  • Connection Pooling: Automatic pool management with pre-ping

  • Transaction Management: Context managers for ACID guarantees

  • Health Checks: Built-in health monitoring with latency metrics

  • Type Safety: Full type hints for mypy strict

  • Config Integration: Setup via Dynaconf with secrets isolation


2. Core Concepts

2.1 Backend Comparison

Feature              SQLAlchemyDatabase                        PostgresDatabase
-------------------  ----------------------------------------  -------------------------
Databases Supported  PostgreSQL, MySQL, SQLite, Oracle, MSSQL  PostgreSQL only
Performance          Good                                      Excellent (native driver)
ORM Support          ✅ Full SQLAlchemy ORM                     ❌ Raw SQL only
Async Support        Via asyncio extension                     ✅ Native
Pool Management      SQLAlchemy Engine                         psycopg3 Pool
Dependencies         sqlalchemy>=2.0                           psycopg[binary]>=3.1

Recommendation:

  • Use SQLAlchemyDatabase for multi-database support or ORM needs.

  • Use PostgresDatabase for maximum performance with PostgreSQL only.
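
The recommendation above can be read as a small decision rule. The sketch below is illustrative only: `choose_backend` is a hypothetical helper that the plugin does not ship, and it returns the class name as a string so that it runs without the plugin installed.

```python
from urllib.parse import urlsplit


def choose_backend(url: str, needs_orm: bool = False) -> str:
    """Return the name of the backend class the guidelines above point to.

    Hypothetical helper for illustration; not part of structum-database.
    """
    # Drop an optional driver suffix: "postgresql+psycopg" -> "postgresql"
    scheme = urlsplit(url).scheme.split("+")[0]
    is_postgres = scheme in {"postgresql", "postgres"}
    if is_postgres and not needs_orm:
        return "PostgresDatabase"      # PostgreSQL only, raw SQL, native driver
    return "SQLAlchemyDatabase"        # other databases or ORM usage


print(choose_backend("postgresql://user:pass@localhost/mydb"))
print(choose_backend("sqlite:///app.db"))
```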

2.2 Connection Lifecycle

graph TD
    subgraph "Application Scope"
        INIT["Startup: db.init"] --> POOL["Create Pool min=2 max=10"]
        POOL --> |"Wait for changes"| SHUT["Shutdown: db.shutdown"]
        SHUT --> CLOSE["Close All Connections"]
    end

    subgraph "Request Scope"
        REQ["Request Start"] --> TX["Begin Transaction"]
        TX --> ACQ["Acquire from Pool"]
        ACQ --> EXEC["Execute Queries"]
        EXEC --> CHECK{"Success?"}
        CHECK -->|Yes| COMMIT[Commit]
        CHECK -->|No| ROLL[Rollback]
        COMMIT & ROLL --> REL["Release to Pool"]
    end

    POOL -.-> |Checkout| ACQ
    REL -.-> |Return| POOL
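
The lifecycle in the diagram can be imitated with the standard library alone. This is a minimal sketch, not the plugin's implementation: `TinyPool` and `handle_request` are hypothetical names, and in-memory SQLite connections stand in for real database connections.

```python
import queue
import sqlite3


class TinyPool:
    """Toy fixed-size pool standing in for the pool the plugin manages."""

    def __init__(self, size: int = 2) -> None:
        # Application scope, startup: open every connection once.
        self._all = [sqlite3.connect(":memory:") for _ in range(size)]
        self._idle = queue.Queue()
        for conn in self._all:
            self._idle.put(conn)

    def acquire(self) -> sqlite3.Connection:
        return self._idle.get()          # checkout; blocks while the pool is empty

    def release(self, conn: sqlite3.Connection) -> None:
        self._idle.put(conn)             # return to the pool

    def idle_count(self) -> int:
        return self._idle.qsize()

    def shutdown(self) -> None:
        # Application scope, shutdown: close all connections.
        for conn in self._all:
            conn.close()


def handle_request(pool: TinyPool, fail: bool = False) -> str:
    """Request scope: acquire, execute, commit or roll back, release."""
    conn = pool.acquire()
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS hits (n INTEGER)")
        conn.execute("INSERT INTO hits VALUES (1)")
        if fail:
            raise RuntimeError("simulated query failure")
        conn.commit()
        return "commit"
    except RuntimeError:
        conn.rollback()
        return "rollback"
    finally:
        pool.release(conn)               # always released, on either branch
```

The `finally` clause mirrors the diagram's "Release to Pool" node: the connection goes back whether the request committed or rolled back.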
    

3. Quick Start (5 Minutes)

Step 1: Installation

# SQLAlchemy backend (quotes keep shells such as zsh from expanding the brackets)
pip install -e "packages/database[sqlalchemy]"

# PostgreSQL backend
pip install -e "packages/database[postgres]"

# Both
pip install -e "packages/database[all]"

Step 2: Configuration

File: config/app/database.toml

[default]
url = "postgresql://user:password@localhost:5432/mydb"
pool_size = 10
pool_timeout = 30
echo = false  # Set true to log SQL queries

File: config/.secrets.toml

[database]
url = "postgresql://user:YOUR_PASSWORD@prod.db.example.com:5432/production_db"

Step 3: Basic Usage

from structum_lab.plugins.database import SQLAlchemyDatabase
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider

# Setup config
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)

# Create database connection
db = SQLAlchemyDatabase.from_config()

# Execute query in transaction
with db.transaction() as conn:
    result = conn.execute(
        "SELECT * FROM users WHERE id = :user_id",
        {"user_id": 1}
    )
    user = result.fetchone()
    
    if user:
        print(f"Found user: {user['username']}")

# Shutdown (cleanup)
db.shutdown()

4. SQLAlchemy Backend

4.1 Initialization

from structum_lab.plugins.database import SQLAlchemyDatabase

# From config
db = SQLAlchemyDatabase.from_config()

# Manual initialization
db = SQLAlchemyDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    pool_timeout=30,
    pool_recycle=3600,  # Recycle connections after 1 hour
    echo=False,  # Set to True to log SQL queries
    pool_pre_ping=True  # Test connections before use
)

4.2 Query Execution

Select Query:

with db.transaction() as conn:
    result = conn.execute(
        "SELECT id, username, email FROM users WHERE active = :active",
        {"active": True}
    )
    
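    # Fetch one row (advances the cursor by one)
    user = result.fetchone()
    if user:
        print(f"{user['id']}: {user['username']}")
    
    # Fetch all remaining rows
    users = result.fetchall()
    for user in users:
        print(f"{user['id']}: {user['username']}")
    
    # Fetch the next batch of up to 100 rows
    # (empty here, because fetchall() above already consumed the result)
    batch = result.fetchmany(size=100)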

Insert Query:

from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        """
        INSERT INTO users (username, email, created_at)
        VALUES (:username, :email, :created_at)
        RETURNING id
        """,
        {
            "username": "john_doe",
            "email": "john@example.com",
            "created_at": datetime.now(timezone.utc)  # utcnow() is deprecated since Python 3.12
        }
    )
    
    new_id = result.fetchone()["id"]
    print(f"Created user with ID: {new_id}")

Update Query:

from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        """
        UPDATE users
        SET last_login = :now
        WHERE id = :user_id
        """,
        {
            "now": datetime.now(timezone.utc),
            "user_id": 123
        }
    )
    
    rows_affected = result.rowcount
    print(f"Updated {rows_affected} rows")

Delete Query:

from datetime import datetime, timezone

with db.transaction() as conn:
    result = conn.execute(
        "DELETE FROM sessions WHERE expires_at < :now",
        {"now": datetime.now(timezone.utc)}
    )
    
    deleted_count = result.rowcount
    print(f"Deleted {deleted_count} expired sessions")
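
The `:name` placeholders used throughout this section are bound by the driver rather than spliced into the SQL string. The standard library's `sqlite3` module accepts the same named style, so the sketch below uses it to show the effect without the plugin; the plugin's result objects may differ in detail.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allows row["column"] access, as in the examples above
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, active INTEGER)"
)
conn.executemany(
    "INSERT INTO users (username, active) VALUES (:username, :active)",
    [{"username": "john_doe", "active": 1}, {"username": "jane_doe", "active": 0}],
)

# A hostile value is compared as plain data; it cannot change the query.
hostile = "john_doe' OR '1'='1"
matches = conn.execute(
    "SELECT id FROM users WHERE username = :username", {"username": hostile}
).fetchall()

# rowcount reports how many rows the UPDATE touched.
cursor = conn.execute(
    "UPDATE users SET active = :active WHERE username = :username",
    {"active": 1, "username": "jane_doe"},
)
updated = cursor.rowcount

active_names = [
    row["username"]
    for row in conn.execute(
        "SELECT username FROM users WHERE active = :active ORDER BY id", {"active": 1}
    )
]
conn.close()
```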

4.3 ORM Support

from sqlalchemy import String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(255))

# Get ORM session
with db.get_session() as session:
    # Query with ORM (SQLAlchemy 2.0 style)
    users = session.scalars(
        select(User).where(User.username.like("john%"))
    ).all()
    
    # Create new record
    new_user = User(username="jane_doe", email="jane@example.com")
    session.add(new_user)
    session.commit()
    
    # Update
    user = session.get(User, 1)
    if user:
        user.email = "newemail@example.com"
        session.commit()

5. PostgreSQL Backend

5.1 Initialization

from structum_lab.plugins.database import PostgresDatabase

# From config
db = PostgresDatabase.from_config()

# Manual
db = PostgresDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    min_size=2,  # Minimum connections to maintain
    max_size=20,  # Maximum connections
    timeout=5.0
)

5.2 Query Execution

user_id = 1  # example value

with db.transaction() as conn:
    # Execute with parameters
    result = conn.execute(
        "SELECT * FROM users WHERE id = %s",
        (user_id,)  # Note: positional parameters are passed as a tuple
    )
    
    # Fetch results
    row = result.fetchone()
    if row:
        print(f"User: {row['username']}")

5.3 Async Support

import asyncio
from structum_lab.plugins.database import AsyncPostgresDatabase

async def main():
    db = AsyncPostgresDatabase.from_config()
    user_id = 1  # example value
    
    async with db.transaction() as conn:
        result = await conn.execute(
            "SELECT * FROM users WHERE id = %s",  # psycopg uses %s placeholders, as in 5.2
            (user_id,)
        )
        
        user = await result.fetchone()
        if user:
            print(f"User: {user['username']}")
    
    await db.shutdown()

asyncio.run(main())

6. Connection Pooling

6.1 Pool Configuration

db = SQLAlchemyDatabase(
    url="postgresql://...",
    pool_size=10,          # Normal connections
    max_overflow=20,       # Extra connections under load
    pool_timeout=30,       # Wait time for connection (seconds)
    pool_recycle=3600,     # Recycle after 1 hour
    pool_pre_ping=True     # Test before use
)
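
With SQLAlchemy's default queue pool, `pool_size` is the number of connections kept open and `max_overflow` is how many extra ones may be opened temporarily, so one engine can hold at most their sum. The sketch below works through that arithmetic. `max_connections` is a hypothetical helper, and the `workers` multiplier assumes each worker process creates its own pool.

```python
def max_connections(pool_size: int, max_overflow: int, workers: int = 1) -> int:
    """Upper bound on simultaneous connections opened by `workers` processes."""
    return (pool_size + max_overflow) * workers


# The configuration above allows up to 30 connections per process.
print(max_connections(pool_size=10, max_overflow=20))

# Four worker processes with the same settings could open up to 120.
print(max_connections(pool_size=10, max_overflow=20, workers=4))
```

Comparing the result with the server's own connection limit (PostgreSQL's `max_connections` defaults to 100) shows whether the application can exhaust the database before it exhausts its pools.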

6.2 Pool Monitoring

# Get pool status
info = db.get_pool_info()

print(f"Checked out: {info['checked_out']}")
print(f"Available: {info['available']}")
print(f"Total: {info['total']}")
print(f"Overflow: {info['overflow']}")

# Log pool stats
from structum_lab.plugins.observability import get_logger

logger = get_logger(__name__)
logger.info("Database pool status", **info)

6.3 Pool Exhaustion Handling

from sqlalchemy.exc import TimeoutError as PoolTimeoutError  # avoid shadowing the built-in

try:
    with db.transaction() as conn:
        # Query execution
        pass
except PoolTimeoutError:
    logger.error("Database pool exhausted - increase pool_size or check for connection leaks")
    # Alert monitoring
    metrics.increment("database.pool_exhaustion")
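
What "pool exhausted" means can be shown with a plain queue: a checkout waits up to the timeout and fails if nothing is returned in that time. This is a sketch under stated assumptions; `PoolExhausted` and `checkout` are hypothetical names, and strings stand in for connections.

```python
import queue


class PoolExhausted(TimeoutError):
    """No connection became free within the timeout (hypothetical name)."""


def checkout(idle: queue.Queue, timeout: float):
    """Take a connection from the idle queue or fail after `timeout` seconds."""
    try:
        return idle.get(timeout=timeout)
    except queue.Empty:
        raise PoolExhausted(f"no connection available after {timeout}s") from None


idle = queue.Queue()
idle.put("conn-1")                       # a pool holding one placeholder connection

first = checkout(idle, timeout=0.05)     # succeeds immediately
try:
    checkout(idle, timeout=0.05)         # pool is empty and nothing is returned
    exhausted = False
except PoolExhausted:
    exhausted = True

idle.put(first)                          # returning the connection frees the pool
second = checkout(idle, timeout=0.05)
```

A connection that is never returned (a leak) produces the same symptom as real overload, which is why the error message above suggests checking for leaks as well as raising `pool_size`.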

7. Transactions

7.1 Basic Transaction

with db.transaction() as conn:
    # All queries in this block are part of single transaction
    conn.execute("INSERT INTO users (...) VALUES (...)")
    conn.execute("INSERT INTO profiles (...) VALUES (...)")
    
    # Automatic COMMIT on success
    # Automatic ROLLBACK on exception
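
The commit-or-rollback behavior described in the comments above can be reproduced with `contextlib` and `sqlite3`. This is a minimal sketch of the pattern, not the plugin's implementation; the `transaction` function here is a stand-in written for the example.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn: sqlite3.Connection):
    """Commit if the block finishes normally, roll back if it raises."""
    try:
        yield conn
    except BaseException:
        conn.rollback()
        raise
    else:
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT UNIQUE)")

# Succeeds: the insert is committed when the block exits.
with transaction(conn) as tx:
    tx.execute("INSERT INTO users VALUES ('alice')")

# Fails: the second insert violates UNIQUE, so 'bob' is rolled back too.
try:
    with transaction(conn) as tx:
        tx.execute("INSERT INTO users VALUES ('bob')")
        tx.execute("INSERT INTO users VALUES ('alice')")
except sqlite3.IntegrityError:
    pass

names = [row[0] for row in conn.execute("SELECT username FROM users ORDER BY username")]
conn.close()
```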

7.2 Explicit Commit/Rollback

conn = db.get_connection()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    
    conn.commit()
except Exception as e:
    conn.rollback()
    logger.error(f"Transaction failed: {e}")
    raise
finally:
    conn.close()

7.3 Savepoints

with db.transaction() as conn:
    # Main transaction
    conn.execute("INSERT INTO orders (...) VALUES (...)")
    
    # Savepoint for partial rollback
    conn.execute("SAVEPOINT sp1")
    
    try:
        conn.execute("INSERT INTO order_items (...) VALUES (...)")
    except Exception:
        # Rollback only to savepoint
        conn.execute("ROLLBACK TO SAVEPOINT sp1")
        logger.warning("Order items insert failed, continuing with order")
    
    # Main transaction still commits
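
SQLite understands the same `SAVEPOINT` statements, so the partial rollback can be tried without a PostgreSQL server. The sketch below assumes nothing about the plugin. It uses `sqlite3` in autocommit mode so that `BEGIN` and `COMMIT` are explicit, and a NOT NULL violation plays the role of the failing insert.

```python
import sqlite3

# isolation_level=None disables implicit transactions; BEGIN/COMMIT are issued by hand.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE order_items (order_id INTEGER NOT NULL, sku TEXT NOT NULL)")

conn.execute("BEGIN")
conn.execute("INSERT INTO orders (id) VALUES (1)")

conn.execute("SAVEPOINT sp1")
try:
    conn.execute("INSERT INTO order_items (order_id, sku) VALUES (1, 'A-100')")
    conn.execute("INSERT INTO order_items (order_id, sku) VALUES (1, NULL)")  # fails
except sqlite3.IntegrityError:
    conn.execute("ROLLBACK TO SAVEPOINT sp1")   # undoes both item inserts
else:
    conn.execute("RELEASE SAVEPOINT sp1")       # keep the items on success

conn.execute("COMMIT")                          # the order itself is still committed

order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
item_count = conn.execute("SELECT COUNT(*) FROM order_items").fetchone()[0]
conn.close()
```

On PostgreSQL the rollback to the savepoint is not optional: after a failed statement the transaction rejects further commands until it is rolled back, either fully or to a savepoint.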

7.4 Nested Transactions

def create_user_with_profile(username: str, email: str):
    """Transactional function."""
    with db.transaction() as conn:
        # Insert user
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": username, "e": email}
        )
        user_id = result.fetchone()["id"]
        
        # Insert profile on the same connection, so it joins the enclosing transaction
        create_profile(conn, user_id)
        
        return user_id

def create_profile(conn, user_id: int):
    """Runs inside the caller's transaction; it does not open a new one."""
    conn.execute(
        "INSERT INTO profiles (user_id, bio) VALUES (:id, :bio)",
        {"id": user_id, "bio": "New user"}
    )

8. Health Checks

8.1 Basic Health Check

from structum_lab.plugins.database import HealthStatus

# Perform health check
result = db.health_check()

print(f"Status: {result.status}")           # HealthStatus.HEALTHY
print(f"Message: {result.message}")         # "Database connection OK"
print(f"Latency: {result.latency_ms}ms")   # 2.5
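
A health check of this shape usually amounts to timing a trivial query. The sketch below illustrates the idea with `sqlite3`. It mirrors the names `HealthStatus`, `status`, `message`, and `latency_ms` from the example above, but the enum values and the `HealthResult` class are assumptions, not the plugin's definitions.

```python
import sqlite3
import time
from dataclasses import dataclass
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"        # values are assumed for this sketch
    UNHEALTHY = "unhealthy"


@dataclass
class HealthResult:
    status: HealthStatus
    message: str
    latency_ms: float


def health_check(conn: sqlite3.Connection) -> HealthResult:
    """Run SELECT 1 and report whether it worked and how long it took."""
    start = time.perf_counter()
    try:
        conn.execute("SELECT 1").fetchone()
        status, message = HealthStatus.HEALTHY, "Database connection OK"
    except sqlite3.Error as exc:
        status, message = HealthStatus.UNHEALTHY, str(exc)
    latency_ms = (time.perf_counter() - start) * 1000
    return HealthResult(status, message, latency_ms)


conn = sqlite3.connect(":memory:")
ok = health_check(conn)
conn.close()
failed = health_check(conn)    # a closed connection raises sqlite3.ProgrammingError
```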

8.2 Health Check Endpoint

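from fastapi import FastAPI
from structum_lab.plugins.database import HealthStatus

app = FastAPI()

@app.get("/health")
def health():  # plain def: FastAPI runs it in a threadpool, so the blocking check does not stall the event loop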
    """Health check endpoint."""
    db_health = db.health_check()
    
    return {
        "status": "healthy" if db_health.status == HealthStatus.HEALTHY else "unhealthy",
        "database": {
            "status": db_health.status.value,
            "latency_ms": db_health.latency_ms,
            "message": db_health.message
        }
    }

8.3 Detailed Health Monitoring

def monitor_database_health():
    """Periodic health monitoring."""
    result = db.health_check()
    
    # Log metrics
    metrics.gauge("database.latency_ms", result.latency_ms)
    
    if result.status != HealthStatus.HEALTHY:
        logger.error(
            "Database unhealthy",
            status=result.status.value,
            message=result.message,
            latency_ms=result.latency_ms
        )
        
        # Alert
        alert_service.send_alert(
            "Database Health Check Failed",
            f"Status: {result.status.value}\nLatency: {result.latency_ms}ms"
        )
    else:
        logger.debug(f"Database healthy - latency: {result.latency_ms}ms")

9. Configuration

9.1 Complete Configuration Example

File: config/app/database.toml

[default]
url = "postgresql://localhost:5432/dev_db"
pool_size = 5
pool_timeout = 30
pool_recycle = 3600
echo = true  # Log SQL in development

[default.options]
pool_pre_ping = true
connect_args = { "connect_timeout": 10 }

[production]
url = "postgresql://prod.db.internal:5432/prod_db"
pool_size = 20
pool_timeout = 60
pool_recycle = 1800
echo = false

[production.options]
pool_pre_ping = true
pool_use_lifo = true  # LIFO for better connection reuse

File: config/.secrets.toml

[database]
# Production credentials
url = "postgresql://app_user:STRONG_PASSWORD@prod.db.example.com:5432/production"
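
The `[default]` and `[production]` sections follow a layered pattern: the active environment's values are laid over the defaults. The sketch below shows that overlay on plain dictionaries that stand in for the parsed TOML sections (abridged). `overlay` is a hypothetical helper, and the recursive merge of nested tables such as `options` is an assumption; Dynaconf's actual behavior for nested tables depends on its merge settings.

```python
DEFAULT = {
    "url": "postgresql://localhost:5432/dev_db",
    "pool_size": 5,
    "echo": True,
    "options": {"pool_pre_ping": True, "connect_args": {"connect_timeout": 10}},
}

PRODUCTION = {
    "url": "postgresql://prod.db.internal:5432/prod_db",
    "pool_size": 20,
    "echo": False,
    "options": {"pool_pre_ping": True, "pool_use_lifo": True},
}


def overlay(base: dict, override: dict) -> dict:
    """Return a new dict with `override` laid over `base`, merging nested dicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = overlay(merged[key], value)
        else:
            merged[key] = value
    return merged


settings = overlay(DEFAULT, PRODUCTION)
print(settings["pool_size"], settings["options"])
```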

9.2 Environment Variable Override

# Override database URL
export STRUCTUM_DATABASE__URL="postgresql://localhost/test_db"

# Override pool size
export STRUCTUM_DATABASE__POOL_SIZE=20

# Multiple databases
export STRUCTUM_PRIMARY_DB__URL="postgresql://..."
export STRUCTUM_REPLICA_DB__URL="postgresql://..."
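
The variable names above encode a path: the `STRUCTUM_` prefix selects the application, and each double underscore descends one level. The sketch below imitates only that naming convention. `parse_env` is a hypothetical helper, and its integer cast is a simplification; Dynaconf does its own parsing and type conversion.

```python
def parse_env(environ: dict, prefix: str = "STRUCTUM_") -> dict:
    """Turn PREFIX_SECTION__KEY=value entries into {"section": {"key": value}}."""
    settings: dict = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue                              # unrelated variable
        path = name[len(prefix):].lower().split("__")
        node = settings
        for part in path[:-1]:
            node = node.setdefault(part, {})      # descend, creating levels as needed
        node[path[-1]] = int(raw) if raw.isdigit() else raw
    return settings


example_env = {
    "STRUCTUM_DATABASE__URL": "postgresql://localhost/test_db",
    "STRUCTUM_DATABASE__POOL_SIZE": "20",
    "PATH": "/usr/bin",
}
parsed = parse_env(example_env)
```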

9.3 Multiple Database Connections

# Primary database
primary_db = SQLAlchemyDatabase.from_config(namespace="primary_db")

# Read replica
replica_db = SQLAlchemyDatabase.from_config(namespace="replica_db")

# Writes to primary
with primary_db.transaction() as conn:
    conn.execute("INSERT INTO users (...) VALUES (...)")

# Reads from replica
with replica_db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()

10. FastAPI Integration

10.1 Dependency Injection

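from fastapi import FastAPI, Depends, HTTPException
from structum_lab.plugins.database import (
    DatabaseInterface,
    SQLAlchemyDatabase,
    get_database,
    set_database,  # assumed to be exported alongside get_database
)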

app = FastAPI()

# Database initialization
@app.on_event("startup")
async def startup():
    db = SQLAlchemyDatabase.from_config()
    set_database(db)

@app.on_event("shutdown")
async def shutdown():
    db = get_database()
    db.shutdown()

# Dependency
def get_db() -> DatabaseInterface:
    return get_database()

# Route with injection
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DatabaseInterface = Depends(get_db)):
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        user = result.fetchone()
        
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        
        return {"id": user["id"], "username": user["username"]}

10.2 Transaction Middleware

from starlette.middleware.base import BaseHTTPMiddleware

class TransactionMiddleware(BaseHTTPMiddleware):
    """Automatic transaction per request."""
    
    async def dispatch(self, request, call_next):
        db = get_database()
        
        with db.transaction() as conn:
            # Attach connection to request state
            request.state.db_conn = conn
            
            # Commits when call_next() returns and rolls back if it raises.
            # Errors that FastAPI turns into responses (e.g. HTTPException)
            # do not raise here, so the transaction still commits for them.
            return await call_next(request)

app.add_middleware(TransactionMiddleware)

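# Routes can access the connection via request.state
# (Request is imported from fastapi; UserCreate is a Pydantic model defined elsewhere)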
@app.post("/users")
async def create_user(user: UserCreate, request: Request):
    conn = request.state.db_conn
    
    result = conn.execute(
        "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
        {"u": user.username, "e": user.email}
    )
    
    new_id = result.fetchone()["id"]
    return {"id": new_id}

11. Testing

11.1 Test Database Setup

import pytest
from structum_lab.plugins.database import SQLAlchemyDatabase

@pytest.fixture(scope="session")
def test_db():
    """Create test database."""
    db = SQLAlchemyDatabase(
        url="postgresql://localhost/test_db",
        pool_size=2
    )
    
    # Run migrations
    with db.transaction() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(50) UNIQUE NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL
            )
        """)
    
    yield db
    
    # Cleanup
    with db.transaction() as conn:
        conn.execute("DROP TABLE IF EXISTS users")
    
    db.shutdown()

@pytest.fixture
def clean_db(test_db):
    """Clean database before each test."""
    with test_db.transaction() as conn:
        conn.execute("TRUNCATE users RESTART IDENTITY CASCADE")
    
    yield test_db

11.2 Test Queries

def test_create_user(clean_db):
    """Test user creation."""
    with clean_db.transaction() as conn:
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": "testuser", "e": "test@example.com"}
        )
        
        user_id = result.fetchone()["id"]
        assert user_id > 0

12. Production Best Practices

  • Pool Size Calculation: pool_size = (CPUs * 2) + effective_spindle_count

  • Pre-Ping: Always enable pool_pre_ping=True to recover from dropped connections.

  • Timeouts: Set pool_timeout to fail fast under load (e.g., 5s).

  • Secrets: Never commit DB passwords. Use .secrets.toml.
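
The pool size rule of thumb is easy to work through by hand. The sketch below only does that arithmetic. `recommended_pool_size` is a hypothetical helper, and reading "CPUs" as the cores of the database server is an assumption. Treat the result as a starting point to validate under load, not a fixed answer.

```python
def recommended_pool_size(cpu_cores: int, effective_spindles: int = 1) -> int:
    """Apply pool_size = (CPUs * 2) + effective_spindle_count."""
    return cpu_cores * 2 + effective_spindles


# A 4-core server with one disk: (4 * 2) + 1 = 9
print(recommended_pool_size(4, 1))

# A 16-core server with two disks: (16 * 2) + 2 = 34
print(recommended_pool_size(16, 2))
```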


13. API Reference

See the Core Database Module for protocol definitions.