Core Auth Protocols¶
Authentication interfaces for Structum.
Architectural Role: This module defines the Authentication Boundaries (DP-4). It ensures that: 1. Storage Agnosticism: Auth logic does not know about SQL/Mongo/LDAP (UserRepositoryInterface). 2. Mechanism Agnosticism: Applications verify identity (AuthInterface.authenticate) regardless of the method (JWT, OAuth, SAML).
This separation allows swapping auth providers (e.g., local to OAuth) without changing application logic.
- class structum.auth.AuthInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for authentication providers in Structum.
This is the main entry point for all authentication operations. Implementations provide JWT-based authentication, password hashing, and token management.
The auth system is storage-agnostic - it doesn’t manage users directly. Instead, it uses
UserRepositoryInterfaceto fetch user data, keeping authentication logic decoupled from storage.- Implementations:
JWTAuthProvider(recommended)OAuthProvider
Example
See usage in specific implementations like JWTAuthProvider.
- authenticate(username: str, password: str, user_repo: UserRepositoryInterface) TokenPair | None[source]¶
Authenticate a user credentials and issue tokens.
Architectural Flow: 1. Find user via user_repo.find_by_username(). 2. Verify password via internal verify_password(). 3. Issue TokenPair if successful.
- refresh(refresh_token: str, user_repo: UserRepositoryInterface) TokenPair | None[source]¶
Exchange a valid refresh token for a fresh access token.
Ensures session continuity without re-entering credentials. Implementations should validate token signature and expiration.
- class structum.auth.UserInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for authenticated user entities in Structum.
Applications must implement this protocol for their User model to integrate with the authentication system. The auth system never creates or modifies users - it only queries them via
UserRepositoryInterface.Example
Implementing UserInterface:
from dataclasses import dataclass @dataclass class User: id: str username: str hashed_password: str roles: list[str] permissions: set[str] def has_permission(self, permission: str) -> bool: # Check role-based permissions for role in self.roles: if permission in ROLE_PERMISSIONS.get(role, set[Any]()): return True # Check user-specific permissions return permission in self.permissionsUsing with auth:
user = user_repo.find_by_username("john") if user and user.has_permission("users:write"): # Allow operation passNote
This is a Protocol, not a base class. Your User model doesn’t need to inherit from this - just implement the required properties and methods.
See also
UserRepositoryInterface: Repository for user data accessAuthInterface: Authentication provider using users- has_permission(permission: str) bool[source]¶
Check if user has a specific permission.
Example
Permission-based authorization:
@app.delete("/users/{user_id}") async def delete_user(user_id: str, current_user: User = Depends(get_current_user)): if not current_user.has_permission("users:delete"): raise HTTPException(403, "Permission denied") # Delete userRole-based permission mapping:
ROLE_PERMISSIONS = { "admin": {"users:read", "users:write", "users:delete"}, "user": {"users:read"}, } def has_permission(self, permission: str) -> bool: return any( permission in ROLE_PERMISSIONS.get(role, set[Any]()) for role in self.roles )Note
Permission format is application-defined. Use a consistent naming scheme (e.g., resource:action).
- property hashed_password : str¶
User’s hashed password.
Warning
Never store or transmit plain-text passwords. This property should only return hashed values.
Example
Password verification:
if auth.verify_password(input_password, user.hashed_password): # Password matches passSee also
PasswordHasherInterface.hash(): Hash passwordsPasswordHasherInterface.verify(): Verify passwords
- property id : str¶
Unique identifier for the user.
Example
User ID in token payload:
token_payload = {"user_id": user.id, "exp": ...}
- property roles : list[str]¶
List of roles assigned to the user.
Example
Role-based access control:
if "admin" in user.roles: # Allow admin operation passNote
Roles should be lowercase strings. Use
has_permission()for fine-grained permission checks.
- class structum.auth.UserRepositoryInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for user storage and retrieval in Structum.
Applications implement this to connect the auth system with their database or user storage backend. The auth plugin does NOT manage user storage directly - it delegates all user operations to this repository.
This separation ensures the auth system remains storage-agnostic and can work with any database (PostgreSQL, MongoDB, etc.) or user service (LDAP, OAuth providers).
- Implementations:
Database-backed repository (SQLAlchemy, etc.)
External user service adapter (LDAP, Active Directory)
In-memory repository (testing only)
Example
SQLAlchemy repository implementation:
class SQLAlchemyUserRepository: def __init__(self, db: DatabaseInterface) -> None: self.db = db def find_by_username(self, username: str) -> User | None: with self.db.transaction() as conn: conn.execute( "SELECT * FROM users WHERE username = :username", {"username": username} ) row = conn.fetchone() if not row: return None return User( id=row["id"], username=row["username"], hashed_password=row["password_hash"], roles=row.get("roles", []), ) def find_by_id(self, user_id: str) -> User | None: with self.db.transaction() as conn: conn.execute( "SELECT * FROM users WHERE id = :id", {"id": user_id} ) row = conn.fetchone() return User(**row) if row else NoneUsing with authentication:
user_repo = SQLAlchemyUserRepository(db) auth = JWTAuthProvider.from_config() tokens = auth.authenticate("john", "password123", user_repo) if tokens: print(f"Access token: {tokens.access_token}")Note
Repository is responsible for mapping storage format to
UserInterface. It should handle serialization/deserialization of user data.See also
UserInterface: User entity protocolAuthInterface: Authentication provider using repositories- find_by_id(user_id: str) UserInterface | None[source]¶
Find a user by their unique identifier.
- Parameters:¶
- Returns:¶
User if found, None otherwise.
- Return type:¶
UserInterface | None
Example
Loading user from token:
# After verifying access token payload = auth.verify_access_token(token) if payload: user = user_repo.find_by_id(payload["user_id"]) if user: # User authenticated return userNote
This method is called frequently (on every authenticated request). Consider caching user data for performance.
- find_by_username(username: str) UserInterface | None[source]¶
Find a user by username or email.
- Parameters:¶
- Returns:¶
User if found, None otherwise.
- Return type:¶
UserInterface | None
Example
Looking up user for authentication:
user = user_repo.find_by_username("john@example.com") if user: # Verify password if auth.verify_password(password, user.hashed_password): return auth.create_tokens(user) else: log.warning("Login attempt for unknown user", username=username)Note
Implementation should normalize username (e.g., lowercase) before lookup. Consider using database indexes on username column for performance.
- class structum.auth.PasswordHasherInterface(*args, **kwargs)[source]¶
Bases:
ProtocolProtocol for secure password hashing in Structum.
Implementations must use cryptographically secure hashing algorithms (e.g., Argon2, b
- crypt, scrypt). Never use fast hashes like MD5 or SHA-1
for passwords.
- Implementations:
Argon2Hasher(recommended)BcryptHasher
- Example:
Using password hasher:
from structum.plugins.auth.password import Argon2Hasher hasher = Argon2Hasher() # Hash password during registration hashed = hasher.hash("user_password_123") # Store hashed in database: user.hashed_password = hashed # Verify during login if hasher.verify("user_password_123", hashed): # Password matches return create_token(user) else: # Invalid password raise AuthenticationError("Invalid credentials")- Warning:
Never log, display, or store plain-text passwords. Always hash passwords immediately upon receipt.
- See Also:
AuthInterface: Auth provider using password hasherUserInterface: User entity with hashed_password property
- hash(password: str) str[source]¶
Hash a plain-text password securely.
- Parameters:¶
- Returns:¶
- Hashed password string including algorithm identifier and salt.
Format is implementation-specific (e.g., Argon2:
$argon2id$v=19$...).
- Return type:¶
Example
Creating user with hashed password:
# During user registration plain_password = request.form["password"] hashed = auth.hash_password(plain_password) user = User( id=generate_id(), username=request.form["username"], hashed_password=hashed, # Store this roles=["user"] ) user_repo.save(user)Warning
Hashing is intentionally slow (100-500ms) to resist brute-force attacks. Do not hash passwords in tight loops or performance-critical paths.
Note
Each call generates a unique hash (due to random salt) even for the same password. This is expected and secure behavior.
- verify(password: str, hashed: str) bool[source]¶
Verify a plain-text password against a hash.
Example
Password verification during login:
# Get user from database user = user_repo.find_by_username(username) if not user: return None # User not found # Verify password if auth.verify_password(password, user.hashed_password): # Authentication successful return auth.create_tokens(user) else: # Invalid password log.warning("Failed login attempt", username=username) return NoneWarning
Always use constant-time comparison internally to prevent timing attacks. Most modern hashing libraries handle this automatically.
Note
Returns False for invalid/malformed hashes rather than raising exceptions. This prevents information leakage about hash format.
-
class structum.auth.TokenPair(access_token: str, refresh_token: str, token_type: str =
'bearer', expires_at: datetime | None =None)[source]¶ Bases:
objectData class containing JWT access and refresh token pair.
Example
Creating and using token pair:
tokens = TokenPair( access_token="eyJ0eXAiOiJKV1QiLCJhbGc...", refresh_token="eyJ0eXAiOiJKV1QiLCJhbGc...", token_type="bearer", expires_at=datetime.now() + timedelta(hours=1) ) # Use in HTTP Authorization header headers = {"Authorization": f"{tokens.token_type} {tokens.access_token}"}Note
This class is frozen (immutable) to prevent accidental token modification. Tokens should be treated as opaque strings and never parsed by clients.
See also
AuthInterface.authenticate(): Method that returns token pairsAuthInterface.refresh(): Refresh access tokens-
__init__(access_token: str, refresh_token: str, token_type: str =
'bearer', expires_at: datetime | None =None)¶
-
__init__(access_token: str, refresh_token: str, token_type: str =
- structum.auth.Auth¶
alias of
AuthInterface
- structum.auth.User¶
alias of
UserInterface
- structum.auth.UserRepository¶
alias of
UserRepositoryInterface
- structum.auth.PasswordHasher¶
alias of
PasswordHasherInterface