mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 00:16:48 +00:00
1a28334b19
`get_config(request)` returned the `app.state.config` snapshot captured at startup. The worker / lead-agent path then threaded that frozen `AppConfig` through `RunContext` and `agent_factory`, so per-run fields edited in `config.yaml` (notably `max_tokens`) were ignored until the gateway process was restarted — even though `get_app_config()` already does mtime-based reload at the bottom layer. Route the request dependency through `get_app_config()` directly. Runtime `ContextVar` overrides (`push_current_app_config`) and test-injected singletons (`set_app_config`) keep working; `app.state.config` is now only read at startup for one-shot bootstrap (logging level, IM channels, `langgraph_runtime` engines). `tests/test_gateway_deps_config.py` encoded the old snapshot contract and is removed; `tests/test_gateway_config_freshness.py` replaces it with mtime, ContextVar, and `set_app_config` coverage. `test_skills_custom_router.py` and `test_uploads_router.py` now inject test configs via FastAPI `dependency_overrides[get_config]` instead of mutating `app.state.config`. Document the hot-reload boundary in `backend/CLAUDE.md` so reviewers know which fields are picked up on the next request vs. which still require a restart (`database`, `checkpointer`, `run_events`, `stream_bridge`, `sandbox.use`, `log_level`, `channels.*`). Refs: bytedance/deer-flow#3107 (BUG-001)
256 lines
10 KiB
Python
256 lines
10 KiB
Python
"""Centralized accessors for singleton objects stored on ``app.state``.
|
||
|
||
**Getters** (used by routers): raise 503 when a required dependency is
|
||
missing, except ``get_store`` which returns ``None``.
|
||
|
||
Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from collections.abc import AsyncGenerator, Callable
|
||
from contextlib import AsyncExitStack, asynccontextmanager
|
||
from typing import TYPE_CHECKING, TypeVar, cast
|
||
|
||
from fastapi import FastAPI, HTTPException, Request
|
||
from langgraph.types import Checkpointer
|
||
|
||
from deerflow.config.app_config import AppConfig, get_app_config
|
||
from deerflow.persistence.feedback import FeedbackRepository
|
||
from deerflow.runtime import RunContext, RunManager, StreamBridge
|
||
from deerflow.runtime.events.store.base import RunEventStore
|
||
from deerflow.runtime.runs.store.base import RunStore
|
||
|
||
if TYPE_CHECKING:
|
||
from app.gateway.auth.local_provider import LocalAuthProvider
|
||
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
|
||
from deerflow.persistence.thread_meta.base import ThreadMetaStore
|
||
|
||
|
||
# Generic value type returned by dependencies built with ``_require``.
T = TypeVar("T")
|
||
|
||
|
||
def get_config(request: Request) -> AppConfig:  # noqa: ARG001 - kept for FastAPI Depends signature
    """Resolve the ``AppConfig`` that should serve the current request.

    Delegates to :func:`deerflow.config.app_config.get_app_config`, which
    honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` when
    its mtime changes. ``app.state.config`` is deliberately not read here:
    it is only the startup snapshot used for one-shot infra bootstrap
    (logging level, IM channels, ``langgraph_runtime`` engines). Reading the
    live config closes the bytedance/deer-flow issue #3107 BUG-001
    split-brain, where the worker / lead-agent thread saw a stale startup
    snapshot.

    Raises:
        HTTPException: 503 when ``get_app_config`` raises ``FileNotFoundError``.
    """
    try:
        resolved = get_app_config()
    except FileNotFoundError as exc:
        raise HTTPException(status_code=503, detail="Configuration not available") from exc
    return resolved
|
||
|
||
|
||
@asynccontextmanager
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
    """Bootstrap and tear down all LangGraph runtime singletons.

    Reads the startup snapshot ``app.state.config`` once and installs the
    following attributes on ``app.state``: ``stream_bridge``,
    ``checkpointer``, ``store``, ``run_store``, ``feedback_repo``,
    ``thread_store``, ``run_event_store`` and ``run_manager``.

    Usage in ``app.py``::

        async with langgraph_runtime(app):
            yield

    Raises:
        RuntimeError: if ``app.state.config`` has not been initialized.
    """
    from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
    from deerflow.runtime import make_store, make_stream_bridge
    from deerflow.runtime.checkpointer.async_provider import make_checkpointer
    from deerflow.runtime.events.store import make_run_event_store

    async with AsyncExitStack() as stack:
        config = getattr(app.state, "config", None)
        if config is None:
            raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized")

        app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config))

        # Initialize persistence engine BEFORE checkpointer so that
        # auto-create-database logic runs first (postgres backend).
        await init_engine_from_config(config.database)

        # The try starts right after engine init so that a failure in any of
        # the remaining startup steps still closes the engine. It also runs
        # close_engine() before the exit stack unwinds, which is the same
        # teardown order as the normal shutdown path.
        try:
            app.state.checkpointer = await stack.enter_async_context(make_checkpointer(config))
            app.state.store = await stack.enter_async_context(make_store(config))

            # Initialize repositories — one get_session_factory() call for all.
            sf = get_session_factory()
            if sf is not None:
                # FeedbackRepository is already imported at module level.
                from deerflow.persistence.run import RunRepository

                app.state.run_store = RunRepository(sf)
                app.state.feedback_repo = FeedbackRepository(sf)
            else:
                from deerflow.runtime.runs.store.memory import MemoryRunStore

                app.state.run_store = MemoryRunStore()
                # No SQL session factory: feedback is unavailable, so the
                # get_feedback_repo dependency will answer 503.
                app.state.feedback_repo = None

            from deerflow.persistence.thread_meta import make_thread_store

            app.state.thread_store = make_thread_store(sf, app.state.store)

            # Run event store (has its own factory with config-driven backend selection)
            run_events_config = getattr(config, "run_events", None)
            app.state.run_event_store = make_run_event_store(run_events_config)

            # RunManager with store backing for persistence
            app.state.run_manager = RunManager(store=app.state.run_store)

            yield
        finally:
            await close_engine()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Getters – called by routers per-request
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _require(attr: str, label: str) -> Callable[[Request], T]:
|
||
"""Create a FastAPI dependency that returns ``app.state.<attr>`` or 503."""
|
||
|
||
def dep(request: Request) -> T:
|
||
val = getattr(request.app.state, attr, None)
|
||
if val is None:
|
||
raise HTTPException(status_code=503, detail=f"{label} not available")
|
||
return cast(T, val)
|
||
|
||
dep.__name__ = dep.__qualname__ = f"get_{attr}"
|
||
return dep
|
||
|
||
|
||
# Per-request dependencies for singletons stored on ``app.state``. Each one
# returns the attribute, or raises HTTP 503 "<label> not available" when the
# attribute is missing or ``None``.
get_stream_bridge: Callable[[Request], StreamBridge] = _require("stream_bridge", "Stream bridge")
get_run_manager: Callable[[Request], RunManager] = _require("run_manager", "Run manager")
get_checkpointer: Callable[[Request], Checkpointer] = _require("checkpointer", "Checkpointer")
get_run_event_store: Callable[[Request], RunEventStore] = _require("run_event_store", "Run event store")
# ``langgraph_runtime`` sets ``feedback_repo`` to ``None`` when
# ``get_session_factory()`` returns ``None``, so this dependency answers 503
# in that setup.
get_feedback_repo: Callable[[Request], FeedbackRepository] = _require("feedback_repo", "Feedback")
get_run_store: Callable[[Request], RunStore] = _require("run_store", "Run store")
|
||
|
||
|
||
def get_store(request: Request):
    """Look up the shared store on ``app.state``.

    Unlike the ``_require``-built getters this never raises: ``None`` is
    returned when no store has been installed.
    """
    state = request.app.state
    return getattr(state, "store", None)
|
||
|
||
|
||
def get_thread_store(request: Request) -> ThreadMetaStore:
    """Resolve the thread metadata store (SQL- or memory-backed).

    Raises:
        HTTPException: 503 when ``app.state.thread_store`` is missing or ``None``.
    """
    thread_store = getattr(request.app.state, "thread_store", None)
    if thread_store is not None:
        return thread_store
    raise HTTPException(status_code=503, detail="Thread metadata store not available")
|
||
|
||
|
||
def get_run_context(request: Request) -> RunContext:
    """Assemble a base :class:`RunContext` for the current request.

    Combines the live ``AppConfig`` (via :func:`get_config`) with the
    infrastructure singletons held on ``app.state``. Any of the underlying
    getters may raise ``HTTPException(503)`` when its dependency is missing.
    """
    # Resolved in the same order as the constructor arguments, so a missing
    # dependency surfaces the same 503 as before.
    app_config = get_config(request)
    checkpointer = get_checkpointer(request)
    store = get_store(request)
    event_store = get_run_event_store(request)
    run_events_config = getattr(app_config, "run_events", None)
    thread_store = get_thread_store(request)

    return RunContext(
        checkpointer=checkpointer,
        store=store,
        event_store=event_store,
        run_events_config=run_events_config,
        thread_store=thread_store,
        app_config=app_config,
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Auth helpers (used by authz.py and auth middleware)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Module-level caches so the auth provider and its user repository are built
# once per process instead of on every request.
_cached_local_provider: LocalAuthProvider | None = None
_cached_repo: SQLiteUserRepository | None = None


def _build_user_repository() -> SQLiteUserRepository:
    """Construct the user repository on top of the shared session factory."""
    from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
    from deerflow.persistence.engine import get_session_factory

    session_factory = get_session_factory()
    if session_factory is None:
        raise RuntimeError("get_local_provider() called before init_engine_from_config(); cannot access users table")
    return SQLiteUserRepository(session_factory)


def get_local_provider() -> LocalAuthProvider:
    """Return the process-wide ``LocalAuthProvider``, creating it on first use.

    ``init_engine_from_config()`` must have run first: the user repository
    is built on the shared session factory. ``RuntimeError`` is raised when
    that factory is not available yet.
    """
    global _cached_local_provider, _cached_repo

    if _cached_repo is None:
        _cached_repo = _build_user_repository()

    if _cached_local_provider is None:
        from app.gateway.auth.local_provider import LocalAuthProvider

        _cached_local_provider = LocalAuthProvider(repository=_cached_repo)

    return _cached_local_provider
|
||
|
||
|
||
async def get_current_user_from_request(request: Request):
    """Authenticate the request via its ``access_token`` cookie.

    Returns the user loaded from the local auth provider.

    Raises:
        HTTPException: 401 when the cookie is missing, the token fails to
            decode, the user no longer exists, or the token version no longer
            matches the user's (password changed).
    """
    from app.gateway.auth import decode_token
    from app.gateway.auth.errors import AuthErrorCode, AuthErrorResponse, TokenError, token_error_to_code

    def _unauthorized(code, message):
        # Every auth failure shares the same 401 envelope.
        return HTTPException(
            status_code=401,
            detail=AuthErrorResponse(code=code, message=message).model_dump(),
        )

    access_token = request.cookies.get("access_token")
    if not access_token:
        raise _unauthorized(AuthErrorCode.NOT_AUTHENTICATED, "Not authenticated")

    payload = decode_token(access_token)
    if isinstance(payload, TokenError):
        raise _unauthorized(token_error_to_code(payload), f"Token error: {payload.value}")

    provider = get_local_provider()
    user = await provider.get_user(payload.sub)
    if user is None:
        raise _unauthorized(AuthErrorCode.USER_NOT_FOUND, "User not found")

    # Token version mismatch → password was changed, token is stale
    if user.token_version != payload.ver:
        raise _unauthorized(AuthErrorCode.TOKEN_INVALID, "Token revoked (password changed)")

    return user
|
||
|
||
|
||
async def get_optional_user_from_request(request: Request):
    """Return the authenticated user, or ``None`` for anonymous requests.

    Any ``HTTPException`` raised by :func:`get_current_user_from_request` is
    mapped to ``None``. Other exceptions propagate unchanged.
    """
    try:
        user = await get_current_user_from_request(request)
    except HTTPException:
        return None
    return user
|
||
|
||
|
||
async def get_current_user(request: Request) -> str | None:
    """Return the authenticated user's id as a string, or ``None``.

    A thin adapter for callers that only need identification (e.g.
    ``feedback.py``). Callers that need the full user object should use
    ``get_current_user_from_request`` or ``get_optional_user_from_request``.
    """
    user = await get_optional_user_from_request(request)
    if not user:
        return None
    return str(user.id)
|