mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-22 16:06:50 +00:00
c0bc7a0648
Address @ShenAC-SAC's BUG-001 review on #3131. The previous cut still stored an ``AppConfig`` snapshot on ``app.state.config`` for startup bootstrap. Two follow-on hazards from that: 1. Future code touching the gateway lifespan could accidentally start reading ``app.state.config`` again, silently regressing the request hot path back to a stale snapshot. 2. ``get_run_context()`` paired a freshly-reloaded ``AppConfig`` with the startup-bound ``event_store`` and a *live* ``run_events_config`` field — so an operator who edited ``run_events.backend`` mid-flight would have produced a run context whose ``event_store`` and ``run_events_config`` referred to different backends. Clean approach (aligned with the direction in PR #3128): - ``lifespan()`` keeps a local ``startup_config`` variable and passes it explicitly into ``langgraph_runtime(app, startup_config)`` and into ``start_channel_service``. No ``app.state.config`` attribute is set at any point. - ``langgraph_runtime`` now accepts ``startup_config`` as a required parameter, removing the ``getattr(app.state, "config", None)`` lookup and the "config not initialised" runtime error. - The matching ``run_events_config`` is frozen onto ``app.state`` next to ``run_event_store`` so ``get_run_context`` reads the two from the same startup-time source. ``app_config`` continues to be resolved live via ``get_app_config()``. - ``backend/CLAUDE.md`` boundary explanation updated to spell out the ``startup_config`` / ``get_app_config()`` split. New regression test ``test_run_context_app_config_reflects_yaml_edit`` exercises the worker-feeding path: it asserts that ``ctx.app_config`` follows a mid-flight ``config.yaml`` edit while ``ctx.run_events_config`` stays frozen to the startup snapshot the event store was built from. Refs: bytedance/deer-flow#3107 (BUG-001), bytedance/deer-flow#3131 review
298 lines
12 KiB
Python
298 lines
12 KiB
Python
"""Centralized accessors for singleton objects stored on ``app.state``.
|
||
|
||
**Getters** (used by routers): raise 503 when a required dependency is
|
||
missing, except ``get_store`` which returns ``None``.
|
||
|
||
``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the
|
||
run path resolve it through :func:`deerflow.config.app_config.get_app_config`,
|
||
which performs mtime-based hot reload, so edits to ``config.yaml`` take
|
||
effect on the next request without a process restart. The engines created in
|
||
:func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store,
|
||
run-event store) accept a ``startup_config`` snapshot — they are
|
||
restart-required by design and stay bound to that snapshot to keep the live
|
||
process consistent with itself.
|
||
|
||
Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from collections.abc import AsyncGenerator, Callable
|
||
from contextlib import AsyncExitStack, asynccontextmanager
|
||
from typing import TYPE_CHECKING, TypeVar, cast
|
||
|
||
from fastapi import FastAPI, HTTPException, Request
|
||
from langgraph.types import Checkpointer
|
||
|
||
from deerflow.config.app_config import AppConfig, get_app_config
|
||
from deerflow.persistence.feedback import FeedbackRepository
|
||
from deerflow.runtime import RunContext, RunManager, StreamBridge
|
||
from deerflow.runtime.events.store.base import RunEventStore
|
||
from deerflow.runtime.runs.store.base import RunStore
|
||
|
||
if TYPE_CHECKING:
    # Type-only imports: these names appear in annotations in this module.
    # The functions that need the auth classes at runtime import them locally.
    from app.gateway.auth.local_provider import LocalAuthProvider
    from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
    from deerflow.persistence.thread_meta.base import ThreadMetaStore

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Placeholder for the singleton type handed back by ``_require`` dependencies.
T = TypeVar("T")
|
||
|
||
|
||
def get_config() -> AppConfig:
    """Resolve the current ``AppConfig`` for a request.

    The lookup is delegated to
    :func:`deerflow.config.app_config.get_app_config`, which honours runtime
    ``ContextVar`` overrides and re-reads ``config.yaml`` when its mtime
    changes. Nothing is cached on ``app.state``: the only startup snapshot is
    the ``startup_config`` local in ``lifespan()``, handed explicitly to
    :func:`langgraph_runtime` for the restart-required engines. Resolving the
    config per request is what closes the bytedance/deer-flow issue #3107
    BUG-001 split-brain, where the worker / lead-agent thread kept seeing a
    stale startup snapshot.

    Returns:
        The ``AppConfig`` returned by :func:`get_app_config`.

    Raises:
        HTTPException: 503 when the config cannot be materialised for any
            reason (missing file, permission denied, YAML parse error,
            validation error). The original exception is logged and chained
            so operators have something to debug.
    """
    try:
        current = get_app_config()
    except Exception as exc:  # noqa: BLE001 - request boundary: log and degrade gracefully
        logger.exception("Failed to load AppConfig at request time")
        raise HTTPException(status_code=503, detail="Configuration not available") from exc
    return current
|
||
|
||
|
||
@asynccontextmanager
async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]:
    """Bootstrap and tear down all LangGraph runtime singletons.

    ``startup_config`` is the ``AppConfig`` snapshot taken once during
    ``lifespan()`` for one-shot infrastructure bootstrap. The engines and
    stores constructed here (stream bridge, persistence engine, checkpointer,
    store, run-event store) are restart-required by design — they hold live
    connections, file handles, or singleton providers — so they bind to this
    snapshot and survive across ``config.yaml`` edits. Request-time consumers
    must still go through :func:`get_config` for any field that should be
    hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary".

    The matching ``run_events_config`` is frozen onto ``app.state`` so
    :func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the
    *startup-time* run-events configuration the underlying ``event_store``
    was built from — otherwise the runtime could end up combining a live
    new ``run_events_config`` with an event store still bound to the
    previous backend.

    Attributes set on ``app.state``: ``stream_bridge``, ``checkpointer``,
    ``store``, ``run_store``, ``feedback_repo`` (``None`` when no session
    factory is available), ``thread_store``, ``run_events_config``,
    ``run_event_store`` and ``run_manager``.

    Args:
        app: The FastAPI application whose ``state`` receives the singletons.
        startup_config: Startup-time configuration snapshot used to build
            every engine and store.

    Yields:
        ``None`` once all singletons are in place; teardown runs when the
        context exits.

    Usage in ``app.py``::

        async with langgraph_runtime(app, startup_config):
            yield
    """
    from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
    from deerflow.runtime import make_store, make_stream_bridge
    from deerflow.runtime.checkpointer.async_provider import make_checkpointer
    from deerflow.runtime.events.store import make_run_event_store

    async with AsyncExitStack() as stack:
        app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(startup_config))

        # Initialize persistence engine BEFORE checkpointer so that
        # auto-create-database logic runs first (postgres backend).
        await init_engine_from_config(startup_config.database)

        # The engine now exists, so everything after this point — the rest of
        # the bootstrap as well as the serving phase — sits under one
        # try/finally. A failure in a later bootstrap step therefore still
        # closes the engine instead of leaking it. ``close_engine()`` keeps
        # running before the exit stack unwinds, as it did previously.
        try:
            app.state.checkpointer = await stack.enter_async_context(make_checkpointer(startup_config))
            app.state.store = await stack.enter_async_context(make_store(startup_config))

            # Initialize repositories — one get_session_factory() call for all.
            sf = get_session_factory()
            if sf is not None:
                from deerflow.persistence.feedback import FeedbackRepository
                from deerflow.persistence.run import RunRepository

                app.state.run_store = RunRepository(sf)
                app.state.feedback_repo = FeedbackRepository(sf)
            else:
                # No session factory: fall back to an in-memory run store and
                # leave feedback unavailable.
                from deerflow.runtime.runs.store.memory import MemoryRunStore

                app.state.run_store = MemoryRunStore()
                app.state.feedback_repo = None

            from deerflow.persistence.thread_meta import make_thread_store

            app.state.thread_store = make_thread_store(sf, app.state.store)

            # Run event store. The store and the matching ``run_events_config``
            # are both frozen at startup so ``get_run_context`` does not combine
            # a freshly-reloaded ``AppConfig.run_events`` with a store still
            # bound to the previous backend.
            run_events_config = getattr(startup_config, "run_events", None)
            app.state.run_events_config = run_events_config
            app.state.run_event_store = make_run_event_store(run_events_config)

            # RunManager with store backing for persistence
            app.state.run_manager = RunManager(store=app.state.run_store)

            yield
        finally:
            await close_engine()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Getters – called by routers per-request
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _require(attr: str, label: str) -> Callable[[Request], T]:
|
||
"""Create a FastAPI dependency that returns ``app.state.<attr>`` or 503."""
|
||
|
||
def dep(request: Request) -> T:
|
||
val = getattr(request.app.state, attr, None)
|
||
if val is None:
|
||
raise HTTPException(status_code=503, detail=f"{label} not available")
|
||
return cast(T, val)
|
||
|
||
dep.__name__ = dep.__qualname__ = f"get_{attr}"
|
||
return dep
|
||
|
||
|
||
# Required singletons. Each dependency reads its ``app.state`` attribute and
# answers 503 ("<label> not available") when it is missing or ``None``.

# Runtime engines.
get_checkpointer: Callable[[Request], Checkpointer] = _require("checkpointer", "Checkpointer")
get_stream_bridge: Callable[[Request], StreamBridge] = _require("stream_bridge", "Stream bridge")

# Run bookkeeping.
get_run_manager: Callable[[Request], RunManager] = _require("run_manager", "Run manager")
get_run_store: Callable[[Request], RunStore] = _require("run_store", "Run store")
get_run_event_store: Callable[[Request], RunEventStore] = _require("run_event_store", "Run event store")

# ``feedback_repo`` is set to ``None`` when no session factory is available,
# so this dependency answers 503 in that setup.
get_feedback_repo: Callable[[Request], FeedbackRepository] = _require("feedback_repo", "Feedback")
|
||
|
||
|
||
def get_store(request: Request):
    """Return ``app.state.store``, or ``None`` when no store is configured.

    Unlike the other getters this one does not raise when the attribute is
    absent.
    """
    state = request.app.state
    return getattr(state, "store", None)
|
||
|
||
|
||
def get_thread_store(request: Request) -> ThreadMetaStore:
    """Return the thread metadata store (SQL or memory-backed).

    Raises:
        HTTPException: 503 when ``app.state.thread_store`` is missing or
            ``None``.
    """
    thread_store = getattr(request.app.state, "thread_store", None)
    if thread_store is not None:
        return thread_store
    raise HTTPException(status_code=503, detail="Thread metadata store not available")
|
||
|
||
|
||
def get_run_context(request: Request) -> RunContext:
    """Assemble a base :class:`RunContext` from the ``app.state`` singletons.

    Infrastructure dependencies come from ``app.state``. ``app_config`` is
    resolved live through :func:`get_config`, so per-run fields (e.g.
    ``models[*].max_tokens``) follow ``config.yaml`` edits. The
    ``event_store`` / ``run_events_config`` pair stays frozen to the snapshot
    captured in :func:`langgraph_runtime`, so callers never see a store bound
    to one backend paired with a config pointing at another.

    Raises:
        HTTPException: 503 when the checkpointer, run event store, thread
            store or configuration is unavailable.
    """
    # Resolved one by one, in the same order as the keyword arguments below,
    # so the first unavailable dependency determines the 503 that is raised.
    checkpointer = get_checkpointer(request)
    store = get_store(request)
    event_store = get_run_event_store(request)
    frozen_run_events = getattr(request.app.state, "run_events_config", None)
    thread_store = get_thread_store(request)
    live_config = get_config()

    return RunContext(
        checkpointer=checkpointer,
        store=store,
        event_store=event_store,
        run_events_config=frozen_run_events,
        thread_store=thread_store,
        app_config=live_config,
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Auth helpers (used by authz.py and auth middleware)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Module-level auth singletons. They start empty, are filled in lazily by
# ``get_local_provider`` and are reused afterwards, so they are not rebuilt on
# every request.
_cached_repo: SQLiteUserRepository | None = None
_cached_local_provider: LocalAuthProvider | None = None
|
||
|
||
|
||
def get_local_provider() -> LocalAuthProvider:
    """Return the cached ``LocalAuthProvider``, building it on first use.

    Must be called after ``init_engine_from_config()`` — the shared session
    factory is required to construct the user repository.

    Raises:
        RuntimeError: If the user repository has not been built yet and no
            session factory is available.
    """
    global _cached_local_provider, _cached_repo

    repo = _cached_repo
    if repo is None:
        from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
        from deerflow.persistence.engine import get_session_factory

        session_factory = get_session_factory()
        if session_factory is None:
            raise RuntimeError("get_local_provider() called before init_engine_from_config(); cannot access users table")
        repo = _cached_repo = SQLiteUserRepository(session_factory)

    provider = _cached_local_provider
    if provider is None:
        from app.gateway.auth.local_provider import LocalAuthProvider

        provider = _cached_local_provider = LocalAuthProvider(repository=repo)

    return provider
|
||
|
||
|
||
async def get_current_user_from_request(request: Request):
    """Resolve the authenticated user from the ``access_token`` request cookie.

    Returns:
        The user object returned by the local auth provider.

    Raises:
        HTTPException: 401 when the cookie is absent or empty, the token
            fails to decode, the user does not exist, or the token version no
            longer matches the user's current one.
    """
    from app.gateway.auth import decode_token
    from app.gateway.auth.errors import AuthErrorCode, AuthErrorResponse, TokenError, token_error_to_code

    def unauthorized(code, message: str) -> HTTPException:
        # Every failure below is a 401 carrying a serialised AuthErrorResponse.
        return HTTPException(
            status_code=401,
            detail=AuthErrorResponse(code=code, message=message).model_dump(),
        )

    token = request.cookies.get("access_token")
    if not token:
        raise unauthorized(AuthErrorCode.NOT_AUTHENTICATED, "Not authenticated")

    decoded = decode_token(token)
    if isinstance(decoded, TokenError):
        raise unauthorized(token_error_to_code(decoded), f"Token error: {decoded.value}")

    user = await get_local_provider().get_user(decoded.sub)
    if user is None:
        raise unauthorized(AuthErrorCode.USER_NOT_FOUND, "User not found")

    # Token version mismatch → password was changed, token is stale
    if user.token_version != decoded.ver:
        raise unauthorized(AuthErrorCode.TOKEN_INVALID, "Token revoked (password changed)")

    return user
|
||
|
||
|
||
async def get_optional_user_from_request(request: Request):
    """Return the authenticated user for the request, or ``None``.

    Wraps :func:`get_current_user_from_request` and maps any
    ``HTTPException`` it raises to ``None``.
    """
    try:
        user = await get_current_user_from_request(request)
    except HTTPException:
        return None
    return user
|
||
|
||
|
||
async def get_current_user(request: Request) -> str | None:
    """Return the authenticated user's id as a string, or ``None``.

    Thin adapter for callers that only need identification (e.g.,
    ``feedback.py``). Callers that need the full user object should use
    ``get_current_user_from_request`` or ``get_optional_user_from_request``.
    """
    user = await get_optional_user_from_request(request)
    if not user:
        return None
    return str(user.id)
|