fix(gateway): drop app.state.config snapshot and freeze run_events_config

Address @ShenAC-SAC's BUG-001 review on #3131. The previous cut still
stored an ``AppConfig`` snapshot on ``app.state.config`` for startup
bootstrap. Two follow-on hazards from that:

1. Future code touching the gateway lifespan could accidentally start
   reading ``app.state.config`` again, silently regressing the request
   hot path back to a stale snapshot.
2. ``get_run_context()`` paired a freshly-reloaded ``AppConfig`` with the
   startup-bound ``event_store`` and a *live* ``run_events_config``
   field — so an operator who edited ``run_events.backend`` mid-flight
   would have produced a run context whose ``event_store`` and
   ``run_events_config`` referred to different backends.

Clean approach (aligned with the direction in PR #3128):

- ``lifespan()`` keeps a local ``startup_config`` variable and passes it
  explicitly into ``langgraph_runtime(app, startup_config)`` and into
  ``start_channel_service``. No ``app.state.config`` attribute is set at
  any point.
- ``langgraph_runtime`` now accepts ``startup_config`` as a required
  parameter, removing the ``getattr(app.state, "config", None)`` lookup
  and the "config not initialised" runtime error.
- The matching ``run_events_config`` is frozen onto ``app.state`` next
  to ``run_event_store`` so ``get_run_context`` reads the two from the
  same startup-time source. ``app_config`` continues to be resolved
  live via ``get_app_config()``.
- ``backend/CLAUDE.md`` boundary explanation updated to spell out the
  ``startup_config`` / ``get_app_config()`` split.

New regression test ``test_run_context_app_config_reflects_yaml_edit``
exercises the worker-feeding path: it asserts that ``ctx.app_config``
follows a mid-flight ``config.yaml`` edit while
``ctx.run_events_config`` stays frozen to the startup snapshot the
event store was built from.

Refs: bytedance/deer-flow#3107 (BUG-001), bytedance/deer-flow#3131 review
This commit is contained in:
fancyboi999
2026-05-21 18:51:24 +08:00
parent 02daaee1f2
commit c0bc7a0648
4 changed files with 107 additions and 22 deletions
+1 -1
View File
@@ -184,7 +184,7 @@ Setup: Copy `config.example.yaml` to `config.yaml` in the **project root** direc
**Config Caching**: `get_app_config()` caches the parsed config, but automatically reloads it when the resolved config path changes or the file's mtime increases. This keeps Gateway and LangGraph reads aligned with `config.yaml` edits without requiring a manual process restart. **Config Caching**: `get_app_config()` caches the parsed config, but automatically reloads it when the resolved config path changes or the file's mtime increases. This keeps Gateway and LangGraph reads aligned with `config.yaml` edits without requiring a manual process restart.
**Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. Infrastructure fields that the gateway captures once at startup are **restart-required**: **Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. `AppConfig` is intentionally **not** cached on `app.state` — `lifespan()` keeps a local `startup_config` variable for one-shot bootstrap work (logging level, channels, `langgraph_runtime` engines) and passes it explicitly to `langgraph_runtime(app, startup_config)`. Infrastructure fields are **restart-required**:
| Field | Why a restart is required | | Field | Why a restart is required |
|---|---| |---|---|
+11 -5
View File
@@ -161,10 +161,16 @@ async def _migrate_orphaned_threads(store, admin_user_id: str) -> int:
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Application lifespan handler.""" """Application lifespan handler."""
# Load config and check necessary environment variables at startup # Load config and check necessary environment variables at startup.
# `startup_config` is a local snapshot used only for one-shot bootstrap
# work (logging level, langgraph_runtime engines, channels). Request-time
# config resolution always routes through `get_app_config()` in
# `app/gateway/deps.py::get_config()` so `config.yaml` edits become
# visible without a process restart. We deliberately do NOT cache this
# snapshot on `app.state` to keep that contract enforceable.
try: try:
app.state.config = get_app_config() startup_config = get_app_config()
apply_logging_level(app.state.config.log_level) apply_logging_level(startup_config.log_level)
logger.info("Configuration loaded successfully") logger.info("Configuration loaded successfully")
except Exception as e: except Exception as e:
error_msg = f"Failed to load configuration during gateway startup: {e}" error_msg = f"Failed to load configuration during gateway startup: {e}"
@@ -174,7 +180,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.info(f"Starting API Gateway on {config.host}:{config.port}") logger.info(f"Starting API Gateway on {config.host}:{config.port}")
# Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store) # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store)
async with langgraph_runtime(app): async with langgraph_runtime(app, startup_config):
logger.info("LangGraph runtime initialised") logger.info("LangGraph runtime initialised")
# Check admin bootstrap state and migrate orphan threads after admin exists. # Check admin bootstrap state and migrate orphan threads after admin exists.
@@ -185,7 +191,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
try: try:
from app.channels.service import start_channel_service from app.channels.service import start_channel_service
channel_service = await start_channel_service(app.state.config) channel_service = await start_channel_service(startup_config)
logger.info("Channel service started: %s", channel_service.get_status()) logger.info("Channel service started: %s", channel_service.get_status())
except Exception: except Exception:
logger.exception("No IM channels configured or channel service failed to start") logger.exception("No IM channels configured or channel service failed to start")
+49 -16
View File
@@ -3,6 +3,15 @@
**Getters** (used by routers): raise 503 when a required dependency is **Getters** (used by routers): raise 503 when a required dependency is
missing, except ``get_store`` which returns ``None``. missing, except ``get_store`` which returns ``None``.
``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the
run path resolve it through :func:`deerflow.config.app_config.get_app_config`,
which performs mtime-based hot reload, so edits to ``config.yaml`` take
effect on the next request without a process restart. The engines created in
:func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store,
run-event store) accept a ``startup_config`` snapshot — they are
restart-required by design and stay bound to that snapshot to keep the live
process consistent with itself.
Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`. Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
""" """
@@ -38,12 +47,14 @@ def get_config() -> AppConfig:
Routes through :func:`deerflow.config.app_config.get_app_config`, which Routes through :func:`deerflow.config.app_config.get_app_config`, which
honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from
disk when its mtime changes. ``app.state.config`` is no longer consulted disk when its mtime changes. ``AppConfig`` is not cached on ``app.state``
on the request hot path — it is set at startup only for one-shot infra at all — the only startup-time snapshot lives as a local
bootstrap (logging level, IM channels, ``langgraph_runtime`` engines). ``startup_config`` variable inside ``lifespan()`` and is passed
Reading from ``get_app_config`` here closes the bytedance/deer-flow explicitly into :func:`langgraph_runtime` for the engines that are
issue #3107 BUG-001 split-brain where the worker / lead-agent thread saw restart-required by design. Routing every request through
a stale startup snapshot. :func:`get_app_config` closes the bytedance/deer-flow issue #3107 BUG-001
split-brain where the worker / lead-agent thread saw a stale startup
snapshot.
Any failure to materialise the config (missing file, permission denied, Any failure to materialise the config (missing file, permission denied,
YAML parse error, validation error) is reported as 503 — semantically YAML parse error, validation error) is reported as 503 — semantically
@@ -58,12 +69,28 @@ def get_config() -> AppConfig:
@asynccontextmanager @asynccontextmanager
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]:
"""Bootstrap and tear down all LangGraph runtime singletons. """Bootstrap and tear down all LangGraph runtime singletons.
``startup_config`` is the ``AppConfig`` snapshot taken once during
``lifespan()`` for one-shot infrastructure bootstrap. The engines and
stores constructed here (stream bridge, persistence engine, checkpointer,
store, run-event store) are restart-required by design — they hold live
connections, file handles, or singleton providers — so they bind to this
snapshot and survive across `config.yaml` edits. Request-time consumers
must still go through :func:`get_config` for any field that should be
hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary".
The matching ``run_events_config`` is frozen onto ``app.state`` so
:func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the
*startup-time* run-events configuration the underlying ``event_store``
was built from — otherwise the runtime could end up combining a live
new ``run_events_config`` with an event store still bound to the
previous backend.
Usage in ``app.py``:: Usage in ``app.py``::
async with langgraph_runtime(app): async with langgraph_runtime(app, startup_config):
yield yield
""" """
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
@@ -72,9 +99,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
from deerflow.runtime.events.store import make_run_event_store from deerflow.runtime.events.store import make_run_event_store
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
config = getattr(app.state, "config", None) config = startup_config
if config is None:
raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized")
app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config)) app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config))
@@ -103,8 +128,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
app.state.thread_store = make_thread_store(sf, app.state.store) app.state.thread_store = make_thread_store(sf, app.state.store)
# Run event store (has its own factory with config-driven backend selection) # Run event store. The store and the matching ``run_events_config`` are
# both frozen at startup so ``get_run_context`` does not combine a
# freshly-reloaded ``AppConfig.run_events`` with a store still bound to
# the previous backend.
run_events_config = getattr(config, "run_events", None) run_events_config = getattr(config, "run_events", None)
app.state.run_events_config = run_events_config
app.state.run_event_store = make_run_event_store(run_events_config) app.state.run_event_store = make_run_event_store(run_events_config)
# RunManager with store backing for persistence # RunManager with store backing for persistence
@@ -158,16 +187,20 @@ def get_thread_store(request: Request) -> ThreadMetaStore:
def get_run_context(request: Request) -> RunContext: def get_run_context(request: Request) -> RunContext:
"""Build a :class:`RunContext` from ``app.state`` singletons. """Build a :class:`RunContext` from ``app.state`` singletons.
Returns a *base* context with infrastructure dependencies. Returns a *base* context with infrastructure dependencies. The
``app_config`` field is resolved live so per-run fields (e.g.
``models[*].max_tokens``) follow ``config.yaml`` edits; the
``event_store`` / ``run_events_config`` pair stays frozen to the snapshot
captured in :func:`langgraph_runtime` so callers never see a store bound
to one backend paired with a config pointing at another.
""" """
config = get_config()
return RunContext( return RunContext(
checkpointer=get_checkpointer(request), checkpointer=get_checkpointer(request),
store=get_store(request), store=get_store(request),
event_store=get_run_event_store(request), event_store=get_run_event_store(request),
run_events_config=getattr(config, "run_events", None), run_events_config=getattr(request.app.state, "run_events_config", None),
thread_store=get_thread_store(request), thread_store=get_thread_store(request),
app_config=config, app_config=get_config(),
) )
@@ -111,6 +111,52 @@ def test_get_config_respects_test_set_app_config():
assert client.get("/probe").json() == {"log_level": "warning"} assert client.get("/probe").json() == {"log_level": "warning"}
def test_run_context_app_config_reflects_yaml_edit(tmp_path, monkeypatch):
    """``RunContext.app_config`` tracks ``config.yaml``; ``run_events_config`` does not.

    Regression test for the BUG-001 review feedback: the run context built by
    ``get_run_context`` must pick up the same mtime-triggered reload that
    ``get_config()`` observes, while ``run_events_config`` keeps returning
    the value that was placed on ``app.state``.
    """
    from unittest.mock import MagicMock

    from app.gateway.deps import get_run_context

    route = "/run-ctx-log-level"
    frozen_run_events = {"frozen": "startup"}

    yaml_path = tmp_path / "config.yaml"
    _write_config_yaml(yaml_path, log_level="info")
    monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(yaml_path))

    app = FastAPI()
    # Placeholder singletons so ``get_run_context`` can assemble a context;
    # only ``app_config`` and ``run_events_config`` are inspected below.
    for attr in ("checkpointer", "store", "run_event_store", "thread_store"):
        setattr(app.state, attr, MagicMock())
    app.state.run_events_config = frozen_run_events

    @app.get(route)
    def probe(ctx=Depends(get_run_context)):
        return {
            "log_level": ctx.app_config.log_level,
            "run_events_config": ctx.run_events_config,
        }

    client = TestClient(app)

    before_edit = client.get(route).json()
    assert before_edit == {"log_level": "info", "run_events_config": {"frozen": "startup"}}

    # Rewrite the file and push its mtime forward so the change is detected
    # even when both writes land within the same filesystem timestamp tick.
    _write_config_yaml(yaml_path, log_level="debug")
    bumped_mtime = yaml_path.stat().st_mtime + 5
    os.utime(yaml_path, (bumped_mtime, bumped_mtime))

    after_edit = client.get(route).json()
    # The log level follows the edit; the run-events config is still the
    # value written onto ``app.state`` above.
    assert after_edit == {"log_level": "debug", "run_events_config": {"frozen": "startup"}}
@pytest.mark.parametrize( @pytest.mark.parametrize(
"exception", "exception",
[ [