Structum Database (structum-database)¶
Structum Database offers a robust abstraction layer for SQLAlchemy (Sync/Async) and PostgreSQL (psycopg3).
| Feature | Value |
|---|---|
| Status | Alpha |
| Version | 0.1.0 |
| Namespace | |
| Backend | SQLAlchemy, PostgreSQL |
Index¶
1. What is Database Plugin¶
structum-database provides production-ready database connectivity with built-in connection pooling, health monitoring, and transaction management.
1.1 The Problem¶
Before (Without Plugin):
```python
# ❌ Manual connection management
import psycopg2

conn = psycopg2.connect("postgresql://...")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# No pooling
# No health check
# Manual transaction management
```
After (With Plugin):
```python
# ✅ Automatic connection pooling
# ✅ Integrated health checks
# ✅ Transaction context manager
# ✅ Type-safe queries
db = SQLAlchemyDatabase.from_config()

with db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()
```
1.2 Key Features¶
| Feature | Description |
|---|---|
| Multiple Backends | SQLAlchemy (universal) or psycopg3 (PostgreSQL-only) |
| Connection Pooling | Automatic pool management with pre-ping |
| Transaction Management | Context managers for ACID guarantees |
| Health Checks | Built-in health monitoring with latency metrics |
| Type Safety | Full type hints for mypy strict |
| Config Integration | Setup via Dynaconf with secrets isolation |
2. Core Concepts¶
2.1 Backend Comparison¶
| Feature | SQLAlchemyDatabase | PostgresDatabase |
|---|---|---|
| Databases Supported | PostgreSQL, MySQL, SQLite, Oracle, MSSQL | PostgreSQL only |
| Performance | Good | Excellent (native driver) |
| ORM Support | ✅ Full SQLAlchemy ORM | ❌ Raw SQL only |
| Async Support | Via asyncio extension | ✅ Native |
| Pool Management | SQLAlchemy Engine | psycopg3 Pool |
| Dependencies | sqlalchemy>=2.0 | psycopg[binary]>=3.1 |
Recommendation:

- Use `SQLAlchemyDatabase` for multi-database support or ORM needs.
- Use `PostgresDatabase` for maximum performance with PostgreSQL only.
2.2 Connection Lifecycle¶
```mermaid
graph TD
    subgraph "Application Scope"
        INIT["Startup: db.init"] --> POOL["Create Pool min=2 max=10"]
        POOL --> |"Wait for changes"| SHUT["Shutdown: db.shutdown"]
        SHUT --> CLOSE["Close All Connections"]
    end

    subgraph "Request Scope"
        REQ["Request Start"] --> TX["Begin Transaction"]
        TX --> ACQ["Acquire from Pool"]
        ACQ --> EXEC["Execute Queries"]
        EXEC --> CHECK{"Success?"}
        CHECK -->|Yes| COMMIT[Commit]
        CHECK -->|No| ROLL[Rollback]
        COMMIT & ROLL --> REL["Release to Pool"]
    end

    POOL -.-> |Checkout| ACQ
    REL -.-> |Return| POOL
```
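The checkout/return cycle in the request scope above can be sketched with a toy pool. This is purely illustrative: the real plugin delegates pooling to the SQLAlchemy Engine or the psycopg3 pool, and `TinyPool` is not part of the library.

```python
from contextlib import contextmanager
from queue import Queue

class TinyPool:
    """Toy pool illustrating the checkout/return cycle above."""

    def __init__(self, size: int):
        self._free: Queue = Queue()
        for i in range(size):
            self._free.put(f"conn-{i}")  # Startup: create connections

    @contextmanager
    def acquire(self, timeout: float = 1.0):
        conn = self._free.get(timeout=timeout)  # Checkout from pool
        try:
            yield conn  # Execute queries
        finally:
            self._free.put(conn)  # Release back to pool

pool = TinyPool(size=2)
with pool.acquire() as conn:
    in_use = 2 - pool._free.qsize()
print(in_use)  # → 1
```

The `finally` clause is what guarantees a connection always returns to the pool, even when the block raises — the same property the transaction context manager relies on.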
3. Quick Start (5 Minutes)¶
Step 1: Installation¶
```bash
# SQLAlchemy backend
pip install -e packages/database[sqlalchemy]

# PostgreSQL backend
pip install -e packages/database[postgres]

# Both
pip install -e packages/database[all]
```
Step 2: Configuration¶
File: config/app/database.toml
```toml
[default]
url = "postgresql://user:password@localhost:5432/mydb"
pool_size = 10
pool_timeout = 30
echo = false  # Set true to log SQL queries
```
File: config/.secrets.toml
```toml
[database]
url = "postgresql://user:YOUR_PASSWORD@prod.db.example.com:5432/production_db"
```
Step 3: Basic Usage¶
```python
from structum_lab.plugins.database import SQLAlchemyDatabase
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider

# Setup config
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)

# Create database connection
db = SQLAlchemyDatabase.from_config()

# Execute query in transaction
with db.transaction() as conn:
    result = conn.execute(
        "SELECT * FROM users WHERE id = :user_id",
        {"user_id": 1}
    )
    user = result.fetchone()
    if user:
        print(f"Found user: {user['username']}")

# Shutdown (cleanup)
db.shutdown()
```
4. SQLAlchemy Backend¶
4.1 Initialization¶
```python
from structum_lab.plugins.database import SQLAlchemyDatabase

# From config
db = SQLAlchemyDatabase.from_config()

# Manual initialization
db = SQLAlchemyDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    pool_timeout=30,
    pool_recycle=3600,   # Recycle connections after 1 hour
    echo=False,          # Log SQL queries
    pool_pre_ping=True   # Test connections before use
)
```
4.2 Query Execution¶
Select Query:
```python
with db.transaction() as conn:
    result = conn.execute(
        "SELECT id, username, email FROM users WHERE active = :active",
        {"active": True}
    )

    # Fetch one
    user = result.fetchone()
    if user:
        print(f"{user['id']}: {user['username']}")

    # Fetch all
    users = result.fetchall()
    for user in users:
        print(f"{user['id']}: {user['username']}")

    # Fetch many
    batch = result.fetchmany(size=100)
```
Insert Query:
```python
from datetime import datetime

with db.transaction() as conn:
    result = conn.execute(
        """
        INSERT INTO users (username, email, created_at)
        VALUES (:username, :email, :created_at)
        RETURNING id
        """,
        {
            "username": "john_doe",
            "email": "john@example.com",
            "created_at": datetime.utcnow()
        }
    )
    new_id = result.fetchone()["id"]
    print(f"Created user with ID: {new_id}")
```
Update Query:
```python
with db.transaction() as conn:
    result = conn.execute(
        """
        UPDATE users
        SET last_login = :now
        WHERE id = :user_id
        """,
        {
            "now": datetime.utcnow(),
            "user_id": 123
        }
    )
    rows_affected = result.rowcount
    print(f"Updated {rows_affected} rows")
```
Delete Query:
```python
with db.transaction() as conn:
    result = conn.execute(
        "DELETE FROM sessions WHERE expires_at < :now",
        {"now": datetime.utcnow()}
    )
    deleted_count = result.rowcount
    print(f"Deleted {deleted_count} expired sessions")
```
4.3 ORM Support¶
```python
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(255))

# Get ORM session
with db.get_session() as session:
    # Query with ORM
    users = session.query(User).filter(User.username.like("john%")).all()

    # Create new record
    new_user = User(username="jane_doe", email="jane@example.com")
    session.add(new_user)
    session.commit()

    # Update
    user = session.get(User, 1)
    if user:
        user.email = "newemail@example.com"
        session.commit()
```
5. PostgreSQL Backend¶
5.1 Initialization¶
```python
from structum_lab.plugins.database import PostgresDatabase

# From config
db = PostgresDatabase.from_config()

# Manual
db = PostgresDatabase(
    url="postgresql://user:pass@localhost/mydb",
    pool_size=10,
    min_size=2,    # Minimum connections to maintain
    max_size=20,   # Maximum connections
    timeout=5.0
)
```
5.2 Query Execution¶
```python
with db.transaction() as conn:
    # Execute with parameters
    result = conn.execute(
        "SELECT * FROM users WHERE id = %s",
        (user_id,)  # Note: tuple for parameters
    )

    # Fetch results
    row = result.fetchone()
    if row:
        print(f"User: {row['username']}")
```
5.3 Async Support¶
```python
import asyncio
from structum_lab.plugins.database import AsyncPostgresDatabase

user_id = 1  # Example value

async def main():
    db = AsyncPostgresDatabase.from_config()

    async with db.transaction() as conn:
        result = await conn.execute(
            "SELECT * FROM users WHERE id = $1",
            [user_id]
        )
        user = await result.fetchone()
        print(f"User: {user['username']}")

    await db.shutdown()

asyncio.run(main())
```
6. Connection Pooling¶
6.1 Pool Configuration¶
```python
db = SQLAlchemyDatabase(
    url="postgresql://...",
    pool_size=10,        # Normal connections
    max_overflow=20,     # Extra connections under load
    pool_timeout=30,     # Wait time for connection (seconds)
    pool_recycle=3600,   # Recycle after 1 hour
    pool_pre_ping=True   # Test before use
)
```
6.2 Pool Monitoring¶
```python
# Get pool status
info = db.get_pool_info()

print(f"Checked out: {info['checked_out']}")
print(f"Available: {info['available']}")
print(f"Total: {info['total']}")
print(f"Overflow: {info['overflow']}")

# Log pool stats
from structum_lab.plugins.observability import get_logger

logger = get_logger(__name__)
logger.info("Database pool status", **info)
```
6.3 Pool Exhaustion Handling¶
```python
from sqlalchemy.exc import TimeoutError

try:
    with db.transaction() as conn:
        # Query execution
        pass
except TimeoutError:
    logger.error("Database pool exhausted - increase pool_size or check for connection leaks")
    # Alert monitoring
    metrics.increment("database.pool_exhaustion")
```
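Transient pool exhaustion can also be mitigated by retrying the operation after a short backoff. A generic sketch follows; `run_with_retry`, the attempt count, and the delay are illustrative choices, not part of the plugin, and it catches the builtin `TimeoutError` to stay self-contained (in real code you would catch `sqlalchemy.exc.TimeoutError` as above):

```python
import time

def run_with_retry(fn, attempts: int = 3, base_delay: float = 0.05):
    """Retry a callable on TimeoutError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except TimeoutError:
            if attempt == attempts - 1:
                raise  # Give up after the last attempt
            time.sleep(base_delay * 2 ** attempt)  # 0.05s, 0.1s, ...

# Example: a flaky operation that fails twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("pool exhausted")
    return "ok"

print(run_with_retry(flaky))  # → ok
```

Retries only help with short spikes; a pool that is persistently exhausted still needs a larger `pool_size` or a leak fix.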
7. Transactions¶
7.1 Basic Transaction¶
```python
with db.transaction() as conn:
    # All queries in this block run in a single transaction
    conn.execute("INSERT INTO users (...) VALUES (...)")
    conn.execute("INSERT INTO profiles (...) VALUES (...)")
    # Automatic COMMIT on success
    # Automatic ROLLBACK on exception
```
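The commit-on-success / rollback-on-exception pattern can be demonstrated with the standard library's sqlite3. This is a sketch of the pattern, not the plugin's actual implementation:

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    # Commit if the block succeeds; roll back and re-raise if it fails
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()

try:
    with transaction(conn):
        conn.execute("INSERT INTO users (name) VALUES ('a')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # → 0, the insert was rolled back
```

Because the rollback happens in the context manager, calling code never has to remember the cleanup path.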
7.2 Explicit Commit/Rollback¶
```python
conn = db.get_connection()
try:
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    conn.commit()
except Exception as e:
    conn.rollback()
    logger.error(f"Transaction failed: {e}")
    raise
finally:
    conn.close()
```
7.3 Savepoints¶
```python
with db.transaction() as conn:
    # Main transaction
    conn.execute("INSERT INTO orders (...) VALUES (...)")

    # Savepoint for partial rollback
    conn.execute("SAVEPOINT sp1")
    try:
        conn.execute("INSERT INTO order_items (...) VALUES (...)")
    except Exception:
        # Rollback only to savepoint
        conn.execute("ROLLBACK TO SAVEPOINT sp1")
        logger.warning("Order items insert failed, continuing with order")

# Main transaction still commits
```
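`SAVEPOINT` and `ROLLBACK TO SAVEPOINT` are plain SQL, so the partial-rollback behavior can be verified against the standard library's sqlite3:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (v TEXT)")
conn.execute("INSERT INTO items VALUES ('kept')")

conn.execute("SAVEPOINT sp1")
conn.execute("INSERT INTO items VALUES ('discarded')")
conn.execute("ROLLBACK TO SAVEPOINT sp1")  # Undo only the second insert

conn.commit()  # The first insert survives
rows = [r[0] for r in conn.execute("SELECT v FROM items")]
print(rows)  # → ['kept']
```

The outer transaction is untouched: only work done after the savepoint is discarded.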
7.4 Nested Transactions¶
```python
def create_user_with_profile(username: str, email: str):
    """Transactional function."""
    with db.transaction() as conn:
        # Insert user
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": username, "e": email}
        )
        user_id = result.fetchone()["id"]

        # Insert profile (runs within the caller's transaction)
        create_profile(conn, user_id)

        return user_id

def create_profile(conn, user_id: int):
    """Nested transactional function."""
    conn.execute(
        "INSERT INTO profiles (user_id, bio) VALUES (:id, :bio)",
        {"id": user_id, "bio": "New user"}
    )
```
8. Health Checks¶
8.1 Basic Health Check¶
```python
from structum_lab.plugins.database import HealthStatus

# Perform health check
result = db.health_check()

print(f"Status: {result.status}")          # HealthStatus.HEALTHY
print(f"Message: {result.message}")        # "Database connection OK"
print(f"Latency: {result.latency_ms}ms")   # 2.5
```
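The latency figure is simply a timed round trip. A minimal sketch of how such a measurement might be taken; the `HealthResult` shape and `check` helper below are hypothetical, not the plugin's API:

```python
import time
from dataclasses import dataclass

@dataclass
class HealthResult:
    healthy: bool
    latency_ms: float

def check(ping) -> HealthResult:
    # Time a cheap probe (e.g. a SELECT 1) to get round-trip latency
    start = time.perf_counter()
    try:
        ping()
        ok = True
    except Exception:
        ok = False
    return HealthResult(ok, (time.perf_counter() - start) * 1000)

result = check(lambda: None)
print(result.healthy)  # → True
```

A failed probe still yields a latency value, which is useful for distinguishing "slow" from "down" in monitoring.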
8.2 Health Check Endpoint¶
```python
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health():
    """Health check endpoint."""
    db_health = db.health_check()

    return {
        "status": "healthy" if db_health.status == HealthStatus.HEALTHY else "unhealthy",
        "database": {
            "status": db_health.status.value,
            "latency_ms": db_health.latency_ms,
            "message": db_health.message
        }
    }
```
8.3 Detailed Health Monitoring¶
```python
def monitor_database_health():
    """Periodic health monitoring."""
    result = db.health_check()

    # Log metrics
    metrics.gauge("database.latency_ms", result.latency_ms)

    if result.status != HealthStatus.HEALTHY:
        logger.error(
            "Database unhealthy",
            status=result.status.value,
            message=result.message,
            latency_ms=result.latency_ms
        )
        # Alert
        alert_service.send_alert(
            "Database Health Check Failed",
            f"Status: {result.status.value}\nLatency: {result.latency_ms}ms"
        )
    else:
        logger.debug(f"Database healthy - latency: {result.latency_ms}ms")
```
9. Configuration¶
9.1 Complete Configuration Example¶
File: config/app/database.toml
```toml
[default]
url = "postgresql://localhost:5432/dev_db"
pool_size = 5
pool_timeout = 30
pool_recycle = 3600
echo = true  # Log SQL in development

[default.options]
pool_pre_ping = true
connect_args = { connect_timeout = 10 }

[production]
url = "postgresql://prod.db.internal:5432/prod_db"
pool_size = 20
pool_timeout = 60
pool_recycle = 1800
echo = false

[production.options]
pool_pre_ping = true
pool_use_lifo = true  # LIFO for better connection reuse
```
File: config/.secrets.toml
```toml
[database]
# Production credentials
url = "postgresql://app_user:STRONG_PASSWORD@prod.db.example.com:5432/production"
```
9.2 Environment Variable Override¶
```bash
# Override database URL
export STRUCTUM_DATABASE__URL="postgresql://localhost/test_db"

# Override pool size
export STRUCTUM_DATABASE__POOL_SIZE=20

# Multiple databases
export STRUCTUM_PRIMARY_DB__URL="postgresql://..."
export STRUCTUM_REPLICA_DB__URL="postgresql://..."
```
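The `PREFIX__KEY` convention maps each environment variable to a setting under that prefix. A rough sketch of how such variables could be collected (Dynaconf does the real parsing, including type casting and nested keys; `collect_overrides` is illustrative only):

```python
import os

def collect_overrides(prefix: str = "STRUCTUM_DATABASE__") -> dict:
    """Collect PREFIX__KEY env vars into a flat settings dict (lowercased keys)."""
    return {
        key[len(prefix):].lower(): value
        for key, value in os.environ.items()
        if key.startswith(prefix)
    }

os.environ["STRUCTUM_DATABASE__POOL_SIZE"] = "20"
print(collect_overrides())
```

Note that raw environment values are strings; the config layer is responsible for casting `"20"` to an integer before it reaches the pool.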
9.3 Multiple Database Connections¶
```python
# Primary database
primary_db = SQLAlchemyDatabase.from_config(namespace="primary_db")

# Read replica
replica_db = SQLAlchemyDatabase.from_config(namespace="replica_db")

# Writes to primary
with primary_db.transaction() as conn:
    conn.execute("INSERT INTO users (...) VALUES (...)")

# Reads from replica
with replica_db.transaction() as conn:
    result = conn.execute("SELECT * FROM users WHERE id = :id", {"id": 1})
    user = result.fetchone()
```
10. FastAPI Integration¶
10.1 Dependency Injection¶
```python
from fastapi import FastAPI, Depends, HTTPException
from structum_lab.plugins.database import (
    SQLAlchemyDatabase,
    DatabaseInterface,
    get_database,
    set_database,
)

app = FastAPI()

# Database initialization
@app.on_event("startup")
async def startup():
    db = SQLAlchemyDatabase.from_config()
    set_database(db)

@app.on_event("shutdown")
async def shutdown():
    db = get_database()
    db.shutdown()

# Dependency
def get_db() -> DatabaseInterface:
    return get_database()

# Route with injection
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DatabaseInterface = Depends(get_db)):
    with db.transaction() as conn:
        result = conn.execute(
            "SELECT * FROM users WHERE id = :id",
            {"id": user_id}
        )
        user = result.fetchone()

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return {"id": user["id"], "username": user["username"]}
```
10.2 Transaction Middleware¶
```python
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class TransactionMiddleware(BaseHTTPMiddleware):
    """Automatic transaction per request."""

    async def dispatch(self, request, call_next):
        db = get_database()
        with db.transaction() as conn:
            # Attach connection to request state
            request.state.db_conn = conn
            try:
                response = await call_next(request)
                # Transaction auto-commits on success
                return response
            except Exception:
                # Transaction auto-rolls-back on error
                raise

app.add_middleware(TransactionMiddleware)

# Routes can access the connection
@app.post("/users")
async def create_user(user: UserCreate, request: Request):
    conn = request.state.db_conn
    result = conn.execute(
        "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
        {"u": user.username, "e": user.email}
    )
    new_id = result.fetchone()["id"]
    return {"id": new_id}
```
11. Testing¶
11.1 Test Database Setup¶
```python
import pytest
from structum_lab.plugins.database import SQLAlchemyDatabase

@pytest.fixture(scope="session")
def test_db():
    """Create test database."""
    db = SQLAlchemyDatabase(
        url="postgresql://localhost/test_db",
        pool_size=2
    )

    # Run migrations
    with db.transaction() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                username VARCHAR(50) UNIQUE NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL
            )
        """)

    yield db

    # Cleanup
    with db.transaction() as conn:
        conn.execute("DROP TABLE IF EXISTS users")
    db.shutdown()

@pytest.fixture
def clean_db(test_db):
    """Clean database before each test."""
    with test_db.transaction() as conn:
        conn.execute("TRUNCATE users RESTART IDENTITY CASCADE")
    yield test_db
```
11.2 Test Queries¶
```python
def test_create_user(clean_db):
    """Test user creation."""
    with clean_db.transaction() as conn:
        result = conn.execute(
            "INSERT INTO users (username, email) VALUES (:u, :e) RETURNING id",
            {"u": "testuser", "e": "test@example.com"}
        )
        user_id = result.fetchone()["id"]
        assert user_id > 0
```
12. Production Best Practices¶
- Pool Size Calculation: `pool_size = (CPUs * 2) + effective_spindle_count`
- Pre-Ping: Always enable `pool_pre_ping=True` to recover from dropped connections.
- Timeouts: Set `pool_timeout` to fail fast under load (e.g., 5s).
- Secrets: Never commit DB passwords. Use `.secrets.toml`.
13. API Reference¶
See the Core Database Module for protocol definitions.