Core Monitoring Protocols

Monitoring subsystem for Structum Framework.

Architectural Role: This module acts as the Observability Facade (DP-6). It provides a unified, safe API for emitting metrics (get_metrics()) with three properties:

  1. Never Fails: Uses Null Objects (NoOpMetrics) if no backend is configured.

  2. Zero Coupling: Application code doesn't import Prometheus or Datadog libraries.

  3. Hot-Patchable: Plugins inject the real backend at runtime.

class structum.monitoring.MetricsInterface(*args, **kwargs)

Bases: Protocol

Protocol for metrics emission in Structum.

This interface enables plugins to emit metrics in a backend-agnostic manner.

Architectural Role:

  • Portability: Code instrumented with this interface runs on any cloud/infrastructure.

  • Stability: Methods are contractually guaranteed to be non-blocking and exception-safe.

Implementations:
  • PrometheusMetrics

  • StatsDMetrics

  • NoOpMetrics: Fallback that preserves operational continuity when no backend is configured
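Because MetricsInterface is a Protocol, any object exposing the four documented methods should satisfy it structurally, without inheriting from it. The sketch below is a hypothetical in-memory test double (InMemoryMetrics is not part of Structum); it assumes only the signatures documented on this page and records each call so a unit test can assert on it.

```python
from __future__ import annotations


class InMemoryMetrics:
    """Hypothetical test double: records every emission instead of sending it."""

    def __init__(self) -> None:
        # Each entry is (kind, name, value, tags).
        self.calls: list[tuple[str, str, float, dict[str, str]]] = []

    def _record(self, kind: str, name: str, value: float,
                tags: dict[str, str] | None) -> None:
        self.calls.append((kind, name, float(value), dict(tags or {})))

    def increment(self, name: str, value: float = 1.0,
                  tags: dict[str, str] | None = None) -> None:
        self._record("increment", name, value, tags)

    def gauge(self, name: str, value: float,
              tags: dict[str, str] | None = None) -> None:
        self._record("gauge", name, value, tags)

    def histogram(self, name: str, value: float,
                  tags: dict[str, str] | None = None) -> None:
        self._record("histogram", name, value, tags)

    def timing(self, name: str, value: float,
               tags: dict[str, str] | None = None) -> None:
        self._record("timing", name, value, tags)


# Usage: pass the double wherever a metrics object is expected.
metrics = InMemoryMetrics()
metrics.increment("api.requests", tags={"endpoint": "/users"})
metrics.gauge("cache.size", 3)
```

A double like this keeps tests independent of any real monitoring backend.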

Example

Basic metrics usage:

from structum.monitoring import get_metrics

metrics = get_metrics()

# Counter: track events
metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

# Gauge: current values
metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
metrics.gauge("db.connections.active", pool.active_count())

# Timing: operation duration (in seconds)
import time
start = time.perf_counter()  # monotonic clock, suited to measuring durations
process_request()
duration = time.perf_counter() - start
metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

# Histogram: value distributions
metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})

Note

All metric methods are non-blocking and should not raise exceptions. Failed metric emissions should be logged but not disrupt application flow.
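One way a backend author might honor the exception-safety half of this note is to wrap the real client so that failures are logged and swallowed. This is a minimal sketch, not Structum's implementation: SafeMetrics and BrokenBackend are hypothetical names, and the sketch does not address the non-blocking requirement.

```python
import logging

logger = logging.getLogger("metrics.safe")


class SafeMetrics:
    """Hypothetical wrapper: forwards calls to a backend and logs any failure."""

    def __init__(self, backend):
        self._backend = backend

    def _emit(self, method, name, value, tags):
        try:
            getattr(self._backend, method)(name, value, tags)
        except Exception:
            # Log and continue: a metrics failure must not break the caller.
            logger.warning("metric emission failed: %s %s", method, name,
                           exc_info=True)

    def increment(self, name, value=1.0, tags=None):
        self._emit("increment", name, value, tags)

    def gauge(self, name, value, tags=None):
        self._emit("gauge", name, value, tags)

    def histogram(self, name, value, tags=None):
        self._emit("histogram", name, value, tags)

    def timing(self, name, value, tags=None):
        self._emit("timing", name, value, tags)


class BrokenBackend:
    """Hypothetical backend whose network call always fails."""

    def increment(self, name, value=1.0, tags=None):
        raise ConnectionError("backend unreachable")


# The failure is logged; the application code keeps running.
SafeMetrics(BrokenBackend()).increment("api.requests")
```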

See also

LoggerInterface: Logging interface

get_metrics(): Retrieve metrics instance

__init__(*args, **kwargs)

gauge(name: str, value: float, tags: dict[str, str] | None = None) -> None

Set a gauge metric to a specific value.

Gauges represent current values that can increase or decrease (e.g., memory usage, active connections, queue size). Each call overwrites the previous value.

Parameters:
name : str

Metric name (e.g., memory.usage.bytes).

value : float

Current metric value.

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

System metrics:

import socket

import psutil

hostname = socket.gethostname()

# Memory usage
mem = psutil.virtual_memory()
metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
metrics.gauge("system.memory.percent", mem.percent)

# CPU usage (interval=1 blocks the calling thread for one second while sampling)
cpu_percent = psutil.cpu_percent(interval=1)
metrics.gauge("system.cpu.percent", cpu_percent)

Application metrics:

# Database connection pool
metrics.gauge("db.pool.active", db.pool.active_connections())
metrics.gauge("db.pool.idle", db.pool.idle_connections())

# Queue size
metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

# Cache size
metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})

Warning

Gauges represent point-in-time values. For cumulative values, use increment().

Note

  • Emit gauges periodically (e.g., every minute) for accurate monitoring

  • Consider using background jobs for system metric collection
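The periodic emission suggested above could be done with a small background thread. This is a sketch under stated assumptions: GaugeReporter is a hypothetical helper, not a Structum class, and it expects any object with the documented gauge() method plus a zero-argument callable that reads the current value.

```python
import threading


class GaugeReporter:
    """Hypothetical helper: emits one gauge at a fixed interval from a thread."""

    def __init__(self, metrics, name, read_value, interval_seconds=60.0):
        self._metrics = metrics
        self._name = name
        self._read_value = read_value
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait() returns False on timeout and True once stop() is called,
        # so the first emission happens after one full interval.
        while not self._stop.wait(self._interval):
            self._metrics.gauge(self._name, float(self._read_value()))

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join()
```

Usage would look like GaugeReporter(metrics, "queue.length", lambda: len(task_queue)).start(), with stop() called at shutdown.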

See also

increment(): For cumulative counters

histogram(name: str, value: float, tags: dict[str, str] | None = None) -> None

Record a value in a histogram.

Histograms track value distributions (e.g., request sizes, payload sizes). Unlike gauges, all values are recorded and aggregated into buckets.
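To make the idea of bucket aggregation concrete, the sketch below counts recorded values per bucket. It is illustrative only: bucket_counts and the boundaries used are assumptions, and real backends choose their own boundaries and internal representation.

```python
from bisect import bisect_left


def bucket_counts(values, upper_bounds):
    """Count values per bucket.

    Bucket i holds values v with upper_bounds[i-1] < v <= upper_bounds[i];
    the final extra bucket holds everything above the last bound.
    upper_bounds must be sorted in ascending order.
    """
    counts = [0] * (len(upper_bounds) + 1)
    for v in values:
        counts[bisect_left(upper_bounds, v)] += 1
    return counts


# Response sizes in bytes, bucketed at 100 and 1000 bytes.
sizes = [50, 100, 101, 5000]
print(bucket_counts(sizes, [100, 1000]))  # prints [2, 1, 1]
```

From counts like these a backend can estimate percentiles and averages without storing every raw value.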

Parameters:
name : str

Metric name (e.g., request.body.size).

value : float

Value to record.

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

Request/response sizes:

# Request payload size
request_size = len(request.body)
metrics.histogram(
    "http.request.size",
    request_size,
    tags={"endpoint": request.path, "content_type": request.content_type}
)

# Response size
response_size = len(response.body)
metrics.histogram(
    "http.response.size",
    response_size,
    tags={"endpoint": request.path}
)

Batch sizes:

# Processing batch sizes
batch = fetch_batch_from_queue()
metrics.histogram(
    "processing.batch.size",
    len(batch),
    tags={"queue": "tasks"}
)

Query result counts:

rows = db.execute("SELECT * FROM users").fetchall()
metrics.histogram(
    "db.query.rows",
    len(rows),
    tags={"table": "users"}
)

Note

  • Histograms are ideal for analyzing value distributions (percentiles, averages)

  • Use timing() specifically for duration measurements

  • Avoid extremely high-cardinality values (>1000 unique values per second)

See also

timing(): Specialized for duration measurements

gauge(): For current point-in-time values

increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None

Increment a counter metric.

Counters track cumulative values that only increase (e.g., request count, error count). Use for counting events over time.

Parameters:
name : str

Metric name using dot notation (e.g., api.requests.total).

value : float

Amount to increment by. Defaults to 1.0.

tags : Optional[Dict[str, str]]

Labels/dimensions for the metric (e.g., {"endpoint": "/users", "status": "200"}). Defaults to None.

Example

Tracking API requests:

# Request counter
metrics.increment("http.requests", tags={
    "method": request.method,
    "endpoint": request.path,
    "status": str(response.status_code)
})

# Error counter
try:
    risky_operation()
except Exception:
    metrics.increment("operations.errors", tags={"operation": "risky"})
    raise

Cache statistics:

# Cache hits/misses
def get_value(key):
    if key in cache:
        metrics.increment("cache.hits", tags={"cache": "redis"})
        return cache[key]
    metrics.increment("cache.misses", tags={"cache": "redis"})
    return fetch_from_db(key)

Note

  • Counter values should never decrease

  • Use consistent tag keys across increments for proper aggregation

  • Avoid high-cardinality tags (e.g., user IDs) that create too many series
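A common source of high-cardinality tags is a raw request path that embeds an ID. One possible mitigation, sketched here with a hypothetical normalize_endpoint helper, is to collapse numeric path segments into a placeholder before using the path as a tag value.

```python
import re

# Matches a path segment made only of digits, e.g. "/42" in "/users/42".
_ID_SEGMENT = re.compile(r"/\d+(?=/|$)")


def normalize_endpoint(path: str) -> str:
    """Replace numeric path segments with a placeholder to bound cardinality."""
    return _ID_SEGMENT.sub("/{id}", path)


print(normalize_endpoint("/users/42/orders/7"))  # prints /users/{id}/orders/{id}
```

The normalized string can then be passed as the "endpoint" tag so that all user IDs map to a single series.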

See also

gauge(): For values that can increase/decrease

timing(name: str, value: float, tags: dict[str, str] | None = None) -> None

Record a timing/duration metric.

Used for tracking operation latency and performance.

Typically implemented as a histogram with predefined buckets.

Parameters:
name : str

Metric name (e.g., api.request.duration).

value : float

Duration in seconds (use fractional seconds for sub-second precision).

tags : Optional[Dict[str, str]]

Labels for the metric. Defaults to None.

Example

API endpoint timing:

import time

def handle_timed(request):
    start = time.perf_counter()
    try:
        return handle_request(request)
    finally:
        # Runs on success and on error, so failed requests are timed too
        duration = time.perf_counter() - start
        metrics.timing(
            "api.request.duration",
            duration,
            tags={"endpoint": request.path, "method": request.method}
        )

Database query timing:

import time

start = time.perf_counter()
rows = db.execute(query)
duration = time.perf_counter() - start

metrics.timing(
    "db.query.duration",
    duration,
    tags={"table": "users", "operation": "select"}
)

Context manager for timing:

import time
from contextlib import contextmanager

@contextmanager
def track_time(operation: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record the duration even if the wrapped block raises
        metrics.timing(f"{operation}.duration", time.perf_counter() - start)

with track_time("data_processing"):
    process_large_dataset()

Note

  • Use seconds as the unit for consistency

  • time.perf_counter() is more accurate than time.time() for durations

  • Most backends convert to milliseconds for display

See also

histogram(): For general value distributions

class structum.monitoring.NoOpMetrics

Bases: object

Fallback no-op implementation when no monitoring plugin is installed.

Architectural Role: Operational Continuity (DP-2). Swallows all metric events to prevent runtime errors in environments without a monitoring stack (e.g., local dev, CI unit tests).

gauge(name: str, value: float, tags: dict[str, str] | None = None) -> None

No-op gauge.

histogram(name: str, value: float, tags: dict[str, str] | None = None) -> None

No-op histogram.

increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None

No-op increment.

timing(name: str, value: float, tags: dict[str, str] | None = None) -> None

No-op timing.

structum.monitoring.get_metrics(namespace: str = 'structum') -> MetricsInterface

Get (or create) a metrics emitter for a specific namespace.

Architectural Role: Factory Accessor.

This function isolates the application from the instantiation complexity of metric backends.

Parameters:
namespace: str = 'structum'

Logical grouping (e.g., 'structum.database', 'myapp.orders').

Returns:

A thread-safe metrics emitter: NoOpMetrics if no backend is configured, otherwise the injected backend.

Return type:

MetricsInterface

structum.monitoring.set_metrics_backend(backend: MetricsInterface) -> None

Set the global metrics backend.

This is called by monitoring plugins (e.g., structum_observability) to inject their implementation.

Parameters:
backend: MetricsInterface

MetricsInterface implementation
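The facade pattern described on this page (a no-op default, replaced at runtime by a plugin) can be sketched in a few lines. This is an illustration of the general technique, not Structum's source: NoOpBackend, set_backend, get_handle and the call-time lookup in _Handle are all assumptions made for the example, and the namespace argument is omitted.

```python
class NoOpBackend:
    """Stand-in null object: accepts every call and does nothing."""

    def increment(self, name, value=1.0, tags=None):
        pass

    def gauge(self, name, value, tags=None):
        pass

    def histogram(self, name, value, tags=None):
        pass

    def timing(self, name, value, tags=None):
        pass


_backend = NoOpBackend()  # default until a plugin injects something else


def set_backend(backend):
    """Stand-in for set_metrics_backend(): swap the process-wide backend."""
    global _backend
    _backend = backend


class _Handle:
    """Resolves the backend at call time, so early handles see later swaps."""

    def __getattr__(self, method_name):
        return getattr(_backend, method_name)


def get_handle():
    """Stand-in for get_metrics()."""
    return _Handle()


class RecordingBackend(NoOpBackend):
    """Hypothetical plugin backend that keeps increments in a list."""

    def __init__(self):
        self.calls = []

    def increment(self, name, value=1.0, tags=None):
        self.calls.append((name, value, tags))


handle = get_handle()
handle.increment("jobs.done")   # swallowed by the no-op default
plugin = RecordingBackend()
set_backend(plugin)             # what a monitoring plugin would do at startup
handle.increment("jobs.done")   # now reaches the injected backend
```

Application code only ever touches the handle, which is what keeps it free of backend-specific imports.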
