Files
deer-flow/backend/app/gateway/internal_auth.py
T

62 lines
2.1 KiB
Python

"""Authentication for trusted Gateway internal callers."""
from __future__ import annotations
import os
import secrets
from types import SimpleNamespace
from typing import Any
from deerflow.runtime.user_context import DEFAULT_USER_ID
# Request header that carries the internal auth token.
INTERNAL_AUTH_HEADER_NAME = "X-DeerFlow-Internal-Token"
# Optional request header naming the owner user for a trusted internal call.
INTERNAL_OWNER_USER_ID_HEADER_NAME = "X-DeerFlow-Owner-User-Id"
# Environment variable from which the internal auth token is read.
INTERNAL_AUTH_ENV_VAR = "DEER_FLOW_INTERNAL_AUTH_TOKEN"
# ``system_role`` value carried by the synthetic internal user.
INTERNAL_SYSTEM_ROLE = "internal"
def _load_internal_auth_token() -> str:
    """Return the internal auth token for this process.

    The token is read from the environment variable named by
    ``INTERNAL_AUTH_ENV_VAR``. When that variable is unset, empty, or
    contains only whitespace, a fresh random URL-safe token is generated
    instead; such a token is known only to the process that generated it.

    Returns:
        The configured token with surrounding whitespace removed, or a
        newly generated random token.
    """
    # Surrounding whitespace (for example a trailing newline left by a secret
    # file or ``.env`` entry) is dropped: HTTP field values are transmitted
    # without leading/trailing whitespace, so an unstripped token could never
    # match the value received in a request header.
    configured = os.environ.get(INTERNAL_AUTH_ENV_VAR, "").strip()
    if configured:
        return configured
    # 32 random bytes, URL-safe base64 encoded.
    return secrets.token_urlsafe(32)
# Resolved once at import time; read by create_internal_auth_headers() and
# is_valid_internal_auth_token().
_INTERNAL_AUTH_TOKEN = _load_internal_auth_token()
def create_internal_auth_headers(*, owner_user_id: str | None = None) -> dict[str, str]:
    """Build the HTTP headers for a trusted Gateway internal call.

    Args:
        owner_user_id: Optional owner user id. When truthy it is sent in the
            ``INTERNAL_OWNER_USER_ID_HEADER_NAME`` header; otherwise that
            header is omitted.

    Returns:
        A new dict that always contains the internal auth token header and,
        when requested, the owner user id header.
    """
    auth_only = {INTERNAL_AUTH_HEADER_NAME: _INTERNAL_AUTH_TOKEN}
    if not owner_user_id:
        return auth_only
    return {**auth_only, INTERNAL_OWNER_USER_ID_HEADER_NAME: owner_user_id}
def is_valid_internal_auth_token(token: str | None) -> bool:
    """Return True when *token* matches this Gateway worker's internal token.

    Args:
        token: Candidate token, typically taken from a request header.
            ``None`` and the empty string are always rejected.

    Returns:
        ``True`` only when *token* equals the module's internal auth token.
    """
    if not token:
        return False
    # ``secrets.compare_digest`` raises TypeError for ``str`` arguments that
    # contain non-ASCII characters. Comparing the UTF-8 encodings keeps the
    # constant-time comparison while making such input a plain mismatch
    # instead of an unhandled exception.
    return secrets.compare_digest(
        token.encode("utf-8"),
        _INTERNAL_AUTH_TOKEN.encode("utf-8"),
    )
def get_internal_user():
    """Build the synthetic user that represents a trusted internal caller.

    The returned namespace has ``id`` set to ``DEFAULT_USER_ID`` and
    ``system_role`` set to ``INTERNAL_SYSTEM_ROLE``. A new object is created
    on every call.
    """
    attributes = {"id": DEFAULT_USER_ID, "system_role": INTERNAL_SYSTEM_ROLE}
    return SimpleNamespace(**attributes)
def get_trusted_internal_owner_user_id(request: Any) -> str | None:
    """Extract the owner user id override from a trusted internal request.

    The owner header is only consulted when ``request.state.user`` carries
    the internal system role, i.e. after ``AuthMiddleware`` has validated the
    internal auth token and attached the synthetic internal user. For any
    other caller the header is ignored.

    Args:
        request: Incoming request object; ``state.user`` is looked up
            defensively, ``headers`` is read only for internal callers.

    Returns:
        The header value with surrounding whitespace removed, or ``None``
        when the caller is not internal or the header is missing or blank.
    """
    state = getattr(request, "state", None)
    caller = getattr(state, "user", None)
    role = getattr(caller, "system_role", None)
    if role != INTERNAL_SYSTEM_ROLE:
        return None

    raw_owner = request.headers.get(INTERNAL_OWNER_USER_ID_HEADER_NAME)
    # A whitespace-only header collapses to "" after stripping and is
    # reported as absent.
    return (raw_owner.strip() or None) if raw_owner else None