Skip to content

Authentication

Autodoc for the two auth models. See Auth flow for the narrative.

Admin auth: JWT + Redis sessions

utils

JWT + Redis-session auth helpers, mirrored from skynet-app-api.

Access tokens are HS256 JWTs wrapping a random session token that is looked up in Redis to resolve the live :class:User. validate_access_token gates any authenticated route; validate_admin additionally requires role == "admin".

hash_pbkdf2(username, password, salt=None)

Hash username + password with pbkdf2/sha-1; generate a 32-byte salt if absent.

Parameters:

Name Type Description Default
username str

cleartext username.

required
password str

cleartext password.

required
salt Optional[bytes]

cleartext random salt (32 bytes); generated when omitted.

None

Returns:

Type Description
bytes

base64-encoded salted hash.

Source code in src/cms_api/authentication/utils.py
def hash_pbkdf2(username: str, password: str, salt: Optional[bytes] = None) -> bytes:
    """Hash username + password with pbkdf2/sha-1; generate a 32-byte salt if absent.

    Args:
        username: cleartext username.
        password: cleartext password.
        salt: cleartext random salt (32 bytes); generated when omitted.

    Returns:
        base64-encoded salted hash.
    """
    if not salt:
        salt = token_hex(16).encode("utf-8")
    assert isinstance(salt, bytes), "Password salt should be bytes"
    assert len(salt) == 32, "Incorrect salt length"

    username_bytes = username.encode("utf-8")
    password_bytes = password.encode("utf-8")
    cleartext_combination = username_bytes + b"\0" + password_bytes
    hashed_password = pbkdf2_hmac("sha1", cleartext_combination, salt, iterations=10, dklen=32)
    hashed_password = b"p" + salt + hashed_password  # p stands for PBKDF2
    return b64encode(hashed_password)

verify_pbkdf2_hash(username, password, salted_hash)

Verify username + password against a base64-encoded salted pbkdf2 hash.

Parameters:

Name Type Description Default
username str

cleartext username.

required
password str

cleartext password.

required
salted_hash str

base64-encoded stored hash.

required

Returns:

Type Description
bool

True when the credentials match the stored hash.

Source code in src/cms_api/authentication/utils.py
def verify_pbkdf2_hash(username: str, password: str, salted_hash: str) -> bool:
    """Verify username + password against a base64-encoded salted pbkdf2 hash.

    Args:
        username: cleartext username.
        password: cleartext password.
        salted_hash: base64-encoded stored hash.

    Returns:
        True when the credentials match the stored hash.
    """
    assert isinstance(salted_hash, str)
    decoded_hash = b64decode(salted_hash)
    start = 1 if chr(decoded_hash[0]) == "p" else 0
    end = start + 32
    salt = decoded_hash[start:end]
    new_hash = hash_pbkdf2(username, password, salt)
    return salted_hash == new_hash.decode("utf-8")

generate_jwt_token(data, expires_in=ACCESS_TOKEN_EXPIRE)

Create and return a signed HS256 JWT carrying data with an exp claim.

The exp claim is enforced by jwt.decode on validation, so the token is rejected once it expires independently of the Redis session TTL.

Parameters:

Name Type Description Default
data dict

payload to sign (e.g. {"token": <session token>}).

required
expires_in int

token lifetime in seconds (default ACCESS_TOKEN_EXPIRE).

ACCESS_TOKEN_EXPIRE

Returns:

Type Description
str

the signed JWT string.

Source code in src/cms_api/authentication/utils.py
def generate_jwt_token(data: dict, expires_in: int = ACCESS_TOKEN_EXPIRE) -> str:
    """Create and return a signed HS256 JWT carrying ``data`` with an ``exp`` claim.

    The ``exp`` claim is enforced by ``jwt.decode`` on validation, so the token is
    rejected once it expires independently of the Redis session TTL.

    Args:
        data: payload to sign (e.g. ``{"token": <session token>}``).
        expires_in: token lifetime in seconds (default ``ACCESS_TOKEN_EXPIRE``).

    Returns:
        the signed JWT string.
    """
    payload = {**data, "exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in)}
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

decode_jwt_session_token(token)

Decode a JWT and return the wrapped Redis session token from its payload.

Parameters:

Name Type Description Default
token str

signed JWT (access or refresh).

required

Returns:

Type Description
str

the inner session token string.

Raises:

Type Description
HTTPException

when the JWT is invalid or lacks a session token claim.

Source code in src/cms_api/authentication/utils.py
def decode_jwt_session_token(token: str) -> str:
    """Decode a JWT and return the wrapped Redis session token from its payload.

    Args:
        token: signed JWT (access or refresh).

    Returns:
        the inner session token string.

    Raises:
        HTTPException: when the JWT is invalid or lacks a session token claim.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        session_token = payload.get("token")
        if not session_token:
            raise invalid_token_exception
        return session_token
    except JWTError:
        raise invalid_token_exception

validate_access_token(token=Depends(oauth2_scheme), request=None)

Resolve the authenticated :class:User for a valid access token.

Decodes the JWT, then looks the wrapped session token up in Redis. Raises invalid_token_exception (401) for a missing, malformed, expired, or unknown-session token.

Parameters:

Name Type Description Default
token str

bearer access token.

Depends(oauth2_scheme)
request Request

optional request, stamped with state.user_id when present.

None

Returns:

Type Description
User

the matching :class:User.

Source code in src/cms_api/authentication/utils.py
def validate_access_token(token: str = Depends(oauth2_scheme), request: Request = None) -> User:
    """Resolve the authenticated :class:`User` for a valid access token.

    Decodes the JWT, then looks the wrapped session token up in Redis. Raises
    ``invalid_token_exception`` (401) for a missing, malformed, expired, or
    unknown-session token.

    Args:
        token: bearer access token.
        request: optional request, stamped with ``state.user_id`` when present.

    Returns:
        the matching :class:`User`.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user_cache_token = payload.get("token")
        if not user_cache_token:
            raise invalid_token_exception
    except JWTError:
        raise invalid_token_exception

    with RedisConnector() as redis_conn:
        user = redis_conn.get_user_info(user_cache_token)
    if not user:
        raise invalid_token_exception

    if request is not None:
        try:
            request.state.user_id = user.user_id
        except AttributeError:
            pass
    return user

validate_admin(token=Depends(oauth2_scheme))

Resolve the authenticated user and require a platform admin role (else 403).

Parameters:

Name Type Description Default
token str

bearer access token.

Depends(oauth2_scheme)

Returns:

Type Description
User

the authenticated admin :class:User.

Source code in src/cms_api/authentication/utils.py
def validate_admin(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the authenticated user and require a platform admin role (else 403).

    Args:
        token: bearer access token.

    Returns:
        the authenticated admin :class:`User`.
    """
    user = validate_access_token(token=token)
    if not is_platform_admin(user.role):
        raise user_is_not_admin
    return user

Worker API key

worker

Worker API key validation for the public v2 surface.

validate_worker_api_key(authorization=Header(default=None))

Require Authorization: Bearer <worker-key> on /public/v2 routes.

Raises 503 when the key is not configured (service refuses open v2 reads).

Source code in src/cms_api/authentication/worker.py
def validate_worker_api_key(authorization: Optional[str] = Header(default=None)) -> None:
    """Require ``Authorization: Bearer <worker-key>`` on /public/v2 routes.

    Raises 503 when the key is not configured (service refuses open v2 reads).
    """
    if not WORKER_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker API key is not configured",
        )
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid Authorization header",
        )
    token = authorization[7:].strip()
    if not token or not secrets.compare_digest(token, WORKER_API_KEY):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid worker API key",
        )