"""Centralized accessors for singleton objects stored on ``app.state``. **Getters** (used by routers): raise 503 when a required dependency is missing, except ``get_store`` which returns ``None``. ``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the run path resolve it through :func:`deerflow.config.app_config.get_app_config`, which performs mtime-based hot reload, so edits to ``config.yaml`` take effect on the next request without a process restart. The engines created in :func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store, run-event store) accept a ``startup_config`` snapshot — they are restart-required by design and stay bound to that snapshot to keep the live process consistent with itself. Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`. """ from __future__ import annotations import asyncio import logging from collections.abc import AsyncGenerator, Callable from contextlib import AsyncExitStack, asynccontextmanager from typing import TYPE_CHECKING, TypeVar, cast from fastapi import FastAPI, HTTPException, Request from langgraph.types import Checkpointer from deerflow.config.app_config import AppConfig, get_app_config from deerflow.persistence.feedback import FeedbackRepository from deerflow.runtime import RunContext, RunManager, StreamBridge from deerflow.runtime.events.store.base import RunEventStore from deerflow.runtime.runs.store.base import RunStore logger = logging.getLogger(__name__) # Upper bound (seconds) for draining in-flight runs during shutdown, before the # AsyncExitStack tears down the checkpointer (and its connection pool). Kept # local to avoid an app -> deps -> app import cycle. This is a *separate* budget # from ``app.gateway.app._SHUTDOWN_HOOK_TIMEOUT_SECONDS`` (currently also 5.0s, # which bounds channel-service stop): the two govern independent teardown steps # and may diverge, but both count toward the lifespan shutdown window — revisit # them together if their sum must stay within the server's graceful-shutdown # timeout. _RUN_DRAIN_TIMEOUT_SECONDS = 5.0 async def _drain_inflight_runs(run_manager: RunManager) -> None: """Drain in-flight runs before the checkpointer is torn down (issue #3373). Shields the (internally-bounded) drain so that even if the lifespan coroutine is itself cancelled mid-shutdown — a second SIGINT or the server's graceful-shutdown timeout, i.e. the same signal storm behind #3373 — the checkpointer pool is not closed while run tasks are still writing checkpoints. On such a cancellation we let the already-running drain finish (it is bounded by ``RunManager.shutdown``'s own timeout) and then propagate the cancellation. """ drain = asyncio.create_task(run_manager.shutdown(timeout=_RUN_DRAIN_TIMEOUT_SECONDS)) try: await asyncio.shield(drain) except asyncio.CancelledError: # Re-shield so this second wait does not abandon the in-flight drain; # it is bounded, so this cannot hang. Then re-raise to honour shutdown. try: await asyncio.shield(drain) except Exception: logger.exception("In-flight run drain failed after shutdown cancellation") raise except Exception: logger.exception("Failed to drain in-flight runs during shutdown") if TYPE_CHECKING: from app.gateway.auth.local_provider import LocalAuthProvider from app.gateway.auth.repositories.sqlite import SQLiteUserRepository from deerflow.persistence.thread_meta.base import ThreadMetaStore from deerflow.runtime import RunRecord T = TypeVar("T") async def _mark_latest_recovered_threads_error( run_manager: RunManager, thread_store: ThreadMetaStore, recovered_runs: list[RunRecord], ) -> None: """Mark thread status as error only when its newest run was recovered.""" recovered_by_thread: dict[str, set[str]] = {} for record in recovered_runs: recovered_by_thread.setdefault(record.thread_id, set()).add(record.run_id) for thread_id, recovered_run_ids in recovered_by_thread.items(): try: latest_runs = await run_manager.list_by_thread(thread_id, user_id=None, limit=1) except Exception: logger.warning("Failed to find latest run for thread %s during run reconciliation", thread_id, exc_info=True) continue if not latest_runs or latest_runs[0].run_id not in recovered_run_ids: continue try: await thread_store.update_status(thread_id, "error", user_id=None) except Exception: logger.warning("Failed to mark thread %s as error during run reconciliation", thread_id, exc_info=True) def get_config() -> AppConfig: """Return the freshest ``AppConfig`` for the current request. Routes through :func:`deerflow.config.app_config.get_app_config`, which honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from disk when its mtime changes. ``AppConfig`` is not cached on ``app.state`` at all — the only startup-time snapshot lives as a local ``startup_config`` variable inside ``lifespan()`` and is passed explicitly into :func:`langgraph_runtime` for the engines that are restart-required by design. Routing every request through :func:`get_app_config` closes the bytedance/deer-flow issue #3107 BUG-001 split-brain where the worker / lead-agent thread saw a stale startup snapshot. Hot-reload boundary: fields backed by startup-time singletons (engines, sandbox provider, IM channels, logging handler) require a process restart to change at runtime. The authoritative list lives in :mod:`deerflow.config.reload_boundary` and is mirrored by the standardised ``"startup-only:"`` prefix on the matching ``Field(description=...)`` in :class:`AppConfig` — IDE hover on those fields will surface the boundary inline. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary" for the operator summary. Any failure to materialise the config (missing file, permission denied, YAML parse error, validation error) is reported as 503 — semantically "the gateway cannot serve requests without a usable configuration" — and logged with the original exception so operators have something to debug. """ try: return get_app_config() except Exception as exc: # noqa: BLE001 - request boundary: log and degrade gracefully logger.exception("Failed to load AppConfig at request time") raise HTTPException(status_code=503, detail="Configuration not available") from exc @asynccontextmanager async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]: """Bootstrap and tear down all LangGraph runtime singletons. ``startup_config`` is the ``AppConfig`` snapshot taken once during ``lifespan()`` for one-shot infrastructure bootstrap. The engines and stores constructed here (stream bridge, persistence engine, checkpointer, store, run-event store) are restart-required by design — they hold live connections, file handles, or singleton providers — so they bind to this snapshot and survive across `config.yaml` edits. Request-time consumers must still go through :func:`get_config` for any field that should be hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary". The matching ``run_events_config`` is frozen onto ``app.state`` so :func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the *startup-time* run-events configuration the underlying ``event_store`` was built from — otherwise the runtime could end up combining a live new ``run_events_config`` with an event store still bound to the previous backend. Usage in ``app.py``:: async with langgraph_runtime(app, startup_config): yield """ from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config from deerflow.runtime import make_store, make_stream_bridge from deerflow.runtime.checkpointer.async_provider import make_checkpointer from deerflow.runtime.events.store import make_run_event_store async with AsyncExitStack() as stack: config = startup_config app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config)) # Initialize persistence engine BEFORE checkpointer so that # auto-create-database logic runs first (postgres backend). await init_engine_from_config(config.database) app.state.checkpointer = await stack.enter_async_context(make_checkpointer(config)) app.state.store = await stack.enter_async_context(make_store(config)) # Initialize repositories — one get_session_factory() call for all. sf = get_session_factory() if sf is not None: from deerflow.persistence.feedback import FeedbackRepository from deerflow.persistence.run import RunRepository app.state.run_store = RunRepository(sf) app.state.feedback_repo = FeedbackRepository(sf) else: from deerflow.runtime.runs.store.memory import MemoryRunStore app.state.run_store = MemoryRunStore() app.state.feedback_repo = None from deerflow.persistence.thread_meta import make_thread_store app.state.thread_store = make_thread_store(sf, app.state.store) # Run event store. The store and the matching ``run_events_config`` are # both frozen at startup so ``get_run_context`` does not combine a # freshly-reloaded ``AppConfig.run_events`` with a store still bound to # the previous backend. run_events_config = getattr(config, "run_events", None) app.state.run_events_config = run_events_config app.state.run_event_store = make_run_event_store(run_events_config) # RunManager with store backing for persistence app.state.run_manager = RunManager(store=app.state.run_store) if getattr(config.database, "backend", None) == "sqlite": from deerflow.utils.time import now_iso # Startup-only recovery: clean shutdowns return no active rows and # the thread-status update below becomes a no-op. recovered_runs = await app.state.run_manager.reconcile_orphaned_inflight_runs( error="Gateway restarted before this run reached a durable final state.", before=now_iso(), ) await _mark_latest_recovered_threads_error(app.state.run_manager, app.state.thread_store, recovered_runs) try: yield finally: # Drain in-flight run tasks BEFORE the AsyncExitStack tears down the # checkpointer (and its connection pool). A run still mid-graph would # otherwise leak into asyncio.run() shutdown, where langgraph's # _checkpointer_put_after_previous aput races the closed pool and # raises PoolClosed (issue #3373). run_manager = getattr(app.state, "run_manager", None) if run_manager is not None: await _drain_inflight_runs(run_manager) await close_engine() # --------------------------------------------------------------------------- # Getters – called by routers per-request # --------------------------------------------------------------------------- def _require(attr: str, label: str) -> Callable[[Request], T]: """Create a FastAPI dependency that returns ``app.state.`` or 503.""" def dep(request: Request) -> T: val = getattr(request.app.state, attr, None) if val is None: raise HTTPException(status_code=503, detail=f"{label} not available") return cast(T, val) dep.__name__ = dep.__qualname__ = f"get_{attr}" return dep get_stream_bridge: Callable[[Request], StreamBridge] = _require("stream_bridge", "Stream bridge") get_run_manager: Callable[[Request], RunManager] = _require("run_manager", "Run manager") get_checkpointer: Callable[[Request], Checkpointer] = _require("checkpointer", "Checkpointer") get_run_event_store: Callable[[Request], RunEventStore] = _require("run_event_store", "Run event store") get_feedback_repo: Callable[[Request], FeedbackRepository] = _require("feedback_repo", "Feedback") get_run_store: Callable[[Request], RunStore] = _require("run_store", "Run store") def get_store(request: Request): """Return the global store (may be ``None`` if not configured).""" return getattr(request.app.state, "store", None) def get_thread_store(request: Request) -> ThreadMetaStore: """Return the thread metadata store (SQL or memory-backed).""" val = getattr(request.app.state, "thread_store", None) if val is None: raise HTTPException(status_code=503, detail="Thread metadata store not available") return val def get_run_context(request: Request) -> RunContext: """Build a :class:`RunContext` from ``app.state`` singletons. Returns a *base* context with infrastructure dependencies. The ``app_config`` field is resolved live so per-run fields (e.g. ``models[*].max_tokens``) follow ``config.yaml`` edits; the ``event_store`` / ``run_events_config`` pair stays frozen to the snapshot captured in :func:`langgraph_runtime` so callers never see a store bound to one backend paired with a config pointing at another. """ return RunContext( checkpointer=get_checkpointer(request), store=get_store(request), event_store=get_run_event_store(request), run_events_config=getattr(request.app.state, "run_events_config", None), thread_store=get_thread_store(request), app_config=get_config(), ) # --------------------------------------------------------------------------- # Auth helpers (used by authz.py and auth middleware) # --------------------------------------------------------------------------- # Cached singletons to avoid repeated instantiation per request _cached_local_provider: LocalAuthProvider | None = None _cached_repo: SQLiteUserRepository | None = None def get_local_provider() -> LocalAuthProvider: """Get or create the cached LocalAuthProvider singleton. Must be called after ``init_engine_from_config()`` — the shared session factory is required to construct the user repository. """ global _cached_local_provider, _cached_repo if _cached_repo is None: from app.gateway.auth.repositories.sqlite import SQLiteUserRepository from deerflow.persistence.engine import get_session_factory sf = get_session_factory() if sf is None: raise RuntimeError("get_local_provider() called before init_engine_from_config(); cannot access users table") _cached_repo = SQLiteUserRepository(sf) if _cached_local_provider is None: from app.gateway.auth.local_provider import LocalAuthProvider _cached_local_provider = LocalAuthProvider(repository=_cached_repo) return _cached_local_provider async def get_current_user_from_request(request: Request): """Get the current authenticated user from the request cookie. Raises HTTPException 401 if not authenticated. """ state = getattr(request, "state", None) state_user = getattr(state, "user", None) from app.gateway.auth_disabled import AUTH_SOURCE_AUTH_DISABLED, AUTH_SOURCE_INTERNAL, AUTH_SOURCE_SESSION if state_user is not None and getattr(state, "auth_source", None) in { AUTH_SOURCE_SESSION, AUTH_SOURCE_AUTH_DISABLED, AUTH_SOURCE_INTERNAL, }: return state_user from app.gateway.auth import decode_token from app.gateway.auth.errors import AuthErrorCode, AuthErrorResponse, TokenError, token_error_to_code access_token = request.cookies.get("access_token") if not access_token: raise HTTPException( status_code=401, detail=AuthErrorResponse(code=AuthErrorCode.NOT_AUTHENTICATED, message="Not authenticated").model_dump(), ) payload = decode_token(access_token) if isinstance(payload, TokenError): raise HTTPException( status_code=401, detail=AuthErrorResponse(code=token_error_to_code(payload), message=f"Token error: {payload.value}").model_dump(), ) provider = get_local_provider() user = await provider.get_user(payload.sub) if user is None: raise HTTPException( status_code=401, detail=AuthErrorResponse(code=AuthErrorCode.USER_NOT_FOUND, message="User not found").model_dump(), ) # Token version mismatch → password was changed, token is stale if user.token_version != payload.ver: raise HTTPException( status_code=401, detail=AuthErrorResponse(code=AuthErrorCode.TOKEN_INVALID, message="Token revoked (password changed)").model_dump(), ) return user async def get_optional_user_from_request(request: Request): """Get optional authenticated user from request. Returns None if not authenticated. """ try: return await get_current_user_from_request(request) except HTTPException: return None async def get_current_user(request: Request) -> str | None: """Extract user_id from request cookie, or None if not authenticated. Thin adapter that returns the string id for callers that only need identification (e.g., ``feedback.py``). Full-user callers should use ``get_current_user_from_request`` or ``get_optional_user_from_request``. """ user = await get_optional_user_from_request(request) return str(user.id) if user else None