Observability Module

Package: structum-observability
Status: Core Module (v0.1.0+)
Dependencies: structlog, prometheus-client, opentelemetry-api


Overview

[!NOTE] Architectural Role: This plugin provides the implementation of the Foundation Layer observability capabilities.
It implements the protocols defined in Core Logging and Core Monitoring.
See the Enterprise Architecture Guide.

The Observability module provides comprehensive monitoring and debugging capabilities for Structum applications:

  • Structured Logging: JSON logs with context propagation

  • Metrics: Prometheus-compatible metrics collection

  • Tracing: OpenTelemetry distributed tracing

  • Health Checks: Built-in health monitoring


Installation

# Included in Meta package
pip install structum

# Standalone
pip install structum-observability

Quick Start

Basic Logging

from structum.plugins.observability import get_logger

logger = get_logger(__name__)

# Structured logging with context
logger.info("user_login", user_id=123, ip="192.168.1.1")
logger.error("payment_failed", amount=99.99, reason="insufficient_funds")

Output (JSON):

{
  "event": "user_login",
  "user_id": 123,
  "ip": "192.168.1.1",
  "timestamp": "2026-01-14T16:50:00Z",
  "level": "info",
  "logger": "myapp.auth"
}

Metrics Collection

from structum.plugins.observability import metrics

# Counter
metrics.increment("api_requests_total", labels={"endpoint": "/users"})

# Gauge
metrics.set_gauge("active_connections", 42)

# Histogram
with metrics.timer("request_duration_seconds"):
    # Your code here
    process_request()

Distributed Tracing

from structum.plugins.observability import tracer

with tracer.span("database_query") as span:
    span.set_attribute("table", "users")
    span.set_attribute("operation", "SELECT")
    
    result = db.query("SELECT * FROM users")
    span.set_attribute("rows_returned", len(result))

Configuration

Dynaconf Integration

settings.toml:

[observability]
log_level = "INFO"
log_format = "json"  # or "console" for development
enable_metrics = true
enable_tracing = true

[observability.prometheus]
port = 9090
path = "/metrics"

[observability.opentelemetry]
endpoint = "http://jaeger:4318"
service_name = "my-service"

Environment Variables

export OBSERVABILITY__LOG_LEVEL=DEBUG
export OBSERVABILITY__ENABLE_METRICS=true
export OBSERVABILITY__PROMETHEUS__PORT=9090
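
For reference, a minimal sketch of reading these settings with Dynaconf directly (hypothetical; the plugin wires this up for you, and environment-variable overrides follow Dynaconf's own prefix rules):

from dynaconf import Dynaconf

# Hypothetical standalone loader; structum-observability's actual wiring may differ.
settings = Dynaconf(settings_files=["settings.toml"])

log_level = settings.observability.log_level               # "INFO"
prometheus_port = settings.observability.prometheus.port   # 9090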

Logging

Log Levels

logger.debug("verbose_details", step=1)
logger.info("normal_operation", status="success")
logger.warning("degraded_performance", latency_ms=500)
logger.error("operation_failed", error="timeout")
logger.critical("system_failure", service="database")

Contextual Logging

from structum.plugins.observability import log_context

# Add context for entire block
with log_context(request_id="abc-123", user_id=456):
    logger.info("processing_request")  # Includes request_id, user_id
    do_work()
    logger.info("request_complete")    # Still includes context

Structured Fields

# Good: Structured
logger.info("order_created", 
    order_id=12345,
    total_amount=99.99,
    items_count=3,
    customer_id=789
)

# Bad: Unstructured string
logger.info("Order 12345 created with total $99.99")  # Don't do this!

Metrics

Counter

Monotonically increasing value:

metrics.increment("http_requests_total", 
    labels={"method": "GET", "status": "200"})

metrics.increment("errors_total", 
    labels={"type": "validation_error"})

Gauge

Current value that can go up/down:

metrics.set_gauge("queue_depth", 42)
metrics.set_gauge("memory_usage_bytes", 1024 * 1024 * 100)

Histogram

Distribution of values (e.g., latencies):

# Manual recording
metrics.record_histogram("request_duration_seconds", 0.25)

# Timer context manager
with metrics.timer("db_query_duration_seconds"):
    result = db.query("SELECT * FROM users")

Prometheus Exposition

Metrics are automatically exposed at http://localhost:9090/metrics (the port and path configured under [observability.prometheus]):

# TYPE http_requests_total counter
http_requests_total{method="GET",status="200"} 1523

# TYPE queue_depth gauge  
queue_depth 42

# TYPE request_duration_seconds histogram
request_duration_seconds_bucket{le="0.1"} 95
request_duration_seconds_bucket{le="0.5"} 124
request_duration_seconds_sum 31.4
request_duration_seconds_count 150
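
If you need to expose or verify the endpoint outside the module's automatic setup, the underlying prometheus-client dependency can serve it directly. A sketch against prometheus-client's own API, not a structum API:

from prometheus_client import Counter, start_http_server

# Serve /metrics on port 9090 (the port configured under [observability.prometheus]).
start_http_server(9090)

http_requests_total = Counter(
    "http_requests_total", "Total HTTP requests", ["method", "status"]
)
http_requests_total.labels(method="GET", status="200").inc()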

Tracing

Spans

from structum.plugins.observability import tracer

@tracer.trace("process_order")
def process_order(order_id):
    # Automatic span creation
    with tracer.span("validate_order") as validation_span:
        validation_span.set_attribute("order_id", order_id)
        validate(order_id)
    
    with tracer.span("charge_payment"):
        charge_customer(order_id)
    
    return "success"

Span Attributes

span.set_attribute("http.method", "POST")
span.set_attribute("http.url", "/api/orders")
span.set_attribute("http.status_code", 201)
span.set_attribute("db.statement", "INSERT INTO orders...")

Trace Context Propagation

Trace context is automatically propagated across services via HTTP headers:

import requests
from structum.plugins.observability import tracer

with tracer.span("call_external_api") as span:
    # Trace context automatically injected in headers
    response = requests.get("https://api.example.com/data")
    span.set_attribute("response.status", response.status_code)
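
If automatic header injection is not available for a particular HTTP client, the W3C trace context can be injected explicitly with the OpenTelemetry API. A sketch, assuming the module's tracer is backed by OpenTelemetry as the dependency list indicates:

import requests
from opentelemetry.propagate import inject

from structum.plugins.observability import tracer

with tracer.span("call_external_api") as span:
    headers = {}
    inject(headers)  # adds the W3C traceparent/tracestate headers for the current context
    response = requests.get("https://api.example.com/data", headers=headers)
    span.set_attribute("response.status", response.status_code)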

Health Checks

Built-in Health Endpoint

from structum.plugins.observability import HealthCheck

health = HealthCheck()

# Add custom checks
@health.check("database")
def check_database():
    try:
        db.ping()
        return {"status": "healthy", "latency_ms": 5}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

# FastAPI integration
from fastapi import FastAPI
app = FastAPI()

@app.get("/health")
def health_endpoint():
    return health.check_all()

Example response (shown with an additional "cache" check registered):

{
  "status": "healthy",
  "timestamp": "2026-01-14T16:50:00Z",
  "checks": {
    "database": {
      "status": "healthy",
      "latency_ms": 5
    },
    "cache": {
      "status": "healthy"
    }
  }
}

Best Practices

Logging Best Practices

DO:

  • Use structured logging (key-value pairs)

  • Log at appropriate levels

  • Include context (request_id, user_id)

  • Log actionable information

DON’T:

  • Log sensitive data (passwords, tokens); see the redaction sketch after this list

  • Log excessively in hot paths

  • Use string formatting in logs

  • Log stack traces at INFO level
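
The "don't log sensitive data" rule can be enforced mechanically with a structlog processor. A sketch, assuming the logger is structlog-based as the dependency list indicates; the redact_sensitive helper and its key list are illustrative, not part of the module's API:

SENSITIVE_KEYS = {"password", "token", "secret", "authorization"}

def redact_sensitive(logger, method_name, event_dict):
    # structlog processor: mask sensitive keys before the event is rendered.
    for key in SENSITIVE_KEYS & event_dict.keys():
        event_dict[key] = "***REDACTED***"
    return event_dict

# With plain structlog this processor would be added to the processors list in
# structlog.configure(); how structum-observability registers custom processors may differ.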

Metrics Best Practices

DO:

  • Use counters for totals

  • Use gauges for current state

  • Use histograms for distributions

  • Keep cardinality low (avoid unique IDs in labels)

DON’T:

  • Create metrics in hot loops

  • Use high-cardinality labels (user_id, etc.); see the example after this list

  • Mix metric types

  • Forget to add units in names
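
To make the cardinality rule concrete, here is the same counter with bounded versus unbounded label values, using the metrics.increment API shown earlier:

from structum.plugins.observability import metrics

# Good: bounded label values (a handful of methods and status codes)
metrics.increment("http_requests_total", labels={"method": "GET", "status": "200"})

# Bad: unbounded label values create one time series per user or request
metrics.increment("http_requests_total", labels={"user_id": "8472"})        # don't
metrics.increment("http_requests_total", labels={"request_id": "abc-123"})  # don't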

Tracing Best Practices

DO:

  • Trace at service boundaries

  • Include meaningful attributes

  • Sample in production (e.g., 1%); see the sampler sketch after this list

  • Propagate context

DON’T:

  • Trace every function

  • Include PII in spans

  • Create excessive spans

  • Forget to end spans
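
One way to apply the "sample in production" guideline is with the OpenTelemetry SDK's built-in samplers. A sketch; it requires opentelemetry-sdk (not among this module's declared dependencies), and structum-observability may expose its own sampling setting instead:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Keep roughly 1% of new traces, but always follow the parent's sampling
# decision so a trace is never half-recorded across service boundaries.
sampler = ParentBased(TraceIdRatioBased(0.01))
trace.set_tracer_provider(TracerProvider(sampler=sampler))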


Integration Examples

FastAPI

from fastapi import FastAPI, Request
from structum.plugins.observability import get_logger, metrics, tracer

app = FastAPI()
logger = get_logger(__name__)

@app.middleware("http")
async def observability_middleware(request: Request, call_next):
    # Start trace
    with tracer.span(f"{request.method} {request.url.path}") as span:
        # Log request
        logger.info("request_started",
            method=request.method,
            path=request.url.path
        )
        
        # Increment counter
        metrics.increment("requests_total",
            labels={"method": request.method, "path": request.url.path}
        )
        
        # Process request
        with metrics.timer("request_duration_seconds"):
            response = await call_next(request)
        
        # Log response
        logger.info("request_completed", status_code=response.status_code)
        span.set_attribute("http.status_code", response.status_code)
        
        return response

Celery

from celery import Celery
from structum.plugins.observability import get_logger, metrics

app = Celery('tasks')
logger = get_logger(__name__)

@app.task
def process_task(task_id):
    logger.info("task_started", task_id=task_id)
    
    try:
        result = do_work(task_id)
        metrics.increment("tasks_completed", labels={"status": "success"})
        logger.info("task_completed", task_id=task_id, result=result)
        return result
    except Exception as e:
        metrics.increment("tasks_completed", labels={"status": "failure"})
        logger.error("task_failed", task_id=task_id, error=str(e))
        raise

Troubleshooting

Logs Not Appearing

Check log level:

import logging
logging.getLogger().setLevel(logging.DEBUG)

Metrics Not Exported

Verify the metrics endpoint is being served and is reachable:

curl http://localhost:9090/metrics

Traces Not Showing

Check OpenTelemetry configuration:

from structum.plugins.observability import tracer
print(tracer.is_enabled())
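
If the tracer is enabled but spans still do not arrive, configuring an OTLP export pipeline by hand with the OpenTelemetry SDK can help isolate the problem. A sketch; it requires opentelemetry-sdk and opentelemetry-exporter-otlp-proto-http, which are not part of this module's declared dependencies:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Point the exporter at the collector from [observability.opentelemetry];
# the OTLP/HTTP traces path is /v1/traces.
exporter = OTLPSpanExporter(endpoint="http://jaeger:4318/v1/traces")
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)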

See Also