Source code for structum_lab.plugins.dynaconf.features.cache

# src/structum_lab.plugins.dynaconf/cache.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: 2025 PythonWoods

"""Smart Caching System for Configuration.

Provides TTL-based caching with LRU eviction and selective invalidation.
"""

import logging
import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any

log = logging.getLogger(__name__)


[docs] @dataclass class CacheEntry: """Entry in cache with metadata.""" value: Any timestamp: datetime = field(default_factory=datetime.now) hit_count: int = 0
[docs] def is_expired(self, ttl: timedelta) -> bool: """Check if entry has exceeded TTL.""" return datetime.now() - self.timestamp > ttl
[docs] def age_seconds(self) -> float: """Return age in seconds.""" return (datetime.now() - self.timestamp).total_seconds()
[docs] class SmartCache: """Cache with TTL, LRU eviction, and selective invalidation."""
[docs] def __init__(self, max_size: int = 1000, default_ttl: timedelta = timedelta(hours=1)) -> None: """Initialize the cache. Args: max_size: Maximum number of entries before LRU eviction. default_ttl: Default time-to-live for cache entries. """ self._cache: dict[str, CacheEntry] = {} self._max_size = max_size self._default_ttl = default_ttl self._lock = threading.RLock() self._stats = {"hits": 0, "misses": 0}
[docs] def get(self, key: str) -> Any | None: """Retrieve from cache.""" with self._lock: entry = self._cache.get(key) if not entry: self._stats["misses"] += 1 return None if entry.is_expired(self._default_ttl): del self._cache[key] self._stats["misses"] += 1 return None entry.hit_count += 1 self._stats["hits"] += 1 return entry.value
[docs] def set(self, key: str, value: Any) -> None: """Insert into cache.""" with self._lock: # LRU eviction if we reach limit if len(self._cache) >= self._max_size: self._evict_lru() self._cache[key] = CacheEntry(value=value)
def _evict_lru(self) -> None: """Remove least recently used entry.""" if not self._cache: return # Find entry with lowest hit count, oldest timestamp lru_key = min( self._cache.keys(), key=lambda k: (self._cache[k].hit_count, self._cache[k].timestamp), ) del self._cache[lru_key] log.debug(f"Evicted LRU cache entry: {lru_key}")
[docs] def invalidate(self, key: str) -> None: """Invalidate a specific key.""" with self._lock: self._cache.pop(key, None)
[docs] def invalidate_prefix(self, prefix: str) -> int: """Invalidate all keys starting with prefix.""" with self._lock: keys_to_remove = [k for k in self._cache if k.startswith(prefix)] for key in keys_to_remove: del self._cache[key] if keys_to_remove: log.debug(f"Invalidated {len(keys_to_remove)} cache entries with prefix '{prefix}'") return len(keys_to_remove)
[docs] def clear(self) -> None: """Clear entire cache.""" with self._lock: self._cache.clear() self._stats = {"hits": 0, "misses": 0}
[docs] def get_stats(self) -> dict[str, Any]: """Get cache statistics for monitoring.""" with self._lock: total = self._stats["hits"] + self._stats["misses"] hit_rate = self._stats["hits"] / total if total > 0 else 0 # Top 10 most accessed entries top_entries = sorted(self._cache.items(), key=lambda x: x[1].hit_count, reverse=True)[ :10 ] return { "size": len(self._cache), "max_size": self._max_size, "hits": self._stats["hits"], "misses": self._stats["misses"], "hit_rate": round(hit_rate, 3), "top_entries": [ { "key": k, "hits": e.hit_count, "age_seconds": round(e.age_seconds(), 2), } for k, e in top_entries ], }