# src/structum_lab/monitoring/interfaces.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: 2025 PythonWoods
"""Monitoring Interfaces for Structum Framework.
Provides protocol definitions for metrics emission following the same
pattern as LoggerInterface.
"""
from typing import Protocol


class MetricsInterface(Protocol):
    """Protocol for metrics emission in Structum Lab.

    This interface enables plugins to emit metrics in a backend-agnostic manner.
    Implementations can target Prometheus, StatsD, Datadog, CloudWatch, or other
    monitoring systems.

    Implementations:
        - :class:`~structum_lab.plugins.observability.metrics.PrometheusMetrics`
        - :class:`~structum_lab.plugins.observability.metrics.StatsDMetrics`
        - :class:`NoOpMetrics`: Fallback no-op implementation

    Example:
        Basic metrics usage::

            from structum_lab.monitoring import get_metrics

            metrics = get_metrics()

            # Counter: track events
            metrics.increment("api.requests", tags={"endpoint": "/users", "method": "GET"})
            metrics.increment("api.errors", tags={"endpoint": "/users", "status": "500"})

            # Gauge: current values
            metrics.gauge("cache.size", len(cache), tags={"cache": "redis"})
            metrics.gauge("db.connections.active", pool.active_count())

            # Timing: operation duration
            import time

            start = time.time()
            process_request()
            duration = time.time() - start
            metrics.timing("api.duration", duration, tags={"endpoint": "/users"})

            # Histogram: value distributions
            metrics.histogram("response.size", len(response_body), tags={"endpoint": "/users"})

    Note:
        All metric methods are non-blocking and should not raise exceptions.
        Failed metric emissions should be logged but not disrupt application flow.

    See Also:
        :class:`LoggerInterface`: Logging interface
        :func:`get_metrics`: Retrieve metrics instance
    """

    def increment(self, name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None:
        """Increment a counter metric.

        Counters track cumulative values that only increase (e.g., request count,
        error count). Use for counting events over time.

        Args:
            name (str): Metric name using dot notation (e.g., ``api.requests.total``).
            value (float): Amount to increment by. Defaults to 1.0.
            tags (Optional[Dict[str, str]]): Labels/dimensions for the metric
                (e.g., ``{"endpoint": "/users", "status": "200"}``). Defaults to None.

        Example:
            Tracking API requests::

                # Request counter
                metrics.increment("http.requests", tags={
                    "method": request.method,
                    "endpoint": request.path,
                    "status": str(response.status_code),
                })

                # Error counter
                try:
                    risky_operation()
                except Exception:
                    metrics.increment("operations.errors", tags={"operation": "risky"})
                    raise

            Cache statistics::

                # Cache hits/misses
                if key in cache:
                    metrics.increment("cache.hits", tags={"cache": "redis"})
                    return cache[key]
                else:
                    metrics.increment("cache.misses", tags={"cache": "redis"})
                    return fetch_from_db(key)

        Note:
            - Counter values should never decrease
            - Use consistent tag keys across increments for proper aggregation
            - Avoid high-cardinality tags (e.g., user IDs) that create too many series

        See Also:
            :meth:`gauge`: For values that can increase or decrease
        """
        ...

    def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """Set a gauge metric to a specific value.

        Gauges represent current values that can increase or decrease (e.g., memory
        usage, active connections, queue size). Each call overwrites the previous value.

        Args:
            name (str): Metric name (e.g., ``memory.usage.bytes``).
            value (float): Current metric value.
            tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.

        Example:
            System metrics::

                import psutil

                # Memory usage
                mem = psutil.virtual_memory()
                metrics.gauge("system.memory.used", mem.used, tags={"host": hostname})
                metrics.gauge("system.memory.percent", mem.percent)

                # CPU usage
                cpu_percent = psutil.cpu_percent(interval=1)
                metrics.gauge("system.cpu.percent", cpu_percent)

            Application metrics::

                # Database connection pool
                metrics.gauge("db.pool.active", db.pool.active_connections())
                metrics.gauge("db.pool.idle", db.pool.idle_connections())

                # Queue size
                metrics.gauge("queue.length", len(task_queue), tags={"queue": "background"})

                # Cache size
                metrics.gauge("cache.entries", cache.size(), tags={"cache": "redis"})

        Warning:
            Gauges represent point-in-time values. For cumulative values, use
            :meth:`increment`.

        Note:
            - Emit gauges periodically (e.g., every minute) for accurate monitoring
            - Consider using background jobs for system metric collection

        See Also:
            :meth:`increment`: For cumulative counters
        """
        ...

    def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """Record a timing/duration metric.

        Used for tracking operation latency and performance. Typically implemented
        as a histogram with predefined buckets.

        Args:
            name (str): Metric name (e.g., ``api.request.duration``).
            value (float): Duration in seconds (use fractional seconds for
                sub-second precision).
            tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.

        Example:
            API endpoint timing::

                import time

                start = time.time()
                try:
                    result = handle_request(request)
                    return result
                finally:
                    duration = time.time() - start
                    metrics.timing(
                        "api.request.duration",
                        duration,
                        tags={"endpoint": request.path, "method": request.method},
                    )

            Database query timing::

                start = time.perf_counter()
                rows = db.execute(query)
                duration = time.perf_counter() - start
                metrics.timing(
                    "db.query.duration",
                    duration,
                    tags={"table": "users", "operation": "select"},
                )

            Context manager for timing::

                from contextlib import contextmanager

                @contextmanager
                def track_time(operation: str):
                    start = time.time()
                    try:
                        yield
                    finally:
                        metrics.timing(f"{operation}.duration", time.time() - start)

                with track_time("data_processing"):
                    process_large_dataset()

        Note:
            - Use seconds as the unit for consistency
            - ``time.perf_counter()`` is more accurate than ``time.time()`` for durations
            - Most backends convert to milliseconds for display

        See Also:
            :meth:`histogram`: For general value distributions
        """
        ...

    def histogram(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """Record a value in a histogram.

        Histograms track value distributions (e.g., request sizes, payload sizes).
        Unlike gauges, every value is recorded and aggregated into buckets.

        Args:
            name (str): Metric name (e.g., ``request.body.size``).
            value (float): Value to record.
            tags (Optional[Dict[str, str]]): Labels for the metric. Defaults to None.

        Example:
            Request/response sizes::

                # Request payload size
                request_size = len(request.body)
                metrics.histogram(
                    "http.request.size",
                    request_size,
                    tags={"endpoint": request.path, "content_type": request.content_type},
                )

                # Response size
                response_size = len(response.body)
                metrics.histogram(
                    "http.response.size",
                    response_size,
                    tags={"endpoint": request.path},
                )

            Batch sizes::

                # Processing batch sizes
                batch = fetch_batch_from_queue()
                metrics.histogram(
                    "processing.batch.size",
                    len(batch),
                    tags={"queue": "tasks"},
                )

            Query result counts::

                rows = db.execute("SELECT * FROM users").fetchall()
                metrics.histogram(
                    "db.query.rows",
                    len(rows),
                    tags={"table": "users"},
                )

        Note:
            - Histograms are ideal for analyzing value distributions (percentiles, averages)
            - Use :meth:`timing` specifically for duration measurements
            - Histogram values are bucketed, so value cardinality is not a concern;
              as with all metrics, avoid high-cardinality tags, since each unique
              tag combination creates a new series

        See Also:
            :meth:`timing`: Specialized for duration measurements
            :meth:`gauge`: For current point-in-time values
        """
        ...
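

# Illustrative sketch (an assumption, not part of the shipped Structum Lab API):
# because ``MetricsInterface`` is a ``typing.Protocol``, any object with matching
# method signatures satisfies it structurally; no inheritance is required. The
# hypothetical in-memory recorder below shows such a conforming implementation,
# e.g. for asserting on emitted metrics in unit tests.
class InMemoryMetrics:
    """Record emitted metrics in memory so tests can assert on them (illustrative)."""

    def __init__(self) -> None:
        # Each recorded event is a (kind, name, value, tags) tuple.
        self.events: list[tuple[str, str, float, dict[str, str]]] = []

    def increment(self, name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None:
        self.events.append(("counter", name, value, tags or {}))

    def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        self.events.append(("gauge", name, value, tags or {}))

    def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        self.events.append(("timing", name, value, tags or {}))

    def histogram(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        self.events.append(("histogram", name, value, tags or {}))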


class NoOpMetrics:
    """Fallback no-op implementation used when no monitoring plugin is installed."""

    def increment(self, name: str, value: float = 1.0, tags: dict[str, str] | None = None) -> None:
        """No-op increment."""

    def gauge(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """No-op gauge."""

    def timing(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """No-op timing."""

    def histogram(self, name: str, value: float, tags: dict[str, str] | None = None) -> None:
        """No-op histogram."""
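

# Illustrative usage sketch (an assumption): the real ``get_metrics`` factory
# lives in ``structum_lab.monitoring`` (see the class docstring above). The
# stand-in below only mirrors the documented fallback behaviour: return the
# configured backend when a monitoring plugin is installed, otherwise a
# ``NoOpMetrics`` instance, so callers never need to guard their metric calls.
if __name__ == "__main__":
    def get_metrics() -> MetricsInterface:
        # Hypothetical stand-in, not the real factory.
        return NoOpMetrics()

    metrics = get_metrics()
    # Safe even without a backend: NoOpMetrics silently discards everything.
    metrics.increment("demo.runs", tags={"source": "__main__"})
    metrics.gauge("demo.queue.length", 0.0)
    metrics.timing("demo.duration", 0.001)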