Core Monitoring Protocols¶
Monitoring subsystem for the Structum Framework.
Architectural Role: This module acts as the Observability Facade (DP-6). It provides a unified, safe API for emitting metrics (get_metrics()) that:

1. Never Fails: Uses Null Objects (NoOpMetrics) if no backend is configured.
2. Zero Coupling: Application code doesn't import Prometheus or Datadog libraries.
3. Hot-Patchable: Plugins inject the real backend at runtime.
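A sketch of this lifecycle from the application's point of view (the commented call at the end is illustrative; the actual injection happens inside the monitoring plugin, not application code):

```python
from structum.monitoring import get_metrics, set_metrics_backend

# 1. Never Fails: with no backend configured, get_metrics() returns a
#    NoOpMetrics instance, so this emission is silently discarded.
metrics = get_metrics()
metrics.increment("app.startup")

# 2. Zero Coupling: this file imports only structum.monitoring; the
#    Prometheus/Datadog dependency lives inside the monitoring plugin.

# 3. Hot-Patchable: at runtime a plugin injects the real backend, and
#    emitters retrieved afterwards route to it.
# set_metrics_backend(prometheus_backend)  # performed by the plugin
```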
- class structum.monitoring.MetricsInterface(*args, **kwargs)[source]¶
Bases: Protocol

Protocol for metrics emission in Structum.
This interface enables plugins to emit metrics in a backend-agnostic manner.
Architectural Role:

- Portability: Code instrumented with this interface runs on any cloud/infrastructure.
- Stability: Methods are contractually guaranteed to be non-blocking and exception-safe.
- Implementations:
  - PrometheusMetrics
  - StatsDMetrics
  - NoOpMetrics: fallback (Operational Continuity)
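Any class providing these four methods conforms to the protocol. As an illustration only, a minimal hypothetical backend that forwards to the stdlib logger while honoring the non-blocking, exception-safe contract:

```python
import logging

logger = logging.getLogger("metrics")

class LoggingMetrics:
    """Hypothetical MetricsInterface implementation backed by logging."""

    def increment(self, name: str, value: float = 1.0,
                  tags: dict[str, str] | None = None) -> None:
        self._emit("counter", name, value, tags)

    def gauge(self, name: str, value: float,
              tags: dict[str, str] | None = None) -> None:
        self._emit("gauge", name, value, tags)

    def timing(self, name: str, value: float,
               tags: dict[str, str] | None = None) -> None:
        self._emit("timing", name, value, tags)

    def histogram(self, name: str, value: float,
                  tags: dict[str, str] | None = None) -> None:
        self._emit("histogram", name, value, tags)

    def _emit(self, kind: str, name: str, value: float,
              tags: dict[str, str] | None) -> None:
        # Contract: never block, never raise; log failures instead.
        try:
            logger.debug("%s %s=%s tags=%s", kind, name, value, tags or {})
        except Exception:
            pass
```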
Example
Basic metrics usage:
```python
import time

from structum.monitoring import get_metrics

metrics = get_metrics()

# Counter: track events
metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

# Gauge: current values
metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
metrics.gauge("db.connections.active", pool.active_count())

# Timing: operation duration
start = time.time()
process_request()
duration = time.time() - start
metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

# Histogram: value distributions
metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})
```

Note
All metric methods are non-blocking and should not raise exceptions. Failed metric emissions should be logged but not disrupt application flow.
See also
- LoggerInterface: Logging interface
- get_metrics(): Retrieve metrics instance

- gauge(name: str, value: float, tags: dict[str, str] | None = None) → None[source]¶

Set a gauge metric to a specific value.
Gauges represent current values that can increase or decrease (e.g., memory usage, active connections, queue size). Each call overwrites the previous value.
- Parameters:¶
  - name (str): Metric name (e.g., cache.size).
  - value (float): Current value to set.
  - tags (dict[str, str] | None): Labels for the metric. Defaults to None.
Example
System metrics:
```python
import psutil

# Memory usage
mem = psutil.virtual_memory()
metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
metrics.gauge("system.memory.percent", mem.percent)

# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
metrics.gauge("system.cpu.percent", cpu_percent)
```

Application metrics:
```python
# Database connection pool
metrics.gauge("db.pool.active", db.pool.active_connections())
metrics.gauge("db.pool.idle", db.pool.idle_connections())

# Queue size
metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

# Cache size
metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})
```

Warning
Gauges represent point-in-time values. For cumulative values, use increment().

Note
- Emit gauges periodically (e.g., every minute) for accurate monitoring
- Consider using background jobs for system metric collection (a sketch follows the See also block)

See also

- increment(): For cumulative counters
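A minimal sketch of such a background collector (the daemon thread and 60-second interval are illustrative choices, not prescribed by the framework):

```python
import threading
import time

import psutil

from structum.monitoring import get_metrics

metrics = get_metrics()

def collect_system_gauges(interval: float = 60.0) -> None:
    """Emit point-in-time system gauges on a fixed interval."""
    while True:
        mem = psutil.virtual_memory()
        metrics.gauge("system.memory.percent", mem.percent)
        metrics.gauge("system.cpu.percent", psutil.cpu_percent())
        time.sleep(interval)

# Daemon thread: the collector never blocks application shutdown.
threading.Thread(target=collect_system_gauges, daemon=True).start()
```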
- histogram(name: str, value: float, tags: dict[str, str] | None = None) → None[source]¶

Record a value in a histogram.
Histograms track value distributions (e.g., request sizes, payload sizes). Unlike gauges, all values are recorded and aggregated into buckets.
- Parameters:¶
  - name (str): Metric name (e.g., http.request.size).
  - value (float): Value to record in the distribution.
  - tags (dict[str, str] | None): Labels for the metric. Defaults to None.
Example
Request/response sizes:
```python
# Request payload size
request_size = len(request.body)
metrics.histogram(
    "http.request.size",
    request_size,
    tags={"endpoint": request.path, "content_type": request.content_type}
)

# Response size
response_size = len(response.body)
metrics.histogram(
    "http.response.size",
    response_size,
    tags={"endpoint": request.path}
)
```

Batch sizes:
```python
# Processing batch sizes
batch = fetch_batch_from_queue()
metrics.histogram(
    "processing.batch.size",
    len(batch),
    tags={"queue": "tasks"}
)
```

Query result counts:
```python
rows = db.execute("SELECT * FROM users").fetchall()
metrics.histogram(
    "db.query.rows",
    len(rows),
    tags={"table": "users"}
)
```

Note
- Histograms are ideal for analyzing value distributions (percentiles, averages)
- Use timing() specifically for duration measurements
- Avoid extremely high-cardinality values (>1000 unique values per second)
- increment(name: str, value: float = 1.0, tags: dict[str, str] | None = None) → None[source]¶

Increment a counter metric.
Counters track cumulative values that only increase (e.g., request count, error count). Use for counting events over time.
- Parameters:¶
  - name (str): Metric name (e.g., http.requests).
  - value (float): Amount to increment by. Defaults to 1.0.
  - tags (dict[str, str] | None): Labels for the metric. Defaults to None.
Example
Tracking API requests:
```python
# Request counter
metrics.increment("http.requests", tags={
    "method": request.method,
    "endpoint": request.path,
    "status": str(response.status_code)
})

# Error counter
try:
    risky_operation()
except Exception:
    metrics.increment("operations.errors", tags={"operation": "risky"})
    raise
```

Cache statistics:
```python
# Cache hits/misses
if key in cache:
    metrics.increment("cache.hits", tags={"cache": "redis"})
    return cache[key]
else:
    metrics.increment("cache.misses", tags={"cache": "redis"})
    return fetch_from_db(key)
```

Note
- Counter values should never decrease
- Use consistent tag keys across increments for proper aggregation
- Avoid high-cardinality tags (e.g., user IDs) that create too many series (see the sketch below)
See also
- gauge(): For values that can increase/decrease
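The tagging guidance above, sketched (the `user` object and its attributes are hypothetical):

```python
# Bad: one series per user, i.e. unbounded tag cardinality.
metrics.increment("api.requests", tags={"user_id": str(user.id)})

# Good: bounded tag values, with consistent tag keys across increments.
metrics.increment("api.requests", tags={"plan": user.plan, "method": "GET"})
```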
- timing(name: str, value: float, tags: dict[str, str] | None = None) → None[source]¶

Record a timing/duration metric.
Used for tracking operation latency and performance. Typically implemented as a histogram with predefined buckets.
- Parameters:¶
  - name (str): Metric name (e.g., api.request.duration).
  - value (float): Duration in seconds (use fractional seconds for sub-second precision).
  - tags (dict[str, str] | None): Labels for the metric. Defaults to None.

Example
API endpoint timing:
```python
import time

def timed_handler(request):
    start = time.time()
    try:
        result = handle_request(request)
        return result
    finally:
        duration = time.time() - start
        metrics.timing(
            "api.request.duration",
            duration,
            tags={"endpoint": request.path, "method": request.method}
        )
```

Database query timing:
```python
start = time.perf_counter()
rows = db.execute(query)
duration = time.perf_counter() - start
metrics.timing(
    "db.query.duration",
    duration,
    tags={"table": "users", "operation": "select"}
)
```

Context manager for timing:
```python
from contextlib import contextmanager

@contextmanager
def track_time(operation: str):
    start = time.time()
    try:
        yield
    finally:
        metrics.timing(f"{operation}.duration", time.time() - start)

with track_time("data_processing"):
    process_large_dataset()
```

Note
- Use seconds as the unit for consistency
- time.perf_counter() is more accurate than time.time() for durations
- Most backends convert to milliseconds for display
See also

- histogram(): For general value distributions
- class structum.monitoring.NoOpMetrics[source]¶
Bases: object

Fallback no-op implementation when no monitoring plugin is installed.
Architectural Role: Operational Continuity (DP-2). Swallows all metric events to prevent runtime errors in environments without a monitoring stack (e.g., local dev, CI unit tests).
- histogram(name: str, value: float, tags: dict[str, str] | None = None) → None[source]¶

No-op histogram.
- structum.monitoring.get_metrics(namespace: str = 'structum') → MetricsInterface[source]¶

Get (or create) a metrics emitter for a specific namespace.
Architectural Role: Factory Accessor.
This function isolates the application from the instantiation complexity of metric backends.
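Typical usage (how the namespace affects emitted metric names is backend-specific; the scoping shown here is an assumption):

```python
from structum.monitoring import get_metrics

metrics = get_metrics()                  # default "structum" namespace
db_metrics = get_metrics("structum.db")  # component-scoped emitter

# Per the get-or-create semantics, repeated calls with the same
# namespace yield the same emitter.
db_metrics.timing("query.duration", 0.012, tags={"table": "users"})
```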
- structum.monitoring.set_metrics_backend(backend: MetricsInterface) → None[source]¶
Set the global metrics backend.
This is called by monitoring plugins (e.g., structum_observability) to inject their implementation.
- Parameters:¶
  - backend (MetricsInterface): The implementation to install as the global backend.
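A sketch of the plugin side (the PrometheusMetrics constructor, its import path, and the setup_monitoring entry point are hypothetical):

```python
from structum.monitoring import set_metrics_backend
from structum_observability import PrometheusMetrics  # hypothetical import path

def setup_monitoring() -> None:
    """Hypothetical plugin entry point (e.g., in structum_observability)."""
    # The concrete backend class lives in the plugin package, so the
    # application never imports Prometheus/Datadog libraries directly.
    backend = PrometheusMetrics(namespace="structum")
    set_metrics_backend(backend)
```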