Source code for structum_lab.monitoring.interfaces

# src/structum_lab/monitoring/interfaces.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: 2025 PythonWoods

"""Monitoring Interfaces for Structum Framework.

Provides protocol definitions for metrics emission following the same
pattern as LoggerInterface.
"""

from typing import Protocol


class MetricsInterface(Protocol):
    """Protocol for metrics emission in Structum Lab.

    This interface enables plugins to emit metrics in a backend-agnostic
    manner. Implementations can target Prometheus, StatsD, Datadog,
    CloudWatch, or other monitoring systems.

    Implementations:
        - :class:`~structum_lab.plugins.observability.metrics.PrometheusMetrics`
        - :class:`~structum_lab.plugins.observability.metrics.StatsDMetrics`
        - :class:`NoOpMetrics`: Fallback no-op implementation

    Example:
        Basic metrics usage::

            from structum_lab.monitoring import get_metrics

            metrics = get_metrics()

            # Counter: track events
            metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
            metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

            # Gauge: current values
            metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
            metrics.gauge("db.connections.active", pool.active_count())

            # Timing: operation duration
            import time

            start = time.time()
            process_request()
            duration = time.time() - start
            metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

            # Histogram: value distributions
            metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})

    Note:
        All metric methods are non-blocking and should not raise exceptions.
        Failed metric emissions should be logged but not disrupt application flow.

    See Also:
        :class:`LoggerInterface`: Logging interface
        :func:`get_metrics`: Retrieve metrics instance
    """

    def increment(
        self,
        name: str,
        value: float = 1.0,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Increment a counter metric.

        Counters track cumulative values that only increase (e.g., request
        count, error count). Use for counting events over time.

        Args:
            name (str): Metric name using dot notation
                (e.g., ``api.requests.total``).
            value (float): Amount to increment by. Defaults to 1.0.
            tags (dict[str, str] | None): Labels/dimensions for the metric
                (e.g., ``{"endpoint": "/users", "status": "200"}``).
                Defaults to None.

        Example:
            Tracking API requests::

                # Request counter
                metrics.increment("http.requests", tags={
                    "method": request.method,
                    "endpoint": request.path,
                    "status": str(response.status_code),
                })

                # Error counter
                try:
                    risky_operation()
                except Exception:
                    metrics.increment("operations.errors", tags={"operation": "risky"})
                    raise

            Cache statistics::

                # Cache hits/misses
                if key in cache:
                    metrics.increment("cache.hits", tags={"cache": "redis"})
                    return cache[key]
                else:
                    metrics.increment("cache.misses", tags={"cache": "redis"})
                    return fetch_from_db(key)

        Note:
            - Counter values should never decrease
            - Use consistent tag keys across increments for proper aggregation
            - Avoid high-cardinality tags (e.g., user IDs) that create too many series

        See Also:
            :meth:`gauge`: For values that can increase/decrease
        """
        ...

    def gauge(
        self,
        name: str,
        value: float,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Set a gauge metric to a specific value.

        Gauges represent current values that can increase or decrease (e.g.,
        memory usage, active connections, queue size). Each call overwrites
        the previous value.

        Args:
            name (str): Metric name (e.g., ``memory.usage.bytes``).
            value (float): Current metric value.
            tags (dict[str, str] | None): Labels for the metric. Defaults to None.

        Example:
            System metrics::

                import psutil

                # Memory usage
                mem = psutil.virtual_memory()
                metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
                metrics.gauge("system.memory.percent", mem.percent)

                # CPU usage
                cpu_percent = psutil.cpu_percent(interval=1)
                metrics.gauge("system.cpu.percent", cpu_percent)

            Application metrics::

                # Database connection pool
                metrics.gauge("db.pool.active", db.pool.active_connections())
                metrics.gauge("db.pool.idle", db.pool.idle_connections())

                # Queue size
                metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

                # Cache size
                metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})

        Warning:
            Gauges represent point-in-time values. For cumulative values, use
            :meth:`increment`.

        Note:
            - Emit gauges periodically (e.g., every minute) for accurate monitoring
            - Consider using background jobs for system metric collection

        See Also:
            :meth:`increment`: For cumulative counters
        """
        ...

    def timing(
        self,
        name: str,
        value: float,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Record a timing/duration metric.

        Used for tracking operation latency and performance. Typically
        implemented as a histogram with predefined buckets.

        Args:
            name (str): Metric name (e.g., ``api.request.duration``).
            value (float): Duration in seconds (use fractional seconds for
                sub-second precision).
            tags (dict[str, str] | None): Labels for the metric. Defaults to None.

        Example:
            API endpoint timing::

                import time

                start = time.time()
                try:
                    result = handle_request(request)
                    return result
                finally:
                    duration = time.time() - start
                    metrics.timing(
                        "api.request.duration",
                        duration,
                        tags={"endpoint": request.path, "method": request.method},
                    )

            Database query timing::

                start = time.perf_counter()
                rows = db.execute(query)
                duration = time.perf_counter() - start

                metrics.timing(
                    "db.query.duration",
                    duration,
                    tags={"table": "users", "operation": "select"},
                )

            Context manager for timing::

                from contextlib import contextmanager

                @contextmanager
                def track_time(operation: str):
                    start = time.time()
                    try:
                        yield
                    finally:
                        metrics.timing(f"{operation}.duration", time.time() - start)

                with track_time("data_processing"):
                    process_large_dataset()

        Note:
            - Use seconds as the unit for consistency
            - ``time.perf_counter()`` is more accurate than ``time.time()`` for durations
            - Most backends convert to milliseconds for display

        See Also:
            :meth:`histogram`: For general value distributions
        """
        ...

    def histogram(
        self,
        name: str,
        value: float,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Record a value in a histogram.

        Histograms track value distributions (e.g., request sizes, payload
        sizes). Unlike gauges, all values are recorded and aggregated into
        buckets.

        Args:
            name (str): Metric name (e.g., ``request.body.size``).
            value (float): Value to record.
            tags (dict[str, str] | None): Labels for the metric. Defaults to None.

        Example:
            Request/response sizes::

                # Request payload size
                request_size = len(request.body)
                metrics.histogram(
                    "http.request.size",
                    request_size,
                    tags={"endpoint": request.path, "content_type": request.content_type},
                )

                # Response size
                response_size = len(response.body)
                metrics.histogram(
                    "http.response.size",
                    response_size,
                    tags={"endpoint": request.path},
                )

            Batch sizes::

                # Processing batch sizes
                batch = fetch_batch_from_queue()
                metrics.histogram(
                    "processing.batch.size",
                    len(batch),
                    tags={"queue": "tasks"},
                )

            Query result counts::

                rows = db.execute("SELECT * FROM users").fetchall()
                metrics.histogram(
                    "db.query.rows",
                    len(rows),
                    tags={"table": "users"},
                )

        Note:
            - Histograms are ideal for analyzing value distributions (percentiles, averages)
            - Use :meth:`timing` specifically for duration measurements
            - Avoid extremely high-cardinality values (>1000 unique values per second)

        See Also:
            :meth:`timing`: Specialized for duration measurements
            :meth:`gauge`: For current point-in-time values
        """
        ...

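
# Illustrative sketch (an assumption for documentation purposes, not one of the
# shipped backends): a minimal class that satisfies ``MetricsInterface`` by
# writing every emission to the standard :mod:`logging` module. It demonstrates
# the protocol contract described above -- calls stay cheap and never raise --
# while the real exporters live in ``structum_lab.plugins.observability.metrics``.
import logging

_metrics_log = logging.getLogger(__name__)


class LoggingMetrics:
    """Example backend that logs metric emissions instead of exporting them."""

    def _emit(self, kind: str, name: str, value: float, tags: dict[str, str] | None) -> None:
        # Per the interface contract, a failed emission is swallowed rather
        # than allowed to disrupt application flow.
        try:
            _metrics_log.debug("metric %s %s=%s tags=%s", kind, name, value, tags or {})
        except Exception:  # pragma: no cover
            pass

    def increment(self, name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None:
        self._emit("counter", name, value, tags)

    def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        self._emit("gauge", name, value, tags)

    def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        self._emit("timing", name, value, tags)

    def histogram(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        self._emit("histogram", name, value, tags)
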

class NoOpMetrics:
    """Fallback no-op implementation when no monitoring plugin is installed."""

    def increment(self, name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None:
        """No-op increment."""

    def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """No-op gauge."""

    def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """No-op timing."""

    def histogram(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """No-op histogram."""
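

# Illustrative usage sketch (hypothetical helper, not part of this module's
# public API): because ``MetricsInterface`` is a structural protocol, callers
# can accept any backend through dependency injection and fall back to
# ``NoOpMetrics`` when monitoring is disabled, keeping call sites backend-agnostic.
import time
from collections.abc import Callable
from typing import TypeVar

_T = TypeVar("_T")


def timed_call(
    operation: str,
    fn: Callable[[], _T],
    metrics: MetricsInterface | None = None,
) -> _T:
    """Run ``fn`` and record its duration under ``<operation>.duration``."""
    emitter: MetricsInterface = metrics if metrics is not None else NoOpMetrics()
    start = time.perf_counter()
    try:
        return fn()
    finally:
        # Seconds, per the ``timing`` contract; perf_counter() is monotonic.
        emitter.timing(f"{operation}.duration", time.perf_counter() - start)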