fix(config): reset config-backed singletons on hot reload (#2588)

* Fix stale config singletons on reload

* fix(config): update checkpointer imports after runtime move

* Fix config reload singleton mutation on validation failure

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
KiteEater
2026-05-06 10:17:55 +08:00
committed by GitHub
parent 59c4a3f0a4
commit 4ead2c6b19
5 changed files with 259 additions and 48 deletions
@@ -1,5 +1,6 @@
import logging import logging
import os import os
from collections.abc import Mapping
from contextvars import ContextVar from contextvars import ContextVar
from pathlib import Path from pathlib import Path
from typing import Any, Self from typing import Any, Self
@@ -157,56 +158,54 @@ class AppConfig(BaseModel):
config_data = cls.resolve_env_variables(config_data) config_data = cls.resolve_env_variables(config_data)
cls._apply_database_defaults(config_data) cls._apply_database_defaults(config_data)
# Load title config if present
if "title" in config_data:
load_title_config_from_dict(config_data["title"])
# Load summarization config if present
if "summarization" in config_data:
load_summarization_config_from_dict(config_data["summarization"])
# Load memory config if present
if "memory" in config_data:
load_memory_config_from_dict(config_data["memory"])
# Always refresh agents API config so removed config sections reset
# singleton-backed state to its default/disabled values on reload.
load_agents_api_config_from_dict(config_data.get("agents_api") or {})
# Load subagents config if present
if "subagents" in config_data:
load_subagents_config_from_dict(config_data["subagents"])
# Load tool_search config if present
if "tool_search" in config_data:
load_tool_search_config_from_dict(config_data["tool_search"])
# Load guardrails config if present
if "guardrails" in config_data:
load_guardrails_config_from_dict(config_data["guardrails"])
# Load circuit_breaker config if present # Load circuit_breaker config if present
if "circuit_breaker" in config_data: if "circuit_breaker" in config_data:
config_data["circuit_breaker"] = config_data["circuit_breaker"] config_data["circuit_breaker"] = config_data["circuit_breaker"]
# Load checkpointer config if present
if "checkpointer" in config_data:
load_checkpointer_config_from_dict(config_data["checkpointer"])
# Load stream bridge config if present
if "stream_bridge" in config_data:
load_stream_bridge_config_from_dict(config_data["stream_bridge"])
# Always refresh ACP agent config so removed entries do not linger across reloads.
load_acp_config_from_dict(config_data.get("acp_agents", {}))
# Load extensions config separately (it's in a different file) # Load extensions config separately (it's in a different file)
extensions_config = ExtensionsConfig.from_file() extensions_config = ExtensionsConfig.from_file()
config_data["extensions"] = extensions_config.model_dump() config_data["extensions"] = extensions_config.model_dump()
result = cls.model_validate(config_data) result = cls.model_validate(config_data)
acp_agents = cls._validate_acp_agents(config_data.get("acp_agents", {}))
cls._apply_singleton_configs(result, acp_agents)
return result return result
@classmethod
def _validate_acp_agents(
cls,
config_data: Mapping[str, Mapping[str, object]] | None,
) -> dict[str, ACPAgentConfig]:
if config_data is None:
config_data = {}
return {name: ACPAgentConfig(**cfg) for name, cfg in config_data.items()}
@classmethod
def _apply_singleton_configs(cls, config: Self, acp_agents: dict[str, ACPAgentConfig]) -> None:
from deerflow.config.checkpointer_config import get_checkpointer_config
previous_checkpointer_config = get_checkpointer_config()
load_title_config_from_dict(config.title.model_dump())
load_summarization_config_from_dict(config.summarization.model_dump())
load_memory_config_from_dict(config.memory.model_dump())
load_agents_api_config_from_dict(config.agents_api.model_dump())
load_subagents_config_from_dict(config.subagents.model_dump())
load_tool_search_config_from_dict(config.tool_search.model_dump())
load_guardrails_config_from_dict(config.guardrails.model_dump())
load_checkpointer_config_from_dict(config.checkpointer.model_dump() if config.checkpointer is not None else None)
load_stream_bridge_config_from_dict(config.stream_bridge.model_dump() if config.stream_bridge is not None else None)
load_acp_config_from_dict({name: agent.model_dump() for name, agent in acp_agents.items()})
if previous_checkpointer_config != config.checkpointer:
# These runtime singletons derive their backend from checkpointer config.
# Keep imports local to avoid cycles: both providers import get_app_config.
from deerflow.runtime.checkpointer import reset_checkpointer
from deerflow.runtime.store import reset_store
reset_checkpointer()
reset_store()
@classmethod @classmethod
def _apply_database_defaults(cls, config_data: dict[str, Any]) -> None: def _apply_database_defaults(cls, config_data: dict[str, Any]) -> None:
"""Apply config.yaml defaults for persistence when the section is absent.""" """Apply config.yaml defaults for persistence when the section is absent."""
@@ -40,7 +40,10 @@ def set_checkpointer_config(config: CheckpointerConfig | None) -> None:
_checkpointer_config = config _checkpointer_config = config
def load_checkpointer_config_from_dict(config_dict: dict) -> None: def load_checkpointer_config_from_dict(config_dict: dict | None) -> None:
"""Load checkpointer configuration from a dictionary.""" """Load checkpointer configuration from a dictionary."""
global _checkpointer_config global _checkpointer_config
if config_dict is None:
_checkpointer_config = None
return
_checkpointer_config = CheckpointerConfig(**config_dict) _checkpointer_config = CheckpointerConfig(**config_dict)
@@ -40,7 +40,10 @@ def set_stream_bridge_config(config: StreamBridgeConfig | None) -> None:
_stream_bridge_config = config _stream_bridge_config = config
def load_stream_bridge_config_from_dict(config_dict: dict) -> None: def load_stream_bridge_config_from_dict(config_dict: dict | None) -> None:
"""Load stream bridge configuration from a dictionary.""" """Load stream bridge configuration from a dictionary."""
global _stream_bridge_config global _stream_bridge_config
if config_dict is None:
_stream_bridge_config = None
return
_stream_bridge_config = StreamBridgeConfig(**config_dict) _stream_bridge_config = StreamBridgeConfig(**config_dict)
@@ -179,9 +179,3 @@ def load_subagents_config_from_dict(config_dict: dict) -> None:
overrides_summary or "none", overrides_summary or "none",
custom_agents_names or "none", custom_agents_names or "none",
) )
else:
logger.info(
"Subagents config loaded: default timeout=%ss, default max_turns=%s, no per-agent overrides",
_subagents_config.timeout_seconds,
_subagents_config.max_turns,
)
+213 -1
View File
@@ -4,10 +4,40 @@ import json
import os import os
from pathlib import Path from pathlib import Path
import pytest
import yaml import yaml
from pydantic import ValidationError
from deerflow.config.agents_api_config import get_agents_api_config import deerflow.config.app_config as app_config_module
from deerflow.config.acp_config import load_acp_config_from_dict
from deerflow.config.agents_api_config import get_agents_api_config, load_agents_api_config_from_dict
from deerflow.config.app_config import AppConfig, get_app_config, reset_app_config from deerflow.config.app_config import AppConfig, get_app_config, reset_app_config
from deerflow.config.checkpointer_config import get_checkpointer_config, load_checkpointer_config_from_dict
from deerflow.config.guardrails_config import get_guardrails_config, load_guardrails_config_from_dict
from deerflow.config.memory_config import get_memory_config, load_memory_config_from_dict
from deerflow.config.stream_bridge_config import get_stream_bridge_config, load_stream_bridge_config_from_dict
from deerflow.config.subagents_config import get_subagents_app_config, load_subagents_config_from_dict
from deerflow.config.summarization_config import get_summarization_config, load_summarization_config_from_dict
from deerflow.config.title_config import get_title_config, load_title_config_from_dict
from deerflow.config.tool_search_config import get_tool_search_config, load_tool_search_config_from_dict
from deerflow.runtime.checkpointer import get_checkpointer, reset_checkpointer
from deerflow.runtime.store import get_store, reset_store
def _reset_config_singletons() -> None:
load_title_config_from_dict({})
load_summarization_config_from_dict({})
load_memory_config_from_dict({})
load_agents_api_config_from_dict({})
load_subagents_config_from_dict({})
load_tool_search_config_from_dict({})
load_guardrails_config_from_dict({})
load_checkpointer_config_from_dict(None)
load_stream_bridge_config_from_dict(None)
load_acp_config_from_dict({})
reset_checkpointer()
reset_store()
reset_app_config()
def _write_config(path: Path, *, model_name: str, supports_thinking: bool) -> None: def _write_config(path: Path, *, model_name: str, supports_thinking: bool) -> None:
@@ -53,6 +83,23 @@ def _write_config_with_agents_api(
path.write_text(yaml.safe_dump(config), encoding="utf-8") path.write_text(yaml.safe_dump(config), encoding="utf-8")
def _write_config_with_sections(path: Path, sections: dict | None = None) -> None:
config = {
"sandbox": {"use": "deerflow.sandbox.local:LocalSandboxProvider"},
"models": [
{
"name": "first-model",
"use": "langchain_openai:ChatOpenAI",
"model": "gpt-test",
}
],
}
if sections:
config.update(sections)
path.write_text(yaml.safe_dump(config), encoding="utf-8")
def _write_extensions_config(path: Path) -> None: def _write_extensions_config(path: Path) -> None:
path.write_text(json.dumps({"mcpServers": {}, "skills": {}}), encoding="utf-8") path.write_text(json.dumps({"mcpServers": {}, "skills": {}}), encoding="utf-8")
@@ -175,3 +222,168 @@ def test_get_app_config_resets_agents_api_config_when_section_removed(tmp_path,
assert get_agents_api_config().enabled is False assert get_agents_api_config().enabled is False
finally: finally:
reset_app_config() reset_app_config()
def test_get_app_config_resets_singleton_configs_when_sections_removed(tmp_path, monkeypatch):
config_path = tmp_path / "config.yaml"
extensions_path = tmp_path / "extensions_config.json"
_write_extensions_config(extensions_path)
_write_config_with_sections(
config_path,
{
"title": {"enabled": False, "max_words": 3},
"summarization": {"enabled": True},
"memory": {"enabled": False, "max_facts": 50},
"subagents": {"timeout_seconds": 42, "agents": {"reviewer": {"max_turns": 2}}},
"tool_search": {"enabled": True},
"guardrails": {"enabled": True, "fail_closed": False},
"checkpointer": {"type": "memory"},
"stream_bridge": {"type": "memory", "queue_maxsize": 12},
},
)
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_path))
monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(extensions_path))
reset_app_config()
try:
get_app_config()
assert get_title_config().enabled is False
assert get_summarization_config().enabled is True
assert get_memory_config().enabled is False
assert get_subagents_app_config().timeout_seconds == 42
assert get_tool_search_config().enabled is True
assert get_guardrails_config().enabled is True
assert get_checkpointer_config() is not None
assert get_stream_bridge_config() is not None
_write_config_with_sections(config_path)
next_mtime = config_path.stat().st_mtime + 5
os.utime(config_path, (next_mtime, next_mtime))
get_app_config()
assert get_title_config().enabled is True
assert get_summarization_config().enabled is False
assert get_memory_config().enabled is True
assert get_subagents_app_config().timeout_seconds == 900
assert get_tool_search_config().enabled is False
assert get_guardrails_config().enabled is False
assert get_checkpointer_config() is None
assert get_stream_bridge_config() is None
finally:
_reset_config_singletons()
def test_get_app_config_resets_persistence_runtime_singletons_when_checkpointer_removed(tmp_path, monkeypatch):
config_path = tmp_path / "config.yaml"
extensions_path = tmp_path / "extensions_config.json"
_write_extensions_config(extensions_path)
_write_config_with_sections(config_path, {"checkpointer": {"type": "memory"}})
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_path))
monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(extensions_path))
reset_checkpointer()
reset_store()
reset_app_config()
try:
get_app_config()
initial_checkpointer = get_checkpointer()
initial_store = get_store()
_write_config_with_sections(config_path)
next_mtime = config_path.stat().st_mtime + 5
os.utime(config_path, (next_mtime, next_mtime))
get_app_config()
assert get_checkpointer_config() is None
assert get_checkpointer() is not initial_checkpointer
assert get_store() is not initial_store
finally:
_reset_config_singletons()
def test_get_app_config_keeps_persistence_runtime_singletons_when_checkpointer_unchanged(tmp_path, monkeypatch):
config_path = tmp_path / "config.yaml"
extensions_path = tmp_path / "extensions_config.json"
_write_extensions_config(extensions_path)
_write_config_with_sections(
config_path,
{
"title": {"enabled": False},
"checkpointer": {"type": "memory"},
},
)
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_path))
monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(extensions_path))
_reset_config_singletons()
try:
get_app_config()
initial_checkpointer = get_checkpointer()
initial_store = get_store()
_write_config_with_sections(
config_path,
{
"title": {"enabled": True},
"checkpointer": {"type": "memory"},
},
)
next_mtime = config_path.stat().st_mtime + 5
os.utime(config_path, (next_mtime, next_mtime))
get_app_config()
assert get_checkpointer() is initial_checkpointer
assert get_store() is initial_store
finally:
_reset_config_singletons()
def test_get_app_config_does_not_mutate_singletons_when_reload_validation_fails(tmp_path, monkeypatch):
config_path = tmp_path / "config.yaml"
extensions_path = tmp_path / "extensions_config.json"
_write_extensions_config(extensions_path)
_write_config_with_sections(
config_path,
{
"title": {"enabled": False},
"tool_search": {"enabled": True},
"checkpointer": {"type": "memory"},
},
)
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_path))
monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(extensions_path))
_reset_config_singletons()
try:
previous_app_config = get_app_config()
initial_checkpointer = get_checkpointer()
initial_store = get_store()
_write_config_with_sections(
config_path,
{
"title": False,
"tool_search": False,
"checkpointer": {"type": "memory"},
},
)
next_mtime = config_path.stat().st_mtime + 5
os.utime(config_path, (next_mtime, next_mtime))
with pytest.raises(ValidationError):
get_app_config()
assert app_config_module._app_config is previous_app_config
assert get_title_config().enabled is False
assert get_tool_search_config().enabled is True
assert get_checkpointer_config() is not None
assert get_checkpointer() is initial_checkpointer
assert get_store() is initial_store
finally:
_reset_config_singletons()