diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 4732a57ee..d038ae390 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -184,7 +184,7 @@ Setup: Copy `config.example.yaml` to `config.yaml` in the **project root** direc **Config Caching**: `get_app_config()` caches the parsed config, but automatically reloads it when the resolved config path changes or the file's mtime increases. This keeps Gateway and LangGraph reads aligned with `config.yaml` edits without requiring a manual process restart. -**Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. Infrastructure fields that the gateway captures once at startup are **restart-required**: +**Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. `AppConfig` is intentionally **not** cached on `app.state` — `lifespan()` keeps a local `startup_config` variable for one-shot bootstrap work (logging level, channels, `langgraph_runtime` engines) and passes it explicitly to `langgraph_runtime(app, startup_config)`. Infrastructure fields are **restart-required**: | Field | Why a restart is required | |---|---| diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index 2c13f571c..8baecb363 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -161,10 +161,16 @@ async def _migrate_orphaned_threads(store, admin_user_id: str) -> int: async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Application lifespan handler.""" - # Load config and check necessary environment variables at startup + # Load config and check necessary environment variables at startup. + # `startup_config` is a local snapshot used only for one-shot bootstrap + # work (logging level, langgraph_runtime engines, channels). Request-time + # config resolution always routes through `get_app_config()` in + # `app/gateway/deps.py::get_config()` so `config.yaml` edits become + # visible without a process restart. We deliberately do NOT cache this + # snapshot on `app.state` to keep that contract enforceable. try: - app.state.config = get_app_config() - apply_logging_level(app.state.config.log_level) + startup_config = get_app_config() + apply_logging_level(startup_config.log_level) logger.info("Configuration loaded successfully") except Exception as e: error_msg = f"Failed to load configuration during gateway startup: {e}" @@ -174,7 +180,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: logger.info(f"Starting API Gateway on {config.host}:{config.port}") # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store) - async with langgraph_runtime(app): + async with langgraph_runtime(app, startup_config): logger.info("LangGraph runtime initialised") # Check admin bootstrap state and migrate orphan threads after admin exists. @@ -185,7 +191,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: try: from app.channels.service import start_channel_service - channel_service = await start_channel_service(app.state.config) + channel_service = await start_channel_service(startup_config) logger.info("Channel service started: %s", channel_service.get_status()) except Exception: logger.exception("No IM channels configured or channel service failed to start") diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index d81bdb2c9..f045a2ee3 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -3,6 +3,15 @@ **Getters** (used by routers): raise 503 when a required dependency is missing, except ``get_store`` which returns ``None``. +``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the +run path resolve it through :func:`deerflow.config.app_config.get_app_config`, +which performs mtime-based hot reload, so edits to ``config.yaml`` take +effect on the next request without a process restart. The engines created in +:func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store, +run-event store) accept a ``startup_config`` snapshot — they are +restart-required by design and stay bound to that snapshot to keep the live +process consistent with itself. + Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`. """ @@ -38,12 +47,14 @@ def get_config() -> AppConfig: Routes through :func:`deerflow.config.app_config.get_app_config`, which honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from - disk when its mtime changes. ``app.state.config`` is no longer consulted - on the request hot path — it is set at startup only for one-shot infra - bootstrap (logging level, IM channels, ``langgraph_runtime`` engines). - Reading from ``get_app_config`` here closes the bytedance/deer-flow - issue #3107 BUG-001 split-brain where the worker / lead-agent thread saw - a stale startup snapshot. + disk when its mtime changes. ``AppConfig`` is not cached on ``app.state`` + at all — the only startup-time snapshot lives as a local + ``startup_config`` variable inside ``lifespan()`` and is passed + explicitly into :func:`langgraph_runtime` for the engines that are + restart-required by design. Routing every request through + :func:`get_app_config` closes the bytedance/deer-flow issue #3107 BUG-001 + split-brain where the worker / lead-agent thread saw a stale startup + snapshot. Any failure to materialise the config (missing file, permission denied, YAML parse error, validation error) is reported as 503 — semantically @@ -58,12 +69,28 @@ def get_config() -> AppConfig: @asynccontextmanager -async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: +async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]: """Bootstrap and tear down all LangGraph runtime singletons. + ``startup_config`` is the ``AppConfig`` snapshot taken once during + ``lifespan()`` for one-shot infrastructure bootstrap. The engines and + stores constructed here (stream bridge, persistence engine, checkpointer, + store, run-event store) are restart-required by design — they hold live + connections, file handles, or singleton providers — so they bind to this + snapshot and survive across `config.yaml` edits. Request-time consumers + must still go through :func:`get_config` for any field that should be + hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary". + + The matching ``run_events_config`` is frozen onto ``app.state`` so + :func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the + *startup-time* run-events configuration the underlying ``event_store`` + was built from — otherwise the runtime could end up combining a live + new ``run_events_config`` with an event store still bound to the + previous backend. + Usage in ``app.py``:: - async with langgraph_runtime(app): + async with langgraph_runtime(app, startup_config): yield """ from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config @@ -72,9 +99,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: from deerflow.runtime.events.store import make_run_event_store async with AsyncExitStack() as stack: - config = getattr(app.state, "config", None) - if config is None: - raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized") + config = startup_config app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config)) @@ -103,8 +128,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: app.state.thread_store = make_thread_store(sf, app.state.store) - # Run event store (has its own factory with config-driven backend selection) + # Run event store. The store and the matching ``run_events_config`` are + # both frozen at startup so ``get_run_context`` does not combine a + # freshly-reloaded ``AppConfig.run_events`` with a store still bound to + # the previous backend. run_events_config = getattr(config, "run_events", None) + app.state.run_events_config = run_events_config app.state.run_event_store = make_run_event_store(run_events_config) # RunManager with store backing for persistence @@ -158,16 +187,20 @@ def get_thread_store(request: Request) -> ThreadMetaStore: def get_run_context(request: Request) -> RunContext: """Build a :class:`RunContext` from ``app.state`` singletons. - Returns a *base* context with infrastructure dependencies. + Returns a *base* context with infrastructure dependencies. The + ``app_config`` field is resolved live so per-run fields (e.g. + ``models[*].max_tokens``) follow ``config.yaml`` edits; the + ``event_store`` / ``run_events_config`` pair stays frozen to the snapshot + captured in :func:`langgraph_runtime` so callers never see a store bound + to one backend paired with a config pointing at another. """ - config = get_config() return RunContext( checkpointer=get_checkpointer(request), store=get_store(request), event_store=get_run_event_store(request), - run_events_config=getattr(config, "run_events", None), + run_events_config=getattr(request.app.state, "run_events_config", None), thread_store=get_thread_store(request), - app_config=config, + app_config=get_config(), ) diff --git a/backend/tests/test_gateway_config_freshness.py b/backend/tests/test_gateway_config_freshness.py index b0d4268af..8f38ab6cc 100644 --- a/backend/tests/test_gateway_config_freshness.py +++ b/backend/tests/test_gateway_config_freshness.py @@ -111,6 +111,52 @@ def test_get_config_respects_test_set_app_config(): assert client.get("/probe").json() == {"log_level": "warning"} +def test_run_context_app_config_reflects_yaml_edit(tmp_path, monkeypatch): + """``RunContext.app_config`` must follow live `config.yaml` edits. + + BUG-001 review feedback: the run-context that feeds worker / lead-agent + factories must observe the same mtime reload that `get_config()` does; + otherwise stale config slips back in through the run path even after the + request dependency is fixed. + """ + from unittest.mock import MagicMock + + from app.gateway.deps import get_run_context + + config_file = tmp_path / "config.yaml" + _write_config_yaml(config_file, log_level="info") + monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file)) + + app = FastAPI() + # Sentinel values for the rest of the RunContext wiring — we only care + # about ``ctx.app_config`` for this assertion. + app.state.checkpointer = MagicMock() + app.state.store = MagicMock() + app.state.run_event_store = MagicMock() + app.state.run_events_config = {"frozen": "startup"} + app.state.thread_store = MagicMock() + + @app.get("/run-ctx-log-level") + def probe(ctx=Depends(get_run_context)): + return { + "log_level": ctx.app_config.log_level, + "run_events_config": ctx.run_events_config, + } + + client = TestClient(app) + first = client.get("/run-ctx-log-level").json() + assert first == {"log_level": "info", "run_events_config": {"frozen": "startup"}} + + _write_config_yaml(config_file, log_level="debug") + future_mtime = config_file.stat().st_mtime + 5 + os.utime(config_file, (future_mtime, future_mtime)) + + second = client.get("/run-ctx-log-level").json() + # app_config follows the edit; run_events_config stays frozen to the + # startup snapshot we wrote onto app.state above. + assert second == {"log_level": "debug", "run_events_config": {"frozen": "startup"}} + + @pytest.mark.parametrize( "exception", [