Structum Auth (structum-auth)

Python 3.11+ | License: Apache-2.0

Structum Auth manages JWT authentication, secure password hashing (Argon2), and Role-Based Access Control (RBAC).

Status:        Alpha
Version:       0.1.0
Namespace:     structum_lab.plugins.auth
Dependencies:  PyJWT, argon2-cffi


Index

  1. What is Auth Plugin

  2. Core Concepts

  3. Quick Start (5 Minutes)

  4. User Model Implementation

  5. Repository Pattern

  6. JWT Token System

  7. Password Hashing

  8. RBAC (Role-Based Access Control)

  9. Configuration

  10. FastAPI Integration

  11. Testing

  12. Security Best Practices

  13. API Reference


1. What is Auth Plugin

structum-auth is an authentication plugin that brings JWT tokens, Argon2 password hashing, and RBAC to Structum.

1.1 The Problem

Before (Without Plugin):

# ❌ Password in plaintext
users_db = {"john": "password123"}

# ❌ No token expiration
# ❌ No secure hashing
# ❌ No RBAC

After (With Plugin):

# ✅ Argon2 password hashing
# ✅ JWT with access + refresh tokens
# ✅ Integrated RBAC
# ✅ Type-safe with Pydantic

auth = JWTAuthProvider.from_config()
tokens = auth.authenticate("john", "password123", user_repo)

1.2 Key Features

Feature             Description
JWT Tokens          Short-lived access token + long-lived refresh token
Argon2 Hashing      State-of-the-art password hashing (better than bcrypt)
RBAC                Role-Based Access Control with permission checking
Type Safety         Pydantic validation on all models
Config Integration  Setup via Dynaconf with secrets isolation
Extensible          Protocol-based for custom implementations


2. Core Concepts

2.1 JWT Token System

The plugin uses a dual-token system for security and usability:

graph TD
    subgraph "Token System"
        LOGIN["Login Request"] --> |Success| TOKENS
        TOKENS --> ACCESS["Access Token"]
        TOKENS --> REFRESH["Refresh Token"]

        ACCESS --> |"TTL: 15 min"| API["API Requests"]
        API --> |Expired| RENEW["Renew Flow"]

        REFRESH --> |"TTL: 7 days"| RENEW
        RENEW --> |Valid| NEW_ACCESS["New Access Token"]

        style ACCESS fill:#d4f1f4,stroke:#333
        style REFRESH fill:#fad7ac,stroke:#333
    end

Workflow:

  1. Login → Returns access + refresh token

  2. Client uses access token for API calls

  3. Access token expires → Client uses refresh to get new access

  4. Refresh token expires → Requires new login
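The workflow above can be sketched with the standard library alone. This is an illustration of the dual-token idea, not the plugin's implementation (the plugin depends on PyJWT); the keys, TTL constants, and the helper names `sign`, `verify`, `login`, and `renew` are all hypothetical.

```python
import base64
import hashlib
import hmac
import json

# Hypothetical demo values; real keys belong in config/.secrets.toml.
ACCESS_KEY = b"access-secret-for-demo-only"
REFRESH_KEY = b"refresh-secret-for-demo-only"
ACCESS_TTL = 15 * 60            # 15 minutes, in seconds
REFRESH_TTL = 7 * 24 * 60 * 60  # 7 days, in seconds


def _b64(data: bytes) -> str:
    """URL-safe base64 without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    """Inverse of _b64: restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: dict, key: bytes) -> str:
    """Build an HS256-signed token: header.body.signature."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64(json.dumps(payload).encode())
    sig = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64(sig)}"


def verify(token: str, key: bytes, expected_type: str, now: int) -> dict:
    """Check signature, token type, and expiry; return the payload."""
    header, body, sig = token.split(".")
    expected = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _unb64(sig)):
        raise ValueError("bad signature")
    payload = json.loads(_unb64(body))
    if payload["type"] != expected_type:
        raise ValueError("wrong token type")
    if now >= payload["exp"]:
        raise ValueError("token expired")
    return payload


def login(user_id: str, now: int) -> tuple[str, str]:
    """Step 1: issue an access token and a refresh token."""
    access = sign({"sub": user_id, "type": "access",
                   "iat": now, "exp": now + ACCESS_TTL}, ACCESS_KEY)
    refresh = sign({"sub": user_id, "type": "refresh",
                    "iat": now, "exp": now + REFRESH_TTL}, REFRESH_KEY)
    return access, refresh


def renew(refresh_token: str, now: int) -> str:
    """Step 3: trade a valid refresh token for a new access token."""
    payload = verify(refresh_token, REFRESH_KEY, "refresh", now)
    return sign({"sub": payload["sub"], "type": "access",
                 "iat": now, "exp": now + ACCESS_TTL}, ACCESS_KEY)
```

Signing the two token types with different keys means a refresh token can never pass as an access token, even if the `type` claim were ignored.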

2.2 Argon2 Password Hashing

Why Argon2 instead of bcrypt?

Feature          Argon2                                 bcrypt
Memory-hard      ✅ Yes (GPU resistant)                  ❌ No
Configurability  Time + Memory + Parallelism            Iterations only
PHC winner       ✅ Password Hashing Competition 2015    Older (1999)
Performance      Parameterizable                        Fixed

2.3 RBAC Model

classDiagram
    class User {
        +list[str] roles
        +has_permission(perm: str)
    }
    class Role {
        +string name
        +list[str] permissions
    }
    class Permission {
        +string code
    }

    User --> Role : has multiple
    Role --> Permission : has multiple

# Example permission check
if user.has_permission("delete_posts"):
    delete_post()
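One way to read the diagram is as a lookup from role names to permission codes. The following is a minimal sketch under that assumption; the `ROLES` registry and the `DemoUser` class are hypothetical and are not part of the plugin.

```python
from dataclasses import dataclass, field


@dataclass
class Role:
    name: str
    permissions: list[str] = field(default_factory=list)


# Hypothetical role registry: role name -> Role with its permission codes.
ROLES = {
    "editor": Role("editor", ["edit_posts", "view_posts"]),
    "moderator": Role("moderator", ["delete_posts", "view_posts"]),
}


@dataclass
class DemoUser:
    roles: list[str] = field(default_factory=list)

    def has_permission(self, perm: str) -> bool:
        """True if any of the user's known roles grants the permission."""
        return any(
            perm in ROLES[name].permissions
            for name in self.roles
            if name in ROLES  # unknown role names grant nothing
        )


moderator = DemoUser(roles=["moderator"])
editor = DemoUser(roles=["editor"])
```

Here `moderator.has_permission("delete_posts")` is true while `editor.has_permission("delete_posts")` is false, because only the moderator role lists that code.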

3. Quick Start (5 Minutes)

Step 1: Installation

pip install -e packages/auth

Step 2: Configuration

File: config/app/auth.toml

[default]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

File: config/.secrets.toml (⚠️ add to .gitignore)

[auth]
SECRET_KEY = "your-256-bit-secret-key-change-this"
REFRESH_SECRET_KEY = "different-256-bit-refresh-key"

Step 3: Implement User Model

from dataclasses import dataclass
from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):
    """User model for your application."""
    
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str]
    is_active: bool = True
    
    def has_permission(self, permission: str) -> bool:
        """
        Custom logic for permission checking.
        Example: admin has all permissions.
        """
        if "admin" in self.roles:
            return True
        
        # Custom permission mapping
        role_permissions = {
            "editor": ["edit_posts", "view_posts"],
            "viewer": ["view_posts"]
        }
        
        for role in self.roles:
            if permission in role_permissions.get(role, []):
                return True
        
        return False

Step 4: Implement Repository

from structum_lab.auth import UserRepositoryInterface
from typing import Optional

class UserRepository(UserRepositoryInterface):
    """Repository for user database access."""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    def find_by_username(self, username: str) -> Optional[User]:
        """Find user by username."""
        # Example with SQL
        result = self.db.execute(
            "SELECT * FROM users WHERE username = :username",
            {"username": username}
        )
        row = result.fetchone()
        
        if not row:
            return None
        
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            hashed_password=row["hashed_password"],
            roles=row["roles"].split(","),  # Assume comma-separated
            is_active=row["is_active"]
        )

Step 5: Authenticate

from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.config import set_config_provider
from structum_lab.plugins.dynaconf import DynaconfConfigProvider

# Setup config
provider = DynaconfConfigProvider(root_path=".")
provider.auto_discover()
set_config_provider(provider)

# Create auth provider
auth = JWTAuthProvider.from_config()

# Create repository
user_repo = UserRepository(db_connection)

# Login
tokens = auth.authenticate("john_doe", "password123", user_repo)

if tokens:
    print(f"✅ Login successful")
    print(f"Access Token: {tokens.access_token}")
    print(f"Refresh Token: {tokens.refresh_token}")
    print(f"Expires in: {tokens.expires_in} seconds")
else:
    print("❌ Invalid credentials")

4. User Model Implementation

4.1 UserInterface Protocol

The plugin requires your User model to implement UserInterface:

from typing import Protocol

class UserInterface(Protocol):
    """Protocol that every User model must respect."""
    
    id: str
    username: str
    hashed_password: str
    roles: list[str]
    
    def has_permission(self, permission: str) -> bool:
        """Check if user has a specific permission."""
        ...
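Because the interface is a Protocol, conformance is structural: a class matches if it has the right attributes and methods, whether or not it inherits from the protocol. The sketch below demonstrates this with a local stand-in, `UserLike`, that mirrors the members shown above; the stand-in, `PlainUser`, and `Incomplete` are hypothetical names, and whether the real `UserInterface` is runtime-checkable is not stated in this document.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@runtime_checkable
class UserLike(Protocol):
    """Local stand-in mirroring the members of UserInterface."""

    id: str
    username: str
    hashed_password: str
    roles: list[str]

    def has_permission(self, permission: str) -> bool: ...


@dataclass
class PlainUser:  # note: no protocol in the base classes
    id: str
    username: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)

    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles


@dataclass
class Incomplete:  # missing hashed_password, roles, has_permission
    id: str
    username: str


conforming = PlainUser(id="1", username="admin",
                       hashed_password="placeholder-hash", roles=["admin"])
non_conforming = Incomplete(id="2", username="john")
```

`isinstance(conforming, UserLike)` is true and `isinstance(non_conforming, UserLike)` is false. The runtime check only looks for member names; static type checkers such as mypy also compare the types.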

4.2 Example with Dataclass

from dataclasses import dataclass, field
from datetime import datetime, timezone

from structum_lab.auth import UserInterface

@dataclass
class User(UserInterface):
    id: str
    username: str
    email: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)
    is_active: bool = True
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_login: datetime | None = None
    
    def has_permission(self, permission: str) -> bool:
        # Simplified example: only admins are granted permissions
        return "admin" in self.roles

4.3 Example with Pydantic

from datetime import datetime, timezone

from pydantic import BaseModel, Field

# UserInterface is a Protocol, so this model satisfies it structurally.
# Do not list it as a base class: combining BaseModel with a Protocol
# raises a metaclass conflict.
class User(BaseModel):
    """User model with Pydantic validation."""
    
    id: str = Field(..., min_length=1)
    username: str = Field(..., min_length=3, max_length=50)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    hashed_password: str
    roles: list[str] = Field(default_factory=list)
    is_active: bool = True
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    
    def has_permission(self, permission: str) -> bool:
        return "admin" in self.roles

4.4 Example with SQLAlchemy ORM

from datetime import datetime, timezone

from sqlalchemy import Column, String, Boolean, DateTime, JSON
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# Satisfies UserInterface structurally; listing the Protocol as a base
# class would conflict with SQLAlchemy's declarative metaclass.
class User(Base):
    __tablename__ = "users"
    
    id = Column(String, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    roles = Column(JSON, default=list)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    
    def has_permission(self, permission: str) -> bool:
        return "admin" in (self.roles or [])

5. Repository Pattern

5.1 UserRepositoryInterface

from typing import Protocol, Optional

class UserRepositoryInterface(Protocol):
    """Protocol that every user repository must implement."""
    
    def find_by_username(self, username: str) -> Optional[UserInterface]:
        """Find user by username. Returns None if not found."""
        ...

5.2 Implementation with Database Plugin

from typing import Optional

from structum_lab.auth import UserRepositoryInterface
from structum_lab.plugins.database import get_database

class DatabaseUserRepository(UserRepositoryInterface):
    """Repository using Structum Database plugin."""
    
    def find_by_username(self, username: str) -> Optional[User]:
        db = get_database()
        
        with db.transaction() as conn:
            result = conn.execute(
                "SELECT * FROM users WHERE username = :u AND is_active = true",
                {"u": username}
            )
            row = result.fetchone()
        
        if not row:
            return None
        
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            hashed_password=row["hashed_password"],
            roles=row["roles"]  # Assume already a list
        )

5.3 In-Memory Implementation (Testing)

class InMemoryUserRepository(UserRepositoryInterface):
    """In-memory repository for testing."""
    
    def __init__(self):
        # Pre-populate with test users
        self.users = {
            "admin": User(
                id="1",
                username="admin",
                email="admin@example.com",
                hashed_password="$argon2id$...",  # Pre-calculated hash
                roles=["admin"]
            ),
            "john": User(
                id="2",
                username="john",
                email="john@example.com",
                hashed_password="$argon2id$...",
                roles=["editor"]
            )
        }
    
    def find_by_username(self, username: str) -> Optional[User]:
        return self.users.get(username)

6. JWT Token System

6.1 Token Structure

Access Token Payload:

{
  "sub": "user_id_123",
  "username": "john_doe",
  "roles": ["editor", "viewer"],
  "type": "access",
  "exp": 1705234567,
  "iat": 1705233667
}

Refresh Token Payload:

{
  "sub": "user_id_123",
  "type": "refresh",
  "exp": 1705838467,
  "iat": 1705233667
}
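The `exp` and `iat` values in these payloads follow directly from the configured lifetimes: `exp = iat + TTL`, in seconds since the Unix epoch. The sketch below reproduces the numbers shown above; `build_payloads` is a hypothetical helper, not a plugin function.

```python
from datetime import datetime, timedelta, timezone


def build_payloads(
    user_id: str,
    username: str,
    roles: list[str],
    issued_at: datetime,
    access_ttl: timedelta = timedelta(minutes=15),
    refresh_ttl: timedelta = timedelta(days=7),
) -> tuple[dict, dict]:
    """Return (access_payload, refresh_payload) with integer timestamps."""
    iat = int(issued_at.timestamp())
    access = {
        "sub": user_id,
        "username": username,
        "roles": roles,
        "type": "access",
        "exp": iat + int(access_ttl.total_seconds()),
        "iat": iat,
    }
    # The refresh payload carries only the subject, not roles or username.
    refresh = {
        "sub": user_id,
        "type": "refresh",
        "exp": iat + int(refresh_ttl.total_seconds()),
        "iat": iat,
    }
    return access, refresh


issued = datetime.fromtimestamp(1705233667, tz=timezone.utc)
access_payload, refresh_payload = build_payloads(
    "user_id_123", "john_doe", ["editor", "viewer"], issued
)
```

The access payload expires 900 seconds after `iat` (15 minutes) and the refresh payload 604800 seconds after (7 days), matching the example values.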

6.2 Token Generation

auth = JWTAuthProvider.from_config()

# Authenticate and get tokens
tokens = auth.authenticate("john", "password", user_repo)

# Tokens object
tokens.access_token   # str: JWT access token
tokens.refresh_token  # str: JWT refresh token
tokens.token_type     # str: "Bearer"
tokens.expires_in     # int: seconds until access expires

6.3 Token Verification

# Verify access token
try:
    payload = auth.verify_access_token(access_token)
    user_id = payload["sub"]
    roles = payload["roles"]
except Exception as e:
    # Token invalid/expired
    print(f"Invalid token: {e}")

6.4 Token Refresh

# Use refresh token to get new access token
try:
    new_tokens = auth.refresh_access_token(refresh_token, user_repo)
    new_access = new_tokens.access_token
except Exception as e:
    # Refresh token invalid/expired
    print(f"Refresh failed: {e}")

7. Password Hashing

7.1 Hash Password

from structum_lab.plugins.auth import hash_password, verify_password

# During registration
plain_password = "SuperSecret123!"
hashed = hash_password(plain_password)

# Store in database
user.hashed_password = hashed

Example stored hash (the salt and digest shown are placeholders):

$argon2id$v=19$m=65536,t=3,p=4$randomsalt$hashedvalue
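The encoded string is self-describing: it carries the variant, version, cost parameters, salt, and digest, separated by `$`. The following sketch splits the example above into its fields; `parse_argon2_hash` is a hypothetical helper for illustration and does no cryptographic verification.

```python
def parse_argon2_hash(encoded: str) -> dict:
    """Split an encoded Argon2 hash string into its labelled fields."""
    parts = encoded.split("$")
    # Expected shape: ['', variant, 'v=..', 'm=..,t=..,p=..', salt, digest]
    if len(parts) != 6 or parts[0] != "":
        raise ValueError("not an encoded Argon2 hash")
    _, variant, version, params, salt, digest = parts
    costs = dict(item.split("=", 1) for item in params.split(","))
    return {
        "variant": variant,
        "version": int(version.removeprefix("v=")),
        "memory_kib": int(costs["m"]),   # memory cost in KiB
        "time_cost": int(costs["t"]),    # number of passes
        "parallelism": int(costs["p"]),  # number of lanes
        "salt": salt,
        "digest": digest,
    }


info = parse_argon2_hash("$argon2id$v=19$m=65536,t=3,p=4$randomsalt$hashedvalue")
memory_mib = info["memory_kib"] // 1024
```

For the example string, `memory_mib` is 64, which matches the `memory_cost = 65536` setting. Because the parameters travel with the hash, existing hashes remain verifiable after the defaults are changed.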

7.2 Verify Password

# During login
stored_hash = user.hashed_password
input_password = "SuperSecret123!"

if verify_password(input_password, stored_hash):
    print("✅ Password correct")
else:
    print("❌ Password incorrect")

7.3 Custom Argon2 Parameters

# Advanced: Custom hashing parameters
from argon2 import PasswordHasher

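ph = PasswordHasher(
    time_cost=3,        # Number of passes over memory
    memory_cost=65536,  # Memory in KiB (64 MiB)
    parallelism=4,      # Number of parallel lanes
    hash_len=32,        # Digest length in bytes
    salt_len=16         # Salt length in bytes
)

hashed = ph.hash("password")
# verify() returns True on a match and raises
# argon2.exceptions.VerifyMismatchError otherwise
verified = ph.verify(hashed, "password")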

8. RBAC (Role-Based Access Control)

8.1 Role Checker Utility

from structum_lab.plugins.auth import RoleChecker

# Check single role
if RoleChecker.has_role(user, "admin"):
    print("User is admin")

# Check multiple roles (OR logic)
if RoleChecker.has_any_role(user, ["admin", "moderator"]):
    print("User is admin OR moderator")

# Check all roles (AND logic)
if RoleChecker.has_all_roles(user, ["editor", "reviewer"]):
    print("User is editor AND reviewer")

8.2 Permission Decorator

from structum_lab.plugins.auth import require_permission

@require_permission("delete_posts")
def delete_post(post_id: str, current_user: User):
    """Only users with the delete_posts permission can call this function."""
    # Delete logic
    pass

# Usage
try:
    delete_post("post_123", current_user)
except PermissionError:
    print("User doesn't have delete_posts permission")
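To make the mechanism concrete, here is one way such a decorator could be written. It is a sketch, not the plugin's source: `require_permission_sketch`, the `user_arg` parameter, and `DemoUser` are hypothetical, and the real decorator may locate the user differently.

```python
import functools
import inspect


def require_permission_sketch(permission: str, user_arg: str = "current_user"):
    """Reject calls whose `user_arg` argument lacks `permission`."""

    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Resolve the user whether it was passed positionally or by keyword.
            bound = signature.bind(*args, **kwargs)
            user = bound.arguments[user_arg]
            if not user.has_permission(permission):
                raise PermissionError(f"missing permission: {permission}")
            return func(*args, **kwargs)

        return wrapper

    return decorator


class DemoUser:
    """Hypothetical user with a flat set of permission codes."""

    def __init__(self, permissions):
        self.permissions = set(permissions)

    def has_permission(self, permission: str) -> bool:
        return permission in self.permissions


@require_permission_sketch("delete_posts")
def remove_post(post_id: str, current_user: DemoUser) -> str:
    return f"deleted {post_id}"
```

Calling `remove_post("post_123", DemoUser(["delete_posts"]))` returns normally, while a user without the permission triggers `PermissionError` before the function body runs.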

8.3 Advanced Permission System

from fnmatch import fnmatch

class User(UserInterface):
    # ... other fields...
    
    ROLE_PERMISSIONS = {
        "admin": ["*"],  # Wildcard: all permissions
        "editor": [
            "edit_posts",
            "create_posts",
            "delete_own_posts",
            "view_posts"
        ],
        "viewer": ["view_posts"]
    }
    
    def has_permission(self, permission: str) -> bool:
        """Sophisticated permission checking."""
        for role in self.roles:
            perms = self.ROLE_PERMISSIONS.get(role, [])
            
            # Wildcard check
            if "*" in perms:
                return True
            
            # Exact match
            if permission in perms:
                return True
            
            # Glob pattern match (e.g., "delete_*")
            if any(fnmatch(permission, p) for p in perms):
                return True
        
        return False
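The glob step is what lets a single entry such as `delete_*` cover a family of permissions. The standalone sketch below isolates that behaviour; the `moderator` role and the `is_allowed` function are hypothetical, and `fnmatchcase` is used instead of `fnmatch` so that matching stays case-sensitive on every platform.

```python
from fnmatch import fnmatchcase

# Hypothetical mapping; "moderator" is added to show a glob pattern.
ROLE_PERMISSIONS = {
    "admin": ["*"],
    "moderator": ["delete_*", "view_posts"],
    "viewer": ["view_posts"],
}


def is_allowed(roles: list[str], permission: str) -> bool:
    """True if any pattern of any role matches the requested permission."""
    return any(
        fnmatchcase(permission, pattern)
        for role in roles
        for pattern in ROLE_PERMISSIONS.get(role, [])
    )
```

With this mapping, `is_allowed(["moderator"], "delete_comments")` is true and `is_allowed(["viewer"], "delete_posts")` is false. Since `*` matches any string and a literal pattern matches itself, the glob check alone already covers the wildcard and exact-match cases.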

9. Configuration

9.1 Complete Configuration File

File: config/app/auth.toml

[default]
# JWT Algorithm
ALGORITHM = "HS256"  # HS256, HS384, HS512, RS256, RS384, RS512

# Token Expiration
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Argon2 Parameters (optional, uses defaults if omitted)
[default.argon2]
time_cost = 3
memory_cost = 65536  # 64 MB
parallelism = 4
hash_len = 32
salt_len = 16

File: config/.secrets.toml

[auth]
# ⚠️ CRITICAL: Keep these secret!
SECRET_KEY = "your-256-bit-secret-key-here"
REFRESH_SECRET_KEY = "different-256-bit-secret-key-for-refresh"

9.2 Generate Secret Keys

# Generate secure random keys
python -c "import secrets; print(secrets.token_urlsafe(32))"
# Prints a random 43-character URL-safe string (32 random bytes)

# Generate another for refresh
python -c "import secrets; print(secrets.token_urlsafe(32))"
# Prints a different random 43-character string
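The same generation step can be done from Python, which also makes the key size easy to confirm. In this sketch, `generate_key` and `decoded_length` are hypothetical helpers built on the standard `secrets` and `base64` modules.

```python
import base64
import secrets


def generate_key(num_bytes: int = 32) -> str:
    """Return a URL-safe text key backed by `num_bytes` random bytes."""
    return secrets.token_urlsafe(num_bytes)


def decoded_length(key: str) -> int:
    """Number of raw bytes encoded in a token_urlsafe() key."""
    padded = key + "=" * (-len(key) % 4)  # restore stripped base64 padding
    return len(base64.urlsafe_b64decode(padded))


access_key = generate_key()
refresh_key = generate_key()
```

Each key is 43 characters long and decodes to 32 bytes (256 bits). Generating the two keys separately keeps the access and refresh secrets independent.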

9.3 Environment Variable Override

# Override in production
export STRUCTUM_AUTH__SECRET_KEY="production-secret-key"
export STRUCTUM_AUTH__REFRESH_SECRET_KEY="production-refresh-key"
export STRUCTUM_AUTH__ACCESS_TOKEN_EXPIRE_MINUTES=30

10. FastAPI Integration

10.1 Middleware Setup

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structum_lab.plugins.auth import JWTAuthProvider
from structum_lab.plugins.database import get_database

# User and UserRepository are the application classes from the Quick Start

app = FastAPI()
security = HTTPBearer()
auth = JWTAuthProvider.from_config()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """
    Dependency to extract current user from JWT.
    Use in routes like: current_user: User = Depends(get_current_user)
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    try:
        payload = auth.verify_access_token(credentials.credentials)
    except Exception:
        # Do not echo the underlying error to the client
        raise unauthorized
    
    # Load the user through the repository method defined in section 5;
    # the access token payload carries the username (see 6.1)
    user_repo = UserRepository(get_database())
    user = user_repo.find_by_username(payload["username"])
    
    if not user or not user.is_active:
        raise unauthorized
    
    return user

@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
    """Protected route - requires authentication."""
    return {"message": f"Hello {current_user.username}!"}

10.2 Login Endpoint

from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str
    expires_in: int

@app.post("/auth/login", response_model=TokenResponse)
async def login(credentials: LoginRequest):
    """Login endpoint."""
    user_repo = UserRepository(get_database())
    
    tokens = auth.authenticate(
        credentials.username,
        credentials.password,
        user_repo
    )
    
    if not tokens:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )
    
    return TokenResponse(
        access_token=tokens.access_token,
        refresh_token=tokens.refresh_token,
        token_type=tokens.token_type,
        expires_in=tokens.expires_in
    )

10.3 Refresh Endpoint

class RefreshRequest(BaseModel):
    refresh_token: str

@app.post("/auth/refresh", response_model=TokenResponse)
async def refresh(request: RefreshRequest):
    """Refresh access token using refresh token."""
    user_repo = UserRepository(get_database())
    
    try:
        new_tokens = auth.refresh_access_token(
            request.refresh_token,
            user_repo
        )
        
        return TokenResponse(
            access_token=new_tokens.access_token,
            refresh_token=new_tokens.refresh_token,
            token_type=new_tokens.token_type,
            expires_in=new_tokens.expires_in
        )
    except Exception:
        # Avoid leaking internal error details to the client
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired refresh token"
        )

11. Testing

11.1 Mocking Auth in Tests

from unittest.mock import MagicMock
from structum_lab.auth import TokenPair

def test_login_success():
    # Mock the auth provider and the user repository
    auth_mock = MagicMock()
    repo_mock = MagicMock()
    
    # Setup return values
    auth_mock.authenticate.return_value = TokenPair(
        access_token="fake_access",
        refresh_token="fake_refresh",
        token_type="Bearer",
        expires_in=900
    )
    
    # Call function under test
    tokens = auth_mock.authenticate("user", "pass", repo_mock)
    
    assert tokens.access_token == "fake_access"

12. Security Best Practices

12.1 Checklist

  • [ ] HTTPS Only: JWTs can be intercepted over HTTP.

  • [ ] Secure Cookies: Store refresh tokens in HttpOnly; Secure; SameSite=Strict cookies (not localStorage).

  • [ ] Short Expiration: Access tokens should expire in 5-15 minutes.

  • [ ] Key Rotation: Rotate SECRET_KEY periodically.

  • [ ] Scopes: Use RBAC to grant minimum privileges.
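For the cookie item in the checklist, the attributes can be assembled with the standard library. This is a framework-independent sketch: `refresh_cookie_header` and the cookie name `refresh_token` are hypothetical, and the path is scoped to the `/auth/refresh` endpoint used earlier in this document.

```python
from http.cookies import SimpleCookie


def refresh_cookie_header(refresh_token: str,
                          max_age_seconds: int = 7 * 24 * 60 * 60) -> str:
    """Build the value of a Set-Cookie header for a refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True        # not readable from JavaScript
    morsel["secure"] = True          # sent over HTTPS only
    morsel["samesite"] = "Strict"    # not sent on cross-site requests
    morsel["path"] = "/auth/refresh" # only sent to the refresh endpoint
    morsel["max-age"] = max_age_seconds
    return morsel.OutputString()


header_value = refresh_cookie_header("abc.def.ghi")
```

The resulting header value contains `HttpOnly`, `Secure`, `SameSite=Strict`, and a `Max-Age` of 604800 seconds, which lines up with the 7-day refresh token lifetime.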


13. API Reference

See the Core Auth Module for full protocol definitions and type hints.