Structum Auth (structum-auth)¶
Structum Auth manages JWT authentication, secure password hashing (Argon2), and Role-Based Access Control (RBAC).
| Feature | Status | Version |
|---|---|---|
| Status | Alpha | 0.1.0 |
| Namespace | | |
| Dependencies | | |
1. What is Auth Plugin¶
structum-auth is an authentication plugin that adds JWT tokens, Argon2 password hashing, and RBAC to Structum. Its current status is Alpha (0.1.0).
1.1 The Problem¶
Before (Without Plugin):
# ❌ Password in plaintext
users_db = {"john": "password123"}
# ❌ No token expiration
# ❌ No secure hashing
# ❌ No RBAC
After (With Plugin):
# ✅ Argon2 password hashing
# ✅ JWT with access + refresh tokens
# ✅ Integrated RBAC
# ✅ Type-safe with Pydantic
auth = JWTAuthProvider.from_config()
tokens = auth.authenticate("john", "password123", user_repo)
1.2 Key Features¶
| Feature | Description |
|---|---|
| JWT Tokens | Short-lived access token + long-lived refresh token |
| Argon2 Hashing | State-of-the-art password hashing (better than bcrypt) |
| RBAC | Role-Based Access Control with permission checking |
| Type Safety | Pydantic validation on all models |
| Config Integration | Setup via Dynaconf with secrets isolation |
| Extensible | Protocol-based for custom implementations |
2. Core Concepts¶
2.1 JWT Token System¶
The plugin uses a dual-token system for security and usability:
graph TD
subgraph "Token System"
LOGIN[Login Request] --> |Success| TOKENS
TOKENS --> ACCESS[Access Token]
TOKENS --> REFRESH[Refresh Token]
ACCESS --> |TTL: 15 min| API[API Requests]
API --> |Expired| RENEW[Renew Flow]
REFRESH --> |TTL: 7 days| RENEW
RENEW --> |Valid| NEW_ACCESS[New Access Token]
style ACCESS fill:#d4f1f4,stroke:#333
style REFRESH fill:#fad7ac,stroke:#333
end
Workflow:
Login → Returns access + refresh token
Client uses access token for API calls
Access token expires → Client uses refresh to get new access
Refresh token expires → Requires new login
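The workflow above can be sketched with the standard library alone. This is a minimal illustration of the dual-token idea, not the plugin's implementation: the helper names (`sign`, `verify`, `issue`, `renew`) and the hand-rolled HS256 encoding are assumptions made for the example, and real code should rely on `JWTAuthProvider`.

```python
import base64
import hashlib
import hmac
import json

ACCESS_TTL = 15 * 60            # 15 minutes, as in the diagram
REFRESH_TTL = 7 * 24 * 60 * 60  # 7 days


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: dict, key: bytes) -> str:
    """Encode a payload as a compact HS256-signed token (hypothetical helper)."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64(json.dumps(payload).encode())
    sig = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64(sig)}"


def verify(token: str, key: bytes, expected_type: str, now: int) -> dict:
    """Check signature, token type, and expiry, then return the payload."""
    header, body, sig = token.split(".")
    expected = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _unb64(sig)):
        raise ValueError("bad signature")
    payload = json.loads(_unb64(body))
    if payload["type"] != expected_type:
        raise ValueError("wrong token type")
    if payload["exp"] <= now:
        raise ValueError("token expired")
    return payload


def issue(user_id: str, access_key: bytes, refresh_key: bytes, now: int) -> tuple[str, str]:
    """Login step: return an (access, refresh) pair signed with separate keys."""
    access = sign({"sub": user_id, "type": "access", "iat": now, "exp": now + ACCESS_TTL}, access_key)
    refresh = sign({"sub": user_id, "type": "refresh", "iat": now, "exp": now + REFRESH_TTL}, refresh_key)
    return access, refresh


def renew(refresh_token: str, access_key: bytes, refresh_key: bytes, now: int) -> str:
    """Renew step: a valid refresh token buys a fresh access token."""
    payload = verify(refresh_token, refresh_key, "refresh", now)
    return sign({"sub": payload["sub"], "type": "access", "iat": now, "exp": now + ACCESS_TTL}, access_key)
```

Signing the two token types with different keys mirrors the separate `SECRET_KEY` and `REFRESH_SECRET_KEY` settings: a refresh token cannot pass as an access token even if the `type` claim were ignored.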
2.2 Argon2 Password Hashing¶
Why Argon2 instead of bcrypt?
| Feature | Argon2 | bcrypt |
|---|---|---|
| Memory-hard | ✅ Yes (GPU resistant) | ❌ No (small, fixed memory footprint) |
| Configurability | Time + memory + parallelism | Cost factor only |
| PHC winner | ✅ Password Hashing Competition 2015 | Predates the competition (1999) |
| Performance | Tunable across CPU time, memory, and threads | Tunable through CPU time only |
2.3 RBAC Model¶
classDiagram
class User {
+list[str] roles
+has_permission(perm: str)
}
class Role {
+string name
+list[str] permissions
}
class Permission {
+string code
}
User --> Role : has multiple
Role --> Permission : has multiple
# Example permission check
if user.has_permission("delete_posts"):
    delete_post()
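The class diagram can be read as a lookup from role names to permission codes. The sketch below is one possible rendering under that assumption; the `ROLES` registry and its contents are hypothetical, and the plugin leaves the actual mapping to your `has_permission` implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Role:
    name: str
    permissions: list[str] = field(default_factory=list)


# Hypothetical registry: role name -> Role with its permission codes
ROLES = {
    "editor": Role("editor", ["edit_posts", "view_posts"]),
    "viewer": Role("viewer", ["view_posts"]),
}


@dataclass
class User:
    roles: list[str] = field(default_factory=list)

    def has_permission(self, perm: str) -> bool:
        # A user holds a permission if any of their known roles grants it
        return any(
            perm in ROLES[name].permissions
            for name in self.roles
            if name in ROLES
        )
```

Unknown role names are ignored rather than raising, so a stale role stored in the database denies access instead of crashing the check.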
3. Quick Start (5 Minutes)¶
Step 1: Installation¶
pip install -e packages/auth
Step 2: Configuration¶
File: config/app/auth.toml
[default]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
File: config/.secrets.toml (⚠️ add to .gitignore)
[auth]
SECRET_KEY = "your-256-bit-secret-key-change-this"
REFRESH_SECRET_KEY = "different-256-bit-refresh-key"
Step 3: Implement User Model¶
from dataclasses import dataclass
from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):
    """User model for your application."""
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str]
    is_active: bool = True

    def has_permission(self, permission: str) -> bool:
        """
        Custom logic for permission checking.
        Example: admin has all permissions.
        """
        if "admin" in self.roles:
            return True
        # Custom permission mapping
        role_permissions = {
            "editor": ["edit_posts", "view_posts"],
            "viewer": ["view_posts"]
        }
        for role in self.roles:
            if permission in role_permissions.get(role, []):
                return True
        return False
Step 4: Implement Repository¶
from structum_lab.auth import UserRepositoryInterface
from typing import Optional

class UserRepository(UserRepositoryInterface):
    """Repository for user database access."""

    def __init__(self, db_connection):
        self.db = db_connection

    def find_by_username(self, username: str) -> Optional[User]:
        """Find user by username."""
        # Example with SQL
        result = self.db.execute(
            "SELECT * FROM users WHERE username = :username",
            {"username": username}
        )
        row = result.fetchone()
        if not row:
            return None
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            hashed_password=row["hashed_password"],
            roles=row["roles"].split(","),  # Assume comma-separated
            is_active=row["is_active"]
        )
Step 5: Authenticate¶
from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider
# Setup config
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)
# Create auth provider
auth = JWTAuthProvider.from_config()
# Create repository
user_repo = UserRepository(db_connection)
# Login
tokens = auth.authenticate("john_doe", "password123", user_repo)
if tokens:
    print("✅ Login successful")
    print(f"Access Token: {tokens.access_token}")
    print(f"Refresh Token: {tokens.refresh_token}")
    print(f"Expires in: {tokens.expires_in} seconds")
else:
    print("❌ Invalid credentials")
4. User Model Implementation¶
4.1 UserInterface Protocol¶
The plugin requires your User model to implement UserInterface:
from typing import Protocol

class UserInterface(Protocol):
    """Protocol that every User model must respect."""
    id: str
    username: str
    hashed_password: str
    roles: list[str]

    def has_permission(self, permission: str) -> bool:
        """Check if user has a specific permission."""
        ...
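Because `UserInterface` is a `typing.Protocol`, conformance is structural: a class satisfies it by having the right attributes and method, with or without inheriting from it. The sketch below shows this with a stand-in protocol named `UserLike` (a hypothetical copy for illustration; whether the real `UserInterface` is decorated with `runtime_checkable` is not stated here).

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@runtime_checkable
class UserLike(Protocol):
    """Stand-in for UserInterface, redefined here so the example is self-contained."""
    id: str
    username: str
    hashed_password: str
    roles: list[str]

    def has_permission(self, permission: str) -> bool: ...


@dataclass
class PlainUser:  # note: no base class
    id: str
    username: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)

    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles


user = PlainUser("1", "john", "$argon2id$...", ["admin"])
# isinstance() on a runtime-checkable protocol only checks that the
# members exist; it does not validate their types or signatures.
conforms = isinstance(user, UserLike)
```

This matters for models whose base class brings its own metaclass (Pydantic, SQLAlchemy), where adding a `Protocol` to the bases can fail with a metaclass conflict; static type checkers accept such models without explicit inheritance.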
4.2 Example with Dataclass¶
from dataclasses import dataclass, field
from datetime import datetime, timezone
from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)
    is_active: bool = True
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_login: datetime | None = None

    def has_permission(self, permission: str) -> bool:
        # Custom implementation
        return "admin" in self.roles
4.3 Example with Pydantic¶
from pydantic import BaseModel, Field
from datetime import datetime, timezone

# UserInterface is a Protocol, so the model satisfies it structurally.
# Listing it as a base next to BaseModel raises a metaclass conflict.
class User(BaseModel):
    """User model with Pydantic validation."""
    id: str = Field(..., min_length=1)
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    hashed_password: str
    roles: list[str] = Field(default_factory=list)
    is_active: bool = True
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles
4.4 Example with SQLAlchemy ORM¶
from datetime import datetime, timezone
from sqlalchemy import Column, String, Boolean, DateTime, JSON
from sqlalchemy.orm import declarative_base  # sqlalchemy.ext.declarative is deprecated since 1.4

Base = declarative_base()

# UserInterface is a Protocol, so the model satisfies it structurally.
# Listing it as a base next to Base raises a metaclass conflict.
class User(Base):
    __tablename__ = "users"
    id = Column(String, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    roles = Column(JSON, default=list)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

    def has_permission(self, permission: str) -> bool:
        return "admin" in (self.roles or [])
5. Repository Pattern¶
5.1 UserRepositoryInterface¶
from typing import Protocol, Optional

class UserRepositoryInterface(Protocol):
    """Protocol that every user repository must implement."""

    def find_by_username(self, username: str) -> Optional[UserInterface]:
        """Find user by username. Returns None if not found."""
        ...
5.2 Implementation with Database Plugin¶
from typing import Optional
from structum_lab.auth import UserRepositoryInterface
from structum_lab.plugins.database import get_database

class DatabaseUserRepository(UserRepositoryInterface):
    """Repository using Structum Database plugin."""

    def find_by_username(self, username: str) -> Optional[User]:
        db = get_database()
        with db.transaction() as conn:
            result = conn.execute(
                "SELECT * FROM users WHERE username = :u AND is_active = true",
                {"u": username}
            )
            row = result.fetchone()
            if not row:
                return None
            return User(
                id=row["id"],
                username=row["username"],
                email=row["email"],
                hashed_password=row["hashed_password"],
                roles=row["roles"]  # Assume already a list
            )
5.3 In-Memory Implementation (Testing)¶
class InMemoryUserRepository(UserRepositoryInterface):
    """In-memory repository for testing."""

    def __init__(self):
        # Pre-populate with test users
        self.users = {
            "admin": User(
                id="1",
                username="admin",
                email="admin@example.com",
                hashed_password="$argon2id$...",  # Pre-calculated hash
                roles=["admin"]
            ),
            "john": User(
                id="2",
                username="john",
                email="john@example.com",
                hashed_password="$argon2id$...",
                roles=["editor"]
            )
        }

    def find_by_username(self, username: str) -> Optional[User]:
        return self.users.get(username)
6. JWT Token System¶
6.1 Token Structure¶
Access Token Payload:
{
    "sub": "user_id_123",
    "username": "john_doe",
    "roles": ["editor", "viewer"],
    "type": "access",
    "exp": 1705234567,
    "iat": 1705233667
}
Refresh Token Payload:
{
    "sub": "user_id_123",
    "type": "refresh",
    "exp": 1705838467,
    "iat": 1705233667
}
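The `exp` values in both payloads follow from `iat` plus the configured lifetimes: 15 minutes is 900 seconds and 7 days is 604800 seconds. The sketch below reproduces the example numbers; `build_payloads` is a hypothetical helper written for illustration, not part of the plugin.

```python
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7


def build_payloads(user_id: str, username: str, roles: list[str], issued_at: int) -> tuple[dict, dict]:
    """Return (access_payload, refresh_payload) for one login (illustrative only)."""
    access = {
        "sub": user_id,
        "username": username,
        "roles": roles,
        "type": "access",
        "exp": issued_at + ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        "iat": issued_at,
    }
    refresh = {
        "sub": user_id,
        "type": "refresh",
        "exp": issued_at + REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60,
        "iat": issued_at,
    }
    return access, refresh


access, refresh = build_payloads("user_id_123", "john_doe", ["editor", "viewer"], 1705233667)
# access["exp"]  -> 1705234567  (iat + 900)
# refresh["exp"] -> 1705838467  (iat + 604800)
```

The refresh payload carries only the subject and timing claims, so roles are read again from the repository when a new access token is issued.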
6.2 Token Generation¶
auth = JWTAuthProvider.from_config()
# Authenticate and get tokens
tokens = auth.authenticate("john", "password", user_repo)
# Tokens object
tokens.access_token # str: JWT access token
tokens.refresh_token # str: JWT refresh token
tokens.token_type # str: "Bearer"
tokens.expires_in # int: seconds until access expires
6.3 Token Verification¶
# Verify access token
try:
    payload = auth.verify_access_token(access_token)
    user_id = payload["sub"]
    roles = payload["roles"]
except Exception as e:
    # Token invalid/expired
    print(f"Invalid token: {e}")
6.4 Token Refresh¶
# Use refresh token to get new access token
try:
    new_tokens = auth.refresh_access_token(refresh_token, user_repo)
    new_access = new_tokens.access_token
except Exception as e:
    # Refresh token invalid/expired
    print(f"Refresh failed: {e}")
7. Password Hashing¶
7.1 Hash Password¶
from structum_lab.plugins.auth import hash_password, verify_password
# During registration
plain_password = "SuperSecret123!"
hashed = hash_password(plain_password)
# Store in database
user.hashed_password = hashed
Output:
$argon2id$v=19$m=65536,t=3,p=4$randomsalt$hashedvalue
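The encoded hash is self-describing: the fields between the `$` separators carry the variant, version, cost parameters, salt, and digest, which is why verification needs no separate parameter storage. A small parser makes the layout explicit; `parse_argon2_hash` is a hypothetical helper for illustration and is not part of the plugin or of argon2-cffi.

```python
def parse_argon2_hash(encoded: str) -> dict:
    """Split an encoded Argon2 hash string into its labelled parts."""
    # The string starts with "$", so the first split element is empty
    _, variant, version, params, salt, digest = encoded.split("$")
    values = dict(item.split("=") for item in params.split(","))
    return {
        "variant": variant,
        "version": int(version.split("=")[1]),
        "memory_kib": int(values["m"]),   # memory cost in KiB
        "time_cost": int(values["t"]),    # number of iterations
        "parallelism": int(values["p"]),  # number of lanes/threads
        "salt": salt,
        "digest": digest,
    }


info = parse_argon2_hash("$argon2id$v=19$m=65536,t=3,p=4$randomsalt$hashedvalue")
memory_mib = info["memory_kib"] // 1024  # 65536 KiB = 64 MiB
```

Reading the parameters back this way is useful when deciding whether stored hashes should be rehashed after the configured costs change.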
7.2 Verify Password¶
# During login
stored_hash = user.hashed_password
input_password = "SuperSecret123!"
if verify_password(input_password, stored_hash):
    print("✅ Password correct")
else:
    print("❌ Password incorrect")
7.3 Custom Argon2 Parameters¶
# Advanced: Custom hashing parameters
from argon2 import PasswordHasher
ph = PasswordHasher(
    time_cost=3,        # Iterations
    memory_cost=65536,  # In KiB, i.e. 64 MiB
    parallelism=4,      # Threads
    hash_len=32,        # Output length in bytes
    salt_len=16         # Salt length in bytes
)
hashed = ph.hash("password")
# verify() returns True on a match and raises
# argon2.exceptions.VerifyMismatchError otherwise
verified = ph.verify(hashed, "password")
8. RBAC (Role-Based Access Control)¶
8.1 Role Checker Utility¶
from structum_lab.plugins.auth import RoleChecker
# Check single role
if RoleChecker.has_role(user, "admin"):
    print("User is admin")
# Check multiple roles (OR logic)
if RoleChecker.has_any_role(user, ["admin", "moderator"]):
    print("User is admin OR moderator")
# Check all roles (AND logic)
if RoleChecker.has_all_roles(user, ["editor", "reviewer"]):
    print("User is editor AND reviewer")
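The three checks map onto membership, set intersection, and subset tests. The sketch below shows one way such a utility could behave; `SketchRoleChecker` is a hypothetical stand-in written for illustration, and the real `RoleChecker` may differ in details such as case handling.

```python
from types import SimpleNamespace
from typing import Iterable


class SketchRoleChecker:
    """Illustrative stand-in for RoleChecker, not the plugin's code."""

    @staticmethod
    def has_role(user, role: str) -> bool:
        return role in user.roles

    @staticmethod
    def has_any_role(user, roles: Iterable[str]) -> bool:
        # OR logic: at least one requested role is held
        return not set(roles).isdisjoint(user.roles)

    @staticmethod
    def has_all_roles(user, roles: Iterable[str]) -> bool:
        # AND logic: every requested role is held
        return set(roles).issubset(user.roles)


# Any object with a `roles` list works for the sketch
editor = SimpleNamespace(roles=["editor", "reviewer"])
```

Note that under this set-based reading, `has_all_roles(user, [])` is true and `has_any_role(user, [])` is false, which is worth deciding deliberately in guard code.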
8.2 Permission Decorator¶
from structum_lab.plugins.auth import require_permission

@require_permission("delete_posts")
def delete_post(post_id: str, current_user: User):
    """Only users with the delete_posts permission can call this function."""
    # Delete logic
    pass

# Usage
try:
    delete_post("post_123", current_user)
except PermissionError:
    print("User doesn't have delete_posts permission")
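To make the decorator's behavior concrete, here is a minimal sketch of how a permission guard of this kind could be written. It assumes the user is passed in a parameter named `current_user` (as in the example above) and raises `PermissionError` on failure; `require_permission_sketch` and `StubUser` are hypothetical names, and the plugin's own decorator may locate the user differently.

```python
import functools
import inspect


def require_permission_sketch(permission: str):
    """Illustrative guard: reject the call unless current_user has the permission."""
    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Bind the call so current_user is found whether it was
            # passed positionally or by keyword
            bound = signature.bind(*args, **kwargs)
            user = bound.arguments.get("current_user")
            if user is None or not user.has_permission(permission):
                raise PermissionError(f"missing permission: {permission}")
            return func(*args, **kwargs)

        return wrapper
    return decorator


class StubUser:
    """Tiny user stand-in with an explicit permission set."""

    def __init__(self, permissions):
        self.permissions = set(permissions)

    def has_permission(self, permission: str) -> bool:
        return permission in self.permissions


@require_permission_sketch("delete_posts")
def delete_post(post_id: str, current_user: StubUser) -> str:
    return f"deleted {post_id}"
```

Checking before calling the wrapped function means a denied request never reaches the delete logic, so the guard needs no cleanup path.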
8.3 Advanced Permission System¶
from fnmatch import fnmatch

class User(UserInterface):
    # ... other fields...
    ROLE_PERMISSIONS = {
        "admin": ["*"],  # Wildcard: all permissions
        "editor": [
            "edit_posts",
            "create_posts",
            "delete_own_posts",
            "view_posts"
        ],
        "viewer": ["view_posts"]
    }

    def has_permission(self, permission: str) -> bool:
        """Sophisticated permission checking."""
        for role in self.roles:
            perms = self.ROLE_PERMISSIONS.get(role, [])
            # Wildcard check
            if "*" in perms:
                return True
            # Exact match
            if permission in perms:
                return True
            # Glob pattern match (e.g., "delete_*")
            if any(fnmatch(permission, p) for p in perms):
                return True
        return False
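The glob step can be tried in isolation. The sketch below uses `fnmatch.fnmatchcase` rather than `fnmatch.fnmatch`, because `fnmatch` normalizes case according to the operating system and permission codes are usually meant to be case-sensitive everywhere; `role_allows` is a hypothetical helper for illustration.

```python
from fnmatch import fnmatchcase


def role_allows(granted: list[str], permission: str) -> bool:
    """Return True if any granted pattern matches the requested permission."""
    return any(fnmatchcase(permission, pattern) for pattern in granted)


can_delete = role_allows(["delete_*", "view_posts"], "delete_posts")  # True
can_edit = role_allows(["delete_*"], "edit_posts")                    # False
superuser = role_allows(["*"], "anything_at_all")                     # True
```

Since `"*"` matches every string and a literal pattern matches itself, the glob test alone already covers the wildcard and exact-match branches; keeping them separate is a readability choice.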
9. Configuration¶
9.1 Complete Configuration File¶
File: config/app/auth.toml
[default]
# JWT Algorithm
ALGORITHM = "HS256" # HS256, HS384, HS512, RS256, RS384, RS512
# Token Expiration
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Argon2 Parameters (optional, uses defaults if omitted)
[default.argon2]
time_cost = 3
memory_cost = 65536 # 64 MB
parallelism = 4
hash_len = 32
salt_len = 16
File: config/.secrets.toml
[auth]
# ⚠️ CRITICAL: Keep these secret!
SECRET_KEY = "your-256-bit-secret-key-here"
REFRESH_SECRET_KEY = "different-256-bit-secret-key-for-refresh"
9.2 Generate Secret Keys¶
# Generate secure random keys
python -c "import secrets; print(secrets.token_urlsafe(32))"
# Output: a random 43-character URL-safe string (32 random bytes)
# Generate another for refresh
python -c "import secrets; print(secrets.token_urlsafe(32))"
# Output: a second, different 43-character string
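The same keys can be generated from Python, which also makes their size easy to check. `secrets.token_urlsafe(32)` draws 32 random bytes and encodes them as unpadded URL-safe Base64, giving a 43-character string; the variable names below are illustrative.

```python
import base64
import secrets

secret_key = secrets.token_urlsafe(32)
refresh_secret_key = secrets.token_urlsafe(32)

# Decode back to raw bytes to confirm the amount of key material
padding = "=" * (-len(secret_key) % 4)
raw = base64.urlsafe_b64decode(secret_key + padding)

key_chars = len(secret_key)  # 43 characters
key_bytes = len(raw)         # 32 bytes = 256 bits
```

For HS256, a key at least as long as the 256-bit hash output is the usual minimum, so 32 random bytes is a reasonable floor for both settings.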
9.3 Environment Variable Override¶
# Override in production
export STRUCTUM_AUTH__SECRET_KEY="production-secret-key"
export STRUCTUM_AUTH__REFRESH_SECRET_KEY="production-refresh-key"
export STRUCTUM_AUTH__ACCESS_TOKEN_EXPIRE_MINUTES=30
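The variable names follow Dynaconf's convention: a prefix, then a double underscore for each nesting level. The sketch below shows how such names map onto nested settings; `collect_overrides` is a hypothetical helper that only mimics the naming scheme and is not how Structum or Dynaconf load configuration.

```python
from typing import Mapping


def collect_overrides(environ: Mapping[str, str], prefix: str = "STRUCTUM_") -> dict:
    """Turn PREFIX_SECTION__KEY=value variables into a nested dict."""
    result: dict = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # unrelated variable
        *parents, leaf = name[len(prefix):].split("__")
        node = result
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value  # kept as a string in this sketch
    return result


sample_env = {
    "STRUCTUM_AUTH__SECRET_KEY": "production-secret-key",
    "STRUCTUM_AUTH__ACCESS_TOKEN_EXPIRE_MINUTES": "30",
    "PATH": "/usr/bin",
}
overrides = collect_overrides(sample_env)
```

Environment values always arrive as strings, so a numeric setting such as `ACCESS_TOKEN_EXPIRE_MINUTES` needs casting somewhere; this sketch leaves the value as `"30"`.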
10. FastAPI Integration¶
10.1 Middleware Setup¶
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.plugins.database import get_database

# User and UserRepository are the classes from the Quick Start
app = FastAPI()
security = HTTPBearer()
auth = JWTAuthProvider.from_config()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """
    Dependency to extract current user from JWT.
    Use in routes like: current_user: User = Depends(get_current_user)
    """
    token = credentials.credentials
    # Keep the try block narrow so the HTTPException raised below
    # is not caught and re-wrapped
    try:
        payload = auth.verify_access_token(token)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Could not validate credentials: {e}"
        ) from e
    # Load user from database. UserRepositoryInterface only defines
    # find_by_username, so look the user up by the "username" claim.
    user_repo = UserRepository(get_database())
    user = user_repo.find_by_username(payload["username"])
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )
    return user

@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
    """Protected route - requires authentication."""
    return {"message": f"Hello {current_user.username}!"}
10.2 Login Endpoint¶
from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str
    expires_in: int

@app.post("/auth/login", response_model=TokenResponse)
async def login(credentials: LoginRequest):
    """Login endpoint."""
    user_repo = UserRepository(get_database())
    tokens = auth.authenticate(
        credentials.username,
        credentials.password,
        user_repo
    )
    if not tokens:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )
    return TokenResponse(
        access_token=tokens.access_token,
        refresh_token=tokens.refresh_token,
        token_type=tokens.token_type,
        expires_in=tokens.expires_in
    )
10.3 Refresh Endpoint¶
class RefreshRequest(BaseModel):
    refresh_token: str

@app.post("/auth/refresh", response_model=TokenResponse)
async def refresh(request: RefreshRequest):
    """Refresh access token using refresh token."""
    user_repo = UserRepository(get_database())
    try:
        new_tokens = auth.refresh_access_token(
            request.refresh_token,
            user_repo
        )
        return TokenResponse(
            access_token=new_tokens.access_token,
            refresh_token=new_tokens.refresh_token,
            token_type=new_tokens.token_type,
            expires_in=new_tokens.expires_in
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Refresh failed: {e}"
        )
11. Testing¶
11.1 Mocking Auth in Tests¶
from unittest.mock import MagicMock
from structum_lab.auth import TokenPair

def test_login_success():
    # Mock the auth provider and the repository
    auth_mock = MagicMock()
    repo_mock = MagicMock()
    # Setup return values
    auth_mock.authenticate.return_value = TokenPair(
        access_token="fake_access",
        refresh_token="fake_refresh",
        token_type="Bearer",
        expires_in=900
    )
    # Call function under test
    tokens = auth_mock.authenticate("user", "pass", repo_mock)
    assert tokens.access_token == "fake_access"
12. Security Best Practices¶
12.1 Checklist¶
[ ] HTTPS Only: JWTs can be intercepted over HTTP.
[ ] Secure Cookies: Store refresh tokens in HttpOnly; Secure; SameSite=Strict cookies (not localStorage).
[ ] Short Expiration: Access tokens should expire in 5-15 minutes.
[ ] Key Rotation: Rotate SECRET_KEY periodically.
[ ] Scopes: Use RBAC to grant minimum privileges.
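For the cookie item, the attributes can be assembled with the standard library's `http.cookies`. This is a sketch under assumptions: the cookie name `refresh_token`, the `/auth/refresh` path, and the helper `refresh_cookie_header` are illustrative choices, and a web framework's own cookie API would normally be used instead.

```python
from http.cookies import SimpleCookie


def refresh_cookie_header(refresh_token: str, max_age_days: int = 7) -> str:
    """Build a Set-Cookie header value for the refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True       # not readable from JavaScript
    morsel["secure"] = True         # only sent over HTTPS
    morsel["samesite"] = "Strict"   # not sent on cross-site requests
    morsel["max-age"] = max_age_days * 24 * 60 * 60
    morsel["path"] = "/auth/refresh"  # only sent to the refresh endpoint
    return morsel.OutputString()


header_value = refresh_cookie_header("abc.def.ghi")
```

Restricting the path keeps the refresh token off every other request, which narrows where it can leak.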
13. API Reference¶
See the Core Auth Module for full protocol definitions and type hints.