Core Auth Protocols

Authentication interfaces for Structum.

Architectural Role: This module defines the Authentication Boundaries (DP-4). It ensures:
  1. Storage Agnosticism: Auth logic does not know about SQL/Mongo/LDAP (UserRepositoryInterface).

  2. Mechanism Agnosticism: Applications verify identity (AuthInterface.authenticate) regardless of the method (JWT, OAuth, SAML).

This separation allows swapping auth providers (e.g., local to OAuth) without changing application logic.
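The provider swap described above can be sketched with structural typing. This is a minimal illustration, not Structum's code: Tokens, Authenticator, LocalProvider, RemoteProvider, and login are hypothetical names, and the protocol is reduced to a single two-argument method for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Tokens:
    """Hypothetical stand-in for structum.auth.TokenPair."""
    access_token: str
    refresh_token: str


class Authenticator(Protocol):
    """Reduced, hypothetical stand-in for AuthInterface (one method only)."""

    def authenticate(self, username: str, password: str) -> Tokens | None: ...


class LocalProvider:
    """Toy provider backed by an in-memory table (real providers compare hashes)."""

    def __init__(self, credentials: dict[str, str]) -> None:
        self._credentials = credentials

    def authenticate(self, username: str, password: str) -> Tokens | None:
        if self._credentials.get(username) != password:
            return None
        return Tokens(f"local-access-{username}", f"local-refresh-{username}")


class RemoteProvider:
    """Toy provider that pretends to delegate to an external identity service."""

    def authenticate(self, username: str, password: str) -> Tokens | None:
        if not password:
            return None
        return Tokens(f"remote-access-{username}", f"remote-refresh-{username}")


def login(auth: Authenticator, username: str, password: str) -> str:
    """Application logic: depends only on the protocol, never on a provider."""
    tokens = auth.authenticate(username, password)
    return "denied" if tokens is None else tokens.access_token
```

Because login() only names the protocol, either provider can be passed in without touching the function body.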

class structum.auth.AuthInterface(*args, **kwargs)

Bases: Protocol

Protocol for authentication providers in Structum.

This is the main entry point for all authentication operations. Implementations provide credential verification, password hashing, and token management (JWT-based in JWTAuthProvider).

The auth system is storage-agnostic - it doesn’t manage users directly. Instead, it uses UserRepositoryInterface to fetch user data, keeping authentication logic decoupled from storage.

Implementations:
  • JWTAuthProvider (recommended)

  • OAuthProvider

Example

See usage in specific implementations like JWTAuthProvider.

__init__(*args, **kwargs)
authenticate(username: str, password: str, user_repo: UserRepositoryInterface) -> TokenPair | None

Authenticate a user’s credentials and issue tokens.

Architectural Flow:
  1. Find the user via user_repo.find_by_username().

  2. Verify the password via the internal verify_password().

  3. Issue a TokenPair if both steps succeed.

Parameters:
username: str

The principal identifier (email, username).

password: str

The secret credential.

user_repo: UserRepositoryInterface

Abstract access to user storage.

Returns:

Access/Refresh tokens if successful, None otherwise.

Return type:

TokenPair | None
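The three-step flow of authenticate() can be sketched as follows. This is an illustration under stated assumptions, not the JWTAuthProvider implementation: DemoUser, DemoTokenPair, InMemoryUserRepo, and DemoAuth are hypothetical, PBKDF2 from the standard library stands in for Argon2/bcrypt, and the tokens are random opaque strings rather than JWTs.

```python
from __future__ import annotations

import hashlib
import hmac
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoUser:  # hypothetical minimal user model
    id: str
    username: str
    hashed_password: str


@dataclass(frozen=True)
class DemoTokenPair:  # hypothetical stand-in for TokenPair
    access_token: str
    refresh_token: str


class InMemoryUserRepo:  # hypothetical repository, for illustration only
    def __init__(self, users: list[DemoUser]) -> None:
        self._by_name = {u.username: u for u in users}

    def find_by_username(self, username: str) -> DemoUser | None:
        return self._by_name.get(username)


def _derive(password: str, salt: bytes) -> bytes:
    # PBKDF2 is an assumption made to keep the sketch stdlib-only
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)


class DemoAuth:
    def hash_password(self, password: str) -> str:
        salt = secrets.token_bytes(16)
        return f"{salt.hex()}${_derive(password, salt).hex()}"

    def verify_password(self, password: str, hashed: str) -> bool:
        salt_hex, _, digest_hex = hashed.partition("$")
        try:
            salt, expected = bytes.fromhex(salt_hex), bytes.fromhex(digest_hex)
        except ValueError:
            return False  # malformed stored hash
        return hmac.compare_digest(_derive(password, salt), expected)

    def authenticate(self, username, password, user_repo) -> DemoTokenPair | None:
        user = user_repo.find_by_username(username)  # 1. find the user
        if user is None:
            return None
        if not self.verify_password(password, user.hashed_password):  # 2. verify
            return None
        # 3. issue tokens (opaque random strings in this sketch)
        return DemoTokenPair(secrets.token_urlsafe(32), secrets.token_urlsafe(32))
```

Both failure modes (unknown user, wrong password) return None, so a caller cannot tell them apart from the return value.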

hash_password(password: str) -> str

Hash a plain-text password for storage.

refresh(refresh_token: str, user_repo: UserRepositoryInterface) -> TokenPair | None

Exchange a valid refresh token for a fresh access token.

Ensures session continuity without re-entering credentials. Implementations should validate token signature and expiration.
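The signature and expiration checks can be sketched as below. The token format is a simplified HMAC-signed blob, not a real JWT, and SECRET, the lifetimes, issue_pair, the "typ" claim, and the tuple return value are all assumptions made for illustration; an actual implementation returns a TokenPair.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-only-secret"  # hypothetical signing key; load from config in practice
ACCESS_TTL = 15 * 60          # assumed lifetimes, in seconds
REFRESH_TTL = 7 * 24 * 3600


def _sign(claims: dict) -> str:
    """Serialize claims and append an HMAC-SHA256 signature."""
    body = base64.urlsafe_b64encode(json.dumps(claims).encode()).decode()
    sig = hmac.new(SECRET, body.encode(), hashlib.sha256).hexdigest()
    return f"{body}.{sig}"


def _verify(token: str, now: float) -> dict | None:
    """Return the claims if the signature matches and the token is unexpired."""
    body, _, sig = token.partition(".")
    expected = hmac.new(SECRET, body.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig.encode(), expected.encode()):
        return None
    # The body is only decoded after its signature has been verified
    claims = json.loads(base64.urlsafe_b64decode(body))
    return claims if claims["exp"] > now else None


def issue_pair(user_id: str, now: float) -> tuple[str, str]:
    access = _sign({"user_id": user_id, "typ": "access", "exp": now + ACCESS_TTL})
    refresh_tok = _sign({"user_id": user_id, "typ": "refresh", "exp": now + REFRESH_TTL})
    return access, refresh_tok


def refresh(refresh_token: str, user_repo, now: float | None = None) -> tuple[str, str] | None:
    now = time.time() if now is None else now
    claims = _verify(refresh_token, now)
    if claims is None or claims.get("typ") != "refresh":
        return None  # bad signature, expired, or an access token used as refresh
    if user_repo.find_by_id(claims["user_id"]) is None:
        return None  # the user no longer exists
    return issue_pair(claims["user_id"], now)
```

Checking the user through user_repo before reissuing means a deleted account cannot keep renewing its session.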

verify_access_token(token: str) -> dict[str, Any] | None

Verify the validity of an access token.

Security Critical: This method is the gatekeeper for all protected resources. It must verify:
  • Digital Signature (Integrity)

  • Expiration Time (Validity)

  • Tenant/Audience claims (Scope)

Returns:

Decoded claims payload (e.g., {“user_id”: “…”, “role”: “admin”}) if the token is valid; None if it is invalid, expired, or tampered with.

Return type:

dict[str, Any] | None
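The three required checks can be illustrated with a hand-rolled HS256 verifier built on the standard library. This is a teaching sketch only: the extra secret, audience, and now parameters and the encode_hs256 helper are assumptions not present in AuthInterface, and production code would normally rely on a maintained JWT library.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that the JWT encoding strips
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_hs256(claims: dict, secret: bytes) -> str:
    """Helper for the demo: build a compact HS256-signed token."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    sig = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(sig)}"


def verify_access_token(token: str, secret: bytes, audience: str,
                        now: float | None = None) -> dict | None:
    now = time.time() if now is None else now
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(payload_b64))
        signature = _b64url_decode(sig_b64)
    except ValueError:
        return None  # malformed token
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        return None  # never accept an algorithm chosen by the token itself
    expected = hmac.new(secret, f"{header_b64}.{payload_b64}".encode(),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(signature, expected):
        return None  # integrity: signature mismatch
    if not isinstance(claims, dict):
        return None
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp <= now:
        return None  # validity: missing or past expiration
    if claims.get("aud") != audience:
        return None  # scope: issued for a different tenant/audience
    return claims
```

Every failure path returns None, matching the documented contract, so callers need only a single truthiness check.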

verify_password(password: str, hashed: str) -> bool

Verify a plain-text password against a stored hash.

class structum.auth.UserInterface(*args, **kwargs)

Bases: Protocol

Protocol for authenticated user entities in Structum.

Applications must implement this protocol for their User model to integrate with the authentication system. The auth system never creates or modifies users - it only queries them via UserRepositoryInterface.

Example

Implementing UserInterface:

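from dataclasses import dataclass, field

# Application-defined mapping from role name to granted permissions
ROLE_PERMISSIONS: dict[str, set[str]] = {
    "admin": {"users:read", "users:write", "users:delete"},
    "user": {"users:read"},
}

@dataclass
class User:
    id: str
    username: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)
    permissions: set[str] = field(default_factory=set)

    def has_permission(self, permission: str) -> bool:
        # Check role-based permissions
        for role in self.roles:
            if permission in ROLE_PERMISSIONS.get(role, set()):
                return True
        # Check user-specific permissions
        return permission in self.permissions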

Using with auth:

user = user_repo.find_by_username("john")
if user and user.has_permission("users:write"):
    # Allow operation
    pass

Note

This is a Protocol, not a base class. Your User model doesn’t need to inherit from this - just implement the required properties and methods.
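A short sketch of what "Protocol, not a base class" means in practice. UserLike is a hypothetical stand-in that mirrors the members documented for UserInterface, and Account and describe are illustrative names; the runtime_checkable decorator is added here only so the demo can use isinstance.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@runtime_checkable
class UserLike(Protocol):
    """Hypothetical stand-in mirroring the members of UserInterface."""

    @property
    def id(self) -> str: ...

    @property
    def username(self) -> str: ...

    @property
    def hashed_password(self) -> str: ...

    @property
    def roles(self) -> list[str]: ...

    def has_permission(self, permission: str) -> bool: ...


@dataclass
class Account:  # note: does not inherit from UserLike
    id: str
    username: str
    hashed_password: str
    roles: list[str] = field(default_factory=list)

    def has_permission(self, permission: str) -> bool:
        return permission == "users:read"


def describe(user: UserLike) -> str:
    """Accepts anything with the right shape; a type checker enforces it."""
    return f"{user.username} ({', '.join(user.roles)})"
```

The isinstance check only confirms that the members exist, not their types; static type checkers perform the full structural comparison.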

See also

UserRepositoryInterface: Repository for user data access

AuthInterface: Authentication provider using users

__init__(*args, **kwargs)
has_permission(permission: str) -> bool

Check if user has a specific permission.

Parameters:
permission : str

Permission string, typically in format resource:action (e.g., “users:write”, “posts:delete”).

Returns:

True if user has the permission, False otherwise.

Return type:

bool

Example

Permission-based authorization:

@app.delete("/users/{user_id}")
async def delete_user(user_id: str, current_user: User = Depends(get_current_user)):
    if not current_user.has_permission("users:delete"):
        raise HTTPException(403, "Permission denied")
    # Delete user

Role-based permission mapping:

ROLE_PERMISSIONS = {
    "admin": {"users:read", "users:write", "users:delete"},
    "user": {"users:read"},
}

def has_permission(self, permission: str) -> bool:
    # True if any of the user's roles grants the permission
    return any(
        permission in ROLE_PERMISSIONS.get(role, set())
        for role in self.roles
    )

Note

Permission format is application-defined. Use a consistent naming scheme (e.g., resource:action).

property hashed_password : str

User’s hashed password.

Returns:

Securely hashed password (e.g., Argon2, bcrypt).

Return type:

str

Warning

Never store or transmit plain-text passwords. This property should only return hashed values.

Example

Password verification:

if auth.verify_password(input_password, user.hashed_password):
    # Password matches
    pass

See also

PasswordHasherInterface.hash(): Hash passwords

PasswordHasherInterface.verify(): Verify passwords

property id : str

Unique identifier for the user.

Returns:

User ID, typically a UUID or database primary key.

Return type:

str

Example

User ID in token payload:

token_payload = {"user_id": user.id, "exp": ...}

property roles : list[str]

List of roles assigned to the user.

Returns:

Role names (e.g., [“admin”, “user”, “moderator”]).

Return type:

list[str]

Example

Role-based access control:

if "admin" in user.roles:
    # Allow admin operation
    pass

Note

Roles should be lowercase strings. Use has_permission() for fine-grained permission checks.

property username : str

User’s username or email address.

Returns:

Username, typically used for login and display.

Return type:

str

Example

Username in login:

user = user_repo.find_by_username(username)
if user and auth.verify_password(password, user.hashed_password):
    return auth.authenticate(...)

class structum.auth.UserRepositoryInterface(*args, **kwargs)

Bases: Protocol

Protocol for user storage and retrieval in Structum.

Applications implement this to connect the auth system with their database or user storage backend. The auth plugin does NOT manage user storage directly - it delegates all user operations to this repository.

This separation ensures the auth system remains storage-agnostic and can work with any database (PostgreSQL, MongoDB, etc.) or user service (LDAP, OAuth providers).

Implementations:
  • Database-backed repository (SQLAlchemy, etc.)

  • External user service adapter (LDAP, Active Directory)

  • In-memory repository (testing only)

Example

SQLAlchemy repository implementation:

class SQLAlchemyUserRepository:
    def __init__(self, db: DatabaseInterface) -> None:
        self.db = db

    @staticmethod
    def _to_user(row) -> User:
        # Map storage column names onto the User model's field names
        return User(
            id=row["id"],
            username=row["username"],
            hashed_password=row["password_hash"],
            roles=row.get("roles", []),
            permissions=set(row.get("permissions", [])),
        )

    def find_by_username(self, username: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE username = :username",
                {"username": username}
            )
            row = conn.fetchone()
            return self._to_user(row) if row else None

    def find_by_id(self, user_id: str) -> User | None:
        with self.db.transaction() as conn:
            conn.execute(
                "SELECT * FROM users WHERE id = :id",
                {"id": user_id}
            )
            row = conn.fetchone()
            # Reuse the same mapping: User(**row) would fail on password_hash
            return self._to_user(row) if row else None

Using with authentication:

user_repo = SQLAlchemyUserRepository(db)
auth = JWTAuthProvider.from_config()

tokens = auth.authenticate("john", "password123", user_repo)
if tokens:
    print(f"Access token: {tokens.access_token}")

Note

Repository is responsible for mapping storage format to UserInterface. It should handle serialization/deserialization of user data.

See also

UserInterface: User entity protocol

AuthInterface: Authentication provider using repositories

__init__(*args, **kwargs)
find_by_id(user_id: str) -> UserInterface | None

Find a user by their unique identifier.

Parameters:
user_id : str

User’s unique identifier (typically UUID or database ID).

Returns:

User if found, None otherwise.

Return type:

UserInterface | None

Example

Loading user from token:

# After verifying access token
payload = auth.verify_access_token(token)
if payload:
    user = user_repo.find_by_id(payload["user_id"])
    if user:
        # User authenticated
        return user

Note

This method is called frequently (on every authenticated request). Consider caching user data for performance.
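One way to act on this note is a small caching wrapper around any repository. The sketch below is an assumption-laden illustration: CachingUserRepository, its ttl_seconds and clock parameters, and invalidate() are hypothetical names, not part of Structum.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class CachingUserRepository:
    """Wraps a repository and caches find_by_id results for a short TTL."""

    def __init__(self, inner: Any, ttl_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._inner = inner
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._cache: dict[str, tuple[float, Any]] = {}

    def find_by_id(self, user_id: str) -> Any | None:
        now = self._clock()
        hit = self._cache.get(user_id)
        if hit is not None and hit[0] > now:
            return hit[1]  # still fresh
        user = self._inner.find_by_id(user_id)
        if user is not None:  # misses are not cached
            self._cache[user_id] = (now + self._ttl, user)
        return user

    def find_by_username(self, username: str) -> Any | None:
        # Login lookups are rarer, so they go straight to the backend
        return self._inner.find_by_username(username)

    def invalidate(self, user_id: str) -> None:
        """Drop a cached entry, e.g. after a role or password change."""
        self._cache.pop(user_id, None)
```

The trade-off is staleness: a role or permission change may take up to ttl_seconds to become visible unless invalidate() is called.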

find_by_username(username: str) -> UserInterface | None

Find a user by username or email.

Parameters:
username : str

Username or email to search for. Should be case-insensitive in most implementations.

Returns:

User if found, None otherwise.

Return type:

UserInterface | None

Example

Looking up user for authentication:

username = "john@example.com"
user = user_repo.find_by_username(username)
if user:
    # Verify password
    if auth.verify_password(password, user.hashed_password):
        return auth.create_tokens(user)
else:
    log.warning("Login attempt for unknown user", username=username)

Note

Implementation should normalize username (e.g., lowercase) before lookup. Consider using database indexes on username column for performance.
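The normalization advice can be sketched with an in-memory repository of the kind listed above as "testing only". DemoUser, InMemoryUserRepository, and the add() method are hypothetical, and trimming plus casefold() is one possible normalization policy, not a Structum requirement.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DemoUser:  # hypothetical minimal user model
    id: str
    username: str


class InMemoryUserRepository:
    """Test-only repository that normalizes usernames on write and on lookup."""

    def __init__(self) -> None:
        self._by_id: dict[str, DemoUser] = {}
        self._by_username: dict[str, DemoUser] = {}

    @staticmethod
    def _normalize(username: str) -> str:
        # Trim whitespace and fold case so "John@Example.com" == "john@example.com"
        return username.strip().casefold()

    def add(self, user: DemoUser) -> None:
        self._by_id[user.id] = user
        self._by_username[self._normalize(user.username)] = user

    def find_by_id(self, user_id: str) -> DemoUser | None:
        return self._by_id.get(user_id)

    def find_by_username(self, username: str) -> DemoUser | None:
        return self._by_username.get(self._normalize(username))
```

Applying the same function on both the write and read paths is what keeps lookups consistent; in a database the equivalent is an index on the normalized column.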

class structum.auth.PasswordHasherInterface(*args, **kwargs)

Bases: Protocol

Protocol for secure password hashing in Structum.

Implementations must use cryptographically secure hashing algorithms (e.g., Argon2, bcrypt, scrypt). Never use fast hashes like MD5 or SHA-1 for passwords.

Implementations:
  • Argon2Hasher (recommended)

  • BcryptHasher

Example

Using password hasher:

from structum.plugins.auth.password import Argon2Hasher

hasher = Argon2Hasher()

# Hash password during registration
hashed = hasher.hash("user_password_123")
# Store hashed in database: user.hashed_password = hashed

# Verify during login
if hasher.verify("user_password_123", hashed):
    # Password matches
    return create_token(user)
else:
    # Invalid password
    raise AuthenticationError("Invalid credentials")

Warning

Never log, display, or store plain-text passwords. Always hash passwords immediately upon receipt.

See also

AuthInterface: Auth provider using password hasher

UserInterface: User entity with hashed_password property

__init__(*args, **kwargs)
hash(password: str) -> str

Hash a plain-text password securely.

Parameters:
password : str

Plain-text password to hash. The interface imposes no length restriction, but implementations may truncate very long passwords (bcrypt, for example, uses only the first 72 bytes).

Returns:

Hashed password string including algorithm identifier and salt.

Format is implementation-specific (e.g., Argon2: $argon2id$v=19$...).

Return type:

str

Example

Creating user with hashed password:

# During user registration
plain_password = request.form["password"]
hashed = auth.hash_password(plain_password)

user = User(
    id=generate_id(),
    username=request.form["username"],
    hashed_password=hashed,  # Store this
    roles=["user"],
    permissions=set(),
)
user_repo.save(user)

Warning

Hashing is intentionally slow (100-500ms) to resist brute-force attacks. Do not hash passwords in tight loops or performance-critical paths.

Note

Each call generates a unique hash (due to random salt) even for the same password. This is expected and secure behavior.
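The effect of the random salt can be seen in a small sketch. PBKDF2-HMAC-SHA256 from the standard library is used here purely to avoid third-party dependencies (the recommended Argon2Hasher works on the same principle); the "pbkdf2_sha256$..." string format and the iteration count are assumptions for illustration.

```python
import hashlib
import secrets

ITERATIONS = 200_000  # assumed work factor for the demo


def hash_password(password: str) -> str:
    """Return 'scheme$iterations$salt$digest' with a fresh random salt."""
    salt = secrets.token_bytes(16)  # new salt on every call
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


first = hash_password("correct horse battery staple")
second = hash_password("correct horse battery staple")
# Same password, different salts: the two stored strings differ
hashes_differ = first != second
```

Because the salt is embedded in the stored string, verification can recompute the digest later without any extra storage.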

verify(password: str, hashed: str) -> bool

Verify a plain-text password against a hash.

Parameters:
password : str

Plain-text password to verify.

hashed : str

Previously hashed password (from database).

Returns:

True if password matches hash, False otherwise.

Return type:

bool

Example

Password verification during login:

# Get user from database
user = user_repo.find_by_username(username)
if not user:
    return None  # User not found

# Verify password
if auth.verify_password(password, user.hashed_password):
    # Authentication successful
    return auth.create_tokens(user)
else:
    # Invalid password
    log.warning("Failed login attempt", username=username)
    return None

Warning

Always use constant-time comparison internally to prevent timing attacks. Most modern hashing libraries handle this automatically.

Note

Returns False for invalid/malformed hashes rather than raising exceptions. This prevents information leakage about hash format.
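Both notes above (constant-time comparison, False for malformed hashes) can be sketched together. As before, PBKDF2 and the "pbkdf2_sha256$iterations$salt$digest" format are assumptions chosen to stay within the standard library, and MAX_ITERATIONS is a hypothetical guard against absurd stored values.

```python
import hashlib
import hmac
import secrets

MAX_ITERATIONS = 1_000_000  # hypothetical upper bound on the stored work factor


def hash_password(password: str, iterations: int = 100_000) -> str:
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, hashed: str) -> bool:
    try:
        scheme, iterations, salt_hex, digest_hex = hashed.split("$")
        rounds = int(iterations)
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(digest_hex)
    except ValueError:
        return False  # wrong field count, non-numeric rounds, or bad hex
    if scheme != "pbkdf2_sha256" or not 1 <= rounds <= MAX_ITERATIONS:
        return False  # unknown scheme or implausible work factor
    actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, rounds)
    # compare_digest runs in time independent of where the bytes differ
    return hmac.compare_digest(actual, expected)
```

Catching ValueError in one place keeps every malformed-input case on the same False path as a wrong password.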

class structum.auth.TokenPair(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)

Bases: object

Data class containing JWT access and refresh token pair.

access_token

Short-lived JWT access token for API requests.

Type:

str

refresh_token

Long-lived token for obtaining new access tokens.

Type:

str

token_type

Token type, typically “bearer” for JWT. Defaults to “bearer”.

Type:

str

expires_at

Expiration timestamp for access token, if available.

Type:

datetime | None

Example

Creating and using token pair:

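from datetime import datetime, timedelta, timezone

tokens = TokenPair(
    access_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...",
    token_type="bearer",
    # Use a timezone-aware timestamp so expiry comparisons are unambiguous
    expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
)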

# Use in HTTP Authorization header
headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}

Note

This class is frozen (immutable) to prevent accidental token modification. Tokens should be treated as opaque strings and never parsed by clients.
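What "frozen" implies can be shown with a stand-in dataclass. DemoTokenPair is a hypothetical class that copies TokenPair's documented fields; that the real class is built with dataclass(frozen=True) is an assumption based on the note above.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, replace
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class DemoTokenPair:
    """Hypothetical stand-in with the same fields as TokenPair."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_at: datetime | None = None


tokens = DemoTokenPair(
    access_token="access-abc",
    refresh_token="refresh-xyz",
    expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
)

try:
    tokens.access_token = "tampered"  # assignment on a frozen instance
    mutated = True
except FrozenInstanceError:
    mutated = False  # the original object is unchanged

# To "change" a field, build a new instance instead
rotated = replace(tokens, access_token="access-new")
```

dataclasses.replace() returns a new object, so any code still holding the old pair keeps seeing the original values.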

See also

AuthInterface.authenticate(): Method that returns token pairs

AuthInterface.refresh(): Refresh access tokens

__init__(access_token: str, refresh_token: str, token_type: str = 'bearer', expires_at: datetime | None = None)
expires_at : datetime | None = None
token_type : str = 'bearer'
access_token : str
refresh_token : str

structum.auth.Auth

alias of AuthInterface

structum.auth.User

alias of UserInterface

structum.auth.UserRepository

alias of UserRepositoryInterface

structum.auth.PasswordHasher

alias of PasswordHasherInterface
