Merge refactor/config-deerflow-context into release/2.0-rc

Cherry-pick PR #2271's config refactor onto release/2.0-rc.
Used 'git merge -X theirs' to auto-resolve content conflicts in favor of
the PR's design (frozen AppConfig + explicit-parameter passing).

Limitations:
- Release-only changes that overlapped with PR's refactor in 119 files
  are NOT preserved — those files reflect PR's version. Follow-up commits
  on this branch will need to re-apply release-only modifications where
  meaningful.
- See PR #2271 for design rationale.
This commit is contained in:
greatmengqi
2026-04-27 18:16:42 +08:00
227 changed files with 6965 additions and 5578 deletions
@@ -3,6 +3,7 @@ import logging
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain_core.runnables import RunnableConfig
from langgraph.graph.state import CompiledStateGraph
from deerflow.agents.lead_agent.prompt import apply_prompt_template
from deerflow.agents.memory.summarization_hook import memory_flush_hook
@@ -18,9 +19,8 @@ from deerflow.agents.middlewares.tool_error_handling_middleware import build_lea
from deerflow.agents.middlewares.view_image_middleware import ViewImageMiddleware
from deerflow.agents.thread_state import ThreadState
from deerflow.config.agents_config import load_agent_config, validate_agent_name
from deerflow.config.app_config import get_app_config
from deerflow.config.memory_config import get_memory_config
from deerflow.config.summarization_config import get_summarization_config
from deerflow.config.app_config import AppConfig
from deerflow.config.deer_flow_context import DeerFlowContext
from deerflow.models import create_chat_model
logger = logging.getLogger(__name__)
@@ -35,9 +35,8 @@ def _get_runtime_config(config: RunnableConfig) -> dict:
return cfg
def _resolve_model_name(requested_model_name: str | None = None) -> str:
def _resolve_model_name(app_config: AppConfig, requested_model_name: str | None = None) -> str:
"""Resolve a runtime model name safely, falling back to default if invalid. Returns None if no models are configured."""
app_config = get_app_config()
default_model_name = app_config.models[0].name if app_config.models else None
if default_model_name is None:
raise ValueError("No chat models are configured. Please configure at least one model in config.yaml.")
@@ -50,9 +49,9 @@ def _resolve_model_name(requested_model_name: str | None = None) -> str:
return default_model_name
def _create_summarization_middleware() -> DeerFlowSummarizationMiddleware | None:
def _create_summarization_middleware(app_config: AppConfig) -> DeerFlowSummarizationMiddleware | None:
"""Create and configure the summarization middleware from config."""
config = get_summarization_config()
config = app_config.summarization
if not config.enabled:
return None
@@ -73,9 +72,9 @@ def _create_summarization_middleware() -> DeerFlowSummarizationMiddleware | None
# as middleware rather than lead_agent (SummarizationMiddleware is a
# LangChain built-in, so we tag the model at creation time).
if config.model_name:
model = create_chat_model(name=config.model_name, thinking_enabled=False)
model = create_chat_model(name=config.model_name, thinking_enabled=False, app_config=app_config)
else:
model = create_chat_model(thinking_enabled=False)
model = create_chat_model(thinking_enabled=False, app_config=app_config)
model = model.with_config(tags=["middleware:summarize"])
# Prepare kwargs
@@ -92,14 +91,14 @@ def _create_summarization_middleware() -> DeerFlowSummarizationMiddleware | None
kwargs["summary_prompt"] = config.summary_prompt
hooks: list[BeforeSummarizationHook] = []
if get_memory_config().enabled:
if app_config.memory.enabled:
hooks.append(memory_flush_hook)
# The logic below relies on two assumptions holding true: this factory is
# the sole entry point for DeerFlowSummarizationMiddleware, and the runtime
# config is not expected to change after startup.
try:
skills_container_path = get_app_config().skills.container_path or "/mnt/skills"
skills_container_path = app_config.skills.container_path or "/mnt/skills"
except Exception:
logger.exception("Failed to resolve skills container path; falling back to default")
skills_container_path = "/mnt/skills"
@@ -240,10 +239,18 @@ Being proactive with task management demonstrates thoroughness and ensures all r
# ViewImageMiddleware should be before ClarificationMiddleware to inject image details before LLM
# ToolErrorHandlingMiddleware should be before ClarificationMiddleware to convert tool exceptions to ToolMessages
# ClarificationMiddleware should be last to intercept clarification requests after model calls
def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_name: str | None = None, custom_middlewares: list[AgentMiddleware] | None = None):
def _build_middlewares(
app_config: AppConfig,
config: RunnableConfig,
*,
model_name: str | None,
agent_name: str | None = None,
custom_middlewares: list[AgentMiddleware] | None = None,
):
"""Build middleware chain based on runtime configuration.
Args:
app_config: Resolved application config.
config: Runtime configuration containing configurable options like is_plan_mode.
agent_name: If provided, MemoryMiddleware will use per-agent memory storage.
custom_middlewares: Optional list of custom middlewares to inject into the chain.
@@ -251,10 +258,10 @@ def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_nam
Returns:
List of middleware instances.
"""
middlewares = build_lead_runtime_middlewares(lazy_init=True)
middlewares = build_lead_runtime_middlewares(app_config=app_config, lazy_init=True)
# Add summarization middleware if enabled
summarization_middleware = _create_summarization_middleware()
summarization_middleware = _create_summarization_middleware(app_config)
if summarization_middleware is not None:
middlewares.append(summarization_middleware)
@@ -266,7 +273,7 @@ def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_nam
middlewares.append(todo_list_middleware)
# Add TokenUsageMiddleware when token_usage tracking is enabled
if get_app_config().token_usage.enabled:
if app_config.token_usage.enabled:
middlewares.append(TokenUsageMiddleware())
# Add TitleMiddleware
@@ -277,7 +284,6 @@ def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_nam
# Add ViewImageMiddleware only if the current model supports vision.
# Use the resolved runtime model_name from make_lead_agent to avoid stale config values.
app_config = get_app_config()
model_config = app_config.get_model_config(model_name) if model_name else None
if model_config is not None and model_config.supports_vision:
middlewares.append(ViewImageMiddleware())
@@ -306,11 +312,32 @@ def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_nam
return middlewares
def make_lead_agent(config: RunnableConfig):
def make_lead_agent(
config: RunnableConfig,
app_config: AppConfig | None = None,
) -> CompiledStateGraph:
"""Build the lead agent from runtime config.
Args:
config: LangGraph ``RunnableConfig`` carrying per-invocation options
(``thinking_enabled``, ``model_name``, ``is_plan_mode``, etc.).
app_config: Resolved application config. Required for in-process
entry points (DeerFlowClient, Gateway Worker). When omitted we
are being called via ``langgraph.json`` registration and reload
from disk — the LangGraph Server bootstrap path has no other
way to thread the value.
"""
# Lazy import to avoid circular dependency
from deerflow.tools import get_available_tools
from deerflow.tools.builtins import setup_agent
if app_config is None:
# LangGraph Server registers ``make_lead_agent`` via ``langgraph.json``
# and hands us only a ``RunnableConfig``. Reload config from disk
# here — it's a pure function, equivalent to the process-global the
# old code path would have read.
app_config = AppConfig.from_file()
cfg = _get_runtime_config(config)
thinking_enabled = cfg.get("thinking_enabled", True)
@@ -327,9 +354,8 @@ def make_lead_agent(config: RunnableConfig):
agent_model_name = agent_config.model if agent_config and agent_config.model else None
# Final model name resolution: request → agent config → global default, with fallback for unknown names
model_name = _resolve_model_name(requested_model_name or agent_model_name)
model_name = _resolve_model_name(app_config, requested_model_name or agent_model_name)
app_config = get_app_config()
model_config = app_config.get_model_config(model_name)
if model_config is None:
@@ -369,20 +395,22 @@ def make_lead_agent(config: RunnableConfig):
if is_bootstrap:
# Special bootstrap agent with minimal prompt for initial custom agent creation flow
return create_agent(
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled),
tools=get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled) + [setup_agent],
middleware=_build_middlewares(config, model_name=model_name),
system_prompt=apply_prompt_template(subagent_enabled=subagent_enabled, max_concurrent_subagents=max_concurrent_subagents, available_skills=set(["bootstrap"])),
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, app_config=app_config),
tools=get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled, app_config=app_config) + [setup_agent],
middleware=_build_middlewares(app_config, config, model_name=model_name),
system_prompt=apply_prompt_template(app_config, subagent_enabled=subagent_enabled, max_concurrent_subagents=max_concurrent_subagents, available_skills=set(["bootstrap"])),
state_schema=ThreadState,
context_schema=DeerFlowContext,
)
# Default lead agent (unchanged behavior)
return create_agent(
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort),
tools=get_available_tools(model_name=model_name, groups=agent_config.tool_groups if agent_config else None, subagent_enabled=subagent_enabled),
middleware=_build_middlewares(config, model_name=model_name, agent_name=agent_name),
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort, app_config=app_config),
tools=get_available_tools(model_name=model_name, groups=agent_config.tool_groups if agent_config else None, subagent_enabled=subagent_enabled, app_config=app_config),
middleware=_build_middlewares(app_config, config, model_name=model_name, agent_name=agent_name),
system_prompt=apply_prompt_template(
subagent_enabled=subagent_enabled, max_concurrent_subagents=max_concurrent_subagents, agent_name=agent_name, available_skills=set(agent_config.skills) if agent_config and agent_config.skills is not None else None
app_config, subagent_enabled=subagent_enabled, max_concurrent_subagents=max_concurrent_subagents, agent_name=agent_name, available_skills=set(agent_config.skills) if agent_config and agent_config.skills is not None else None
),
state_schema=ThreadState,
context_schema=DeerFlowContext,
)
@@ -5,6 +5,7 @@ from datetime import datetime
from functools import lru_cache
from deerflow.config.agents_config import load_agent_soul
from deerflow.config.app_config import AppConfig
from deerflow.skills import load_skills
from deerflow.skills.types import Skill
from deerflow.subagents import get_available_subagent_names
@@ -19,19 +20,20 @@ _enabled_skills_refresh_version = 0
_enabled_skills_refresh_event = threading.Event()
def _load_enabled_skills_sync() -> list[Skill]:
return list(load_skills(enabled_only=True))
def _load_enabled_skills_sync(app_config: AppConfig | None) -> list[Skill]:
return list(load_skills(app_config, enabled_only=True))
def _start_enabled_skills_refresh_thread() -> None:
def _start_enabled_skills_refresh_thread(app_config: AppConfig | None) -> None:
threading.Thread(
target=_refresh_enabled_skills_cache_worker,
args=(app_config,),
name="deerflow-enabled-skills-loader",
daemon=True,
).start()
def _refresh_enabled_skills_cache_worker() -> None:
def _refresh_enabled_skills_cache_worker(app_config: AppConfig | None) -> None:
global _enabled_skills_cache, _enabled_skills_refresh_active
while True:
@@ -39,8 +41,8 @@ def _refresh_enabled_skills_cache_worker() -> None:
target_version = _enabled_skills_refresh_version
try:
skills = _load_enabled_skills_sync()
except Exception:
skills = _load_enabled_skills_sync(app_config)
except (OSError, ImportError):
logger.exception("Failed to load enabled skills for prompt injection")
skills = []
@@ -56,7 +58,7 @@ def _refresh_enabled_skills_cache_worker() -> None:
_enabled_skills_cache = None
def _ensure_enabled_skills_cache() -> threading.Event:
def _ensure_enabled_skills_cache(app_config: AppConfig | None) -> threading.Event:
global _enabled_skills_refresh_active
with _enabled_skills_lock:
@@ -68,11 +70,11 @@ def _ensure_enabled_skills_cache() -> threading.Event:
_enabled_skills_refresh_active = True
_enabled_skills_refresh_event.clear()
_start_enabled_skills_refresh_thread()
_start_enabled_skills_refresh_thread(app_config)
return _enabled_skills_refresh_event
def _invalidate_enabled_skills_cache() -> threading.Event:
def _invalidate_enabled_skills_cache(app_config: AppConfig | None) -> threading.Event:
global _enabled_skills_cache, _enabled_skills_refresh_active, _enabled_skills_refresh_version
_get_cached_skills_prompt_section.cache_clear()
@@ -84,30 +86,30 @@ def _invalidate_enabled_skills_cache() -> threading.Event:
return _enabled_skills_refresh_event
_enabled_skills_refresh_active = True
_start_enabled_skills_refresh_thread()
_start_enabled_skills_refresh_thread(app_config)
return _enabled_skills_refresh_event
def prime_enabled_skills_cache() -> None:
_ensure_enabled_skills_cache()
def prime_enabled_skills_cache(app_config: AppConfig | None = None) -> None:
_ensure_enabled_skills_cache(app_config)
def warm_enabled_skills_cache(timeout_seconds: float = _ENABLED_SKILLS_REFRESH_WAIT_TIMEOUT_SECONDS) -> bool:
if _ensure_enabled_skills_cache().wait(timeout=timeout_seconds):
def warm_enabled_skills_cache(app_config: AppConfig | None = None, timeout_seconds: float = _ENABLED_SKILLS_REFRESH_WAIT_TIMEOUT_SECONDS) -> bool:
if _ensure_enabled_skills_cache(app_config).wait(timeout=timeout_seconds):
return True
logger.warning("Timed out waiting %.1fs for enabled skills cache warm-up", timeout_seconds)
return False
def _get_enabled_skills():
def _get_enabled_skills(app_config: AppConfig | None = None):
with _enabled_skills_lock:
cached = _enabled_skills_cache
if cached is not None:
return list(cached)
_ensure_enabled_skills_cache()
_ensure_enabled_skills_cache(app_config)
return []
@@ -115,12 +117,37 @@ def _skill_mutability_label(category: str) -> str:
return "[custom, editable]" if category == "custom" else "[built-in]"
def clear_skills_system_prompt_cache() -> None:
_invalidate_enabled_skills_cache()
def clear_skills_system_prompt_cache(app_config: AppConfig | None = None) -> None:
_invalidate_enabled_skills_cache(app_config)
async def refresh_skills_system_prompt_cache_async() -> None:
await asyncio.to_thread(_invalidate_enabled_skills_cache().wait)
async def refresh_skills_system_prompt_cache_async(app_config: AppConfig | None = None) -> None:
await asyncio.to_thread(_invalidate_enabled_skills_cache(app_config).wait)
def _reset_skills_system_prompt_cache_state() -> None:
global _enabled_skills_cache, _enabled_skills_refresh_active, _enabled_skills_refresh_version
_get_cached_skills_prompt_section.cache_clear()
with _enabled_skills_lock:
_enabled_skills_cache = None
_enabled_skills_refresh_active = False
_enabled_skills_refresh_version = 0
_enabled_skills_refresh_event.clear()
def _refresh_enabled_skills_cache(app_config: AppConfig | None = None) -> None:
"""Backward-compatible test helper for direct synchronous reload."""
try:
skills = _load_enabled_skills_sync(app_config)
except Exception:
logger.exception("Failed to load enabled skills for prompt injection")
skills = []
with _enabled_skills_lock:
_enabled_skills_cache = skills
_enabled_skills_refresh_active = False
_enabled_skills_refresh_event.set()
def _build_skill_evolution_section(skill_evolution_enabled: bool) -> str:
@@ -139,7 +166,7 @@ Skip simple one-off tasks.
"""
def _build_available_subagents_description(available_names: list[str], bash_available: bool) -> str:
def _build_available_subagents_description(available_names: list[str], bash_available: bool, app_config: AppConfig) -> str:
"""Dynamically build subagent type descriptions from registry.
Mirrors Codex's pattern where agent_type_description is dynamically generated
@@ -161,7 +188,7 @@ def _build_available_subagents_description(available_names: list[str], bash_avai
if name in builtin_descriptions:
lines.append(f"- **{name}**: {builtin_descriptions[name]}")
else:
config = get_subagent_config(name)
config = get_subagent_config(name, app_config)
if config is not None:
desc = config.description.split("\n")[0].strip() # First line only for brevity
lines.append(f"- **{name}**: {desc}")
@@ -169,22 +196,23 @@ def _build_available_subagents_description(available_names: list[str], bash_avai
return "\n".join(lines)
def _build_subagent_section(max_concurrent: int) -> str:
def _build_subagent_section(max_concurrent: int, app_config: AppConfig) -> str:
"""Build the subagent system prompt section with dynamic concurrency limit.
Args:
max_concurrent: Maximum number of concurrent subagent calls allowed per response.
app_config: Application config used to gate bash availability.
Returns:
Formatted subagent section string.
"""
n = max_concurrent
available_names = get_available_subagent_names()
available_names = get_available_subagent_names(app_config)
bash_available = "bash" in available_names
# Dynamically build subagent type descriptions from registry (aligned with Codex's
# agent_type_description pattern where all registered roles are listed in the tool spec).
available_subagents = _build_available_subagents_description(available_names, bash_available)
available_subagents = _build_available_subagents_description(available_names, bash_available, app_config)
direct_tool_examples = "bash, ls, read_file, web_search, etc." if bash_available else "ls, read_file, web_search, etc."
direct_execution_example = (
'# User asks: "Run the tests"\n# Thinking: Cannot decompose into parallel sub-tasks\n# → Execute directly\n\nbash("npm test") # Direct execution, not task()'
@@ -511,37 +539,34 @@ combined with a FastAPI gateway for REST API access [citation:FastAPI](https://f
"""
def _get_memory_context(agent_name: str | None = None) -> str:
def _get_memory_context(app_config: AppConfig, agent_name: str | None = None) -> str:
"""Get memory context for injection into system prompt.
Args:
agent_name: If provided, loads per-agent memory. If None, loads global memory.
Returns:
Formatted memory context string wrapped in XML tags, or empty string if disabled.
Returns an empty string when memory is disabled or the stored memory file
cannot be read/parsed. A corrupt memory.json degrades the prompt to
no-memory; it never kills the agent.
"""
from deerflow.agents.memory import format_memory_for_injection, get_memory_data
from deerflow.runtime.user_context import get_effective_user_id
memory_config = app_config.memory
if not memory_config.enabled or not memory_config.injection_enabled:
return ""
try:
from deerflow.agents.memory import format_memory_for_injection, get_memory_data
from deerflow.config.memory_config import get_memory_config
from deerflow.runtime.user_context import get_effective_user_id
memory_data = get_memory_data(memory_config, agent_name, user_id=get_effective_user_id())
except (OSError, ValueError, UnicodeDecodeError):
logger.exception("Failed to load memory data for prompt injection")
return ""
config = get_memory_config()
if not config.enabled or not config.injection_enabled:
return ""
memory_content = format_memory_for_injection(memory_data, max_tokens=memory_config.max_injection_tokens)
if not memory_content.strip():
return ""
memory_data = get_memory_data(agent_name, user_id=get_effective_user_id())
memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens)
if not memory_content.strip():
return ""
return f"""<memory>
return f"""<memory>
{memory_content}
</memory>
"""
except Exception as e:
logger.error("Failed to load memory context: %s", e)
return ""
@lru_cache(maxsize=32)
@@ -576,19 +601,12 @@ You have access to skills that provide optimized workflows for specific tasks. E
</skill_system>"""
def get_skills_prompt_section(available_skills: set[str] | None = None) -> str:
def get_skills_prompt_section(app_config: AppConfig, available_skills: set[str] | None = None) -> str:
"""Generate the skills prompt section with available skills list."""
skills = _get_enabled_skills()
skills = _get_enabled_skills(app_config)
try:
from deerflow.config import get_app_config
config = get_app_config()
container_base_path = config.skills.container_path
skill_evolution_enabled = config.skill_evolution.enabled
except Exception:
container_base_path = "/mnt/skills"
skill_evolution_enabled = False
container_base_path = app_config.skills.container_path
skill_evolution_enabled = app_config.skill_evolution.enabled
if not skills and not skill_evolution_enabled:
return ""
@@ -612,7 +630,7 @@ def get_agent_soul(agent_name: str | None) -> str:
return ""
def get_deferred_tools_prompt_section() -> str:
def get_deferred_tools_prompt_section(app_config: AppConfig) -> str:
"""Generate <available-deferred-tools> block for the system prompt.
Lists only deferred tool names so the agent knows what exists
@@ -621,12 +639,7 @@ def get_deferred_tools_prompt_section() -> str:
"""
from deerflow.tools.builtins.tool_search import get_deferred_registry
try:
from deerflow.config import get_app_config
if not get_app_config().tool_search.enabled:
return ""
except Exception:
if not app_config.tool_search.enabled:
return ""
registry = get_deferred_registry()
@@ -637,15 +650,9 @@ def get_deferred_tools_prompt_section() -> str:
return f"<available-deferred-tools>\n{names}\n</available-deferred-tools>"
def _build_acp_section() -> str:
def _build_acp_section(app_config: AppConfig) -> str:
"""Build the ACP agent prompt section, only if ACP agents are configured."""
try:
from deerflow.config.acp_config import get_acp_agents
agents = get_acp_agents()
if not agents:
return ""
except Exception:
if not app_config.acp_agents:
return ""
return (
@@ -657,15 +664,9 @@ def _build_acp_section() -> str:
)
def _build_custom_mounts_section() -> str:
def _build_custom_mounts_section(app_config: AppConfig) -> str:
"""Build a prompt section for explicitly configured sandbox mounts."""
try:
from deerflow.config import get_app_config
mounts = get_app_config().sandbox.mounts or []
except Exception:
logger.exception("Failed to load configured sandbox mounts for the lead-agent prompt")
return ""
mounts = app_config.sandbox.mounts or []
if not mounts:
return ""
@@ -679,13 +680,20 @@ def _build_custom_mounts_section() -> str:
return f"\n**Custom Mounted Directories:**\n{mounts_list}\n- If the user needs files outside `/mnt/user-data`, use these absolute container paths directly when they match the requested directory"
def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagents: int = 3, *, agent_name: str | None = None, available_skills: set[str] | None = None) -> str:
def apply_prompt_template(
app_config: AppConfig,
subagent_enabled: bool = False,
max_concurrent_subagents: int = 3,
*,
agent_name: str | None = None,
available_skills: set[str] | None = None,
) -> str:
# Get memory context
memory_context = _get_memory_context(agent_name)
memory_context = _get_memory_context(app_config, agent_name)
# Include subagent section only if enabled (from runtime parameter)
n = max_concurrent_subagents
subagent_section = _build_subagent_section(n) if subagent_enabled else ""
subagent_section = _build_subagent_section(n, app_config) if subagent_enabled else ""
# Add subagent reminder to critical_reminders if enabled
subagent_reminder = (
@@ -706,14 +714,14 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen
)
# Get skills section
skills_section = get_skills_prompt_section(available_skills)
skills_section = get_skills_prompt_section(app_config, available_skills)
# Get deferred tools section (tool_search)
deferred_tools_section = get_deferred_tools_prompt_section()
deferred_tools_section = get_deferred_tools_prompt_section(app_config)
# Build ACP agent section only if ACP agents are configured
acp_section = _build_acp_section()
custom_mounts_section = _build_custom_mounts_section()
acp_section = _build_acp_section(app_config)
custom_mounts_section = _build_custom_mounts_section(app_config)
acp_and_mounts_section = "\n".join(section for section in (acp_section, custom_mounts_section) if section)
# Format the prompt with dynamic skills and memory
@@ -7,11 +7,17 @@ from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from deerflow.config.memory_config import get_memory_config
from deerflow.config.app_config import AppConfig
logger = logging.getLogger(__name__)
# Module-level config pointer set by the middleware that owns the queue.
# The queue runs on a background Timer thread where ``Runtime`` and FastAPI
# request context are not accessible; the enqueuer (which does have runtime
# context) is responsible for plumbing ``AppConfig`` through ``add()``.
@dataclass
class ConversationContext:
"""Context for a conversation to be processed for memory update."""
@@ -31,10 +37,21 @@ class MemoryUpdateQueue:
This queue collects conversation contexts and processes them after
a configurable debounce period. Multiple conversations received within
the debounce window are batched together.
The queue captures an ``AppConfig`` reference at construction time and
reuses it for the MemoryUpdater it spawns. Callers must construct a
fresh queue when the config changes rather than reaching into a global.
"""
def __init__(self):
"""Initialize the memory update queue."""
def __init__(self, app_config: AppConfig):
"""Initialize the memory update queue.
Args:
app_config: Application config. The queue reads its own
``memory`` section for debounce timing and hands the full
config to :class:`MemoryUpdater`.
"""
self._app_config = app_config
self._queue: list[ConversationContext] = []
self._lock = threading.Lock()
self._timer: threading.Timer | None = None
@@ -49,19 +66,8 @@ class MemoryUpdateQueue:
correction_detected: bool = False,
reinforcement_detected: bool = False,
) -> None:
"""Add a conversation to the update queue.
Args:
thread_id: The thread ID.
messages: The conversation messages.
agent_name: If provided, memory is stored per-agent. If None, uses global memory.
user_id: The user ID captured at enqueue time. Stored in ConversationContext so it
survives the threading.Timer boundary (ContextVar does not propagate across
raw threads).
correction_detected: Whether recent turns include an explicit correction signal.
reinforcement_detected: Whether recent turns include a positive reinforcement signal.
"""
config = get_memory_config()
"""Add a conversation to the update queue."""
config = self._app_config.memory
if not config.enabled:
return
@@ -88,7 +94,7 @@ class MemoryUpdateQueue:
reinforcement_detected: bool = False,
) -> None:
"""Add a conversation and start processing immediately in the background."""
config = get_memory_config()
config = self._app_config.memory
if not config.enabled:
return
@@ -111,7 +117,7 @@ class MemoryUpdateQueue:
thread_id: str,
messages: list[Any],
agent_name: str | None,
user_id: str | None,
user_id: str | None = None,
correction_detected: bool,
reinforcement_detected: bool,
) -> None:
@@ -135,7 +141,7 @@ class MemoryUpdateQueue:
def _reset_timer(self) -> None:
"""Reset the debounce timer."""
config = get_memory_config()
config = self._app_config.memory
self._schedule_timer(config.debounce_seconds)
logger.debug("Memory update timer set for %ss", config.debounce_seconds)
@@ -175,7 +181,7 @@ class MemoryUpdateQueue:
logger.info("Processing %d queued memory updates", len(contexts_to_process))
try:
updater = MemoryUpdater()
updater = MemoryUpdater(self._app_config)
for context in contexts_to_process:
try:
@@ -247,31 +253,35 @@ class MemoryUpdateQueue:
return self._processing
# Global singleton instance
_memory_queue: MemoryUpdateQueue | None = None
# Queues keyed by ``id(AppConfig)`` so tests and multi-client setups with
# distinct configs do not share a debounce queue.
_memory_queues: dict[int, MemoryUpdateQueue] = {}
_queue_lock = threading.Lock()
def get_memory_queue() -> MemoryUpdateQueue:
"""Get the global memory update queue singleton.
Returns:
The memory update queue instance.
"""
global _memory_queue
def get_memory_queue(app_config: AppConfig) -> MemoryUpdateQueue:
"""Get or create the memory update queue for the given app config."""
key = id(app_config)
with _queue_lock:
if _memory_queue is None:
_memory_queue = MemoryUpdateQueue()
return _memory_queue
queue = _memory_queues.get(key)
if queue is None:
queue = MemoryUpdateQueue(app_config)
_memory_queues[key] = queue
return queue
def reset_memory_queue() -> None:
"""Reset the global memory queue.
def reset_memory_queue(app_config: AppConfig | None = None) -> None:
"""Reset memory queue(s).
This is useful for testing.
Pass an ``app_config`` to reset only its queue, or omit to reset all
(useful at test teardown).
"""
global _memory_queue
with _queue_lock:
if _memory_queue is not None:
_memory_queue.clear()
_memory_queue = None
if app_config is not None:
queue = _memory_queues.pop(id(app_config), None)
if queue is not None:
queue.clear()
return
for queue in _memory_queues.values():
queue.clear()
_memory_queues.clear()
@@ -10,7 +10,7 @@ from pathlib import Path
from typing import Any
from deerflow.config.agents_config import AGENT_NAME_PATTERN
from deerflow.config.memory_config import get_memory_config
from deerflow.config.memory_config import MemoryConfig
from deerflow.config.paths import get_paths
logger = logging.getLogger(__name__)
@@ -62,8 +62,15 @@ class MemoryStorage(abc.ABC):
class FileMemoryStorage(MemoryStorage):
"""File-based memory storage provider."""
def __init__(self):
"""Initialize the file memory storage."""
def __init__(self, memory_config: MemoryConfig):
"""Initialize the file memory storage.
Args:
memory_config: Memory configuration (storage_path etc.). Stored on
the instance so per-request lookups don't need to reach for
ambient state.
"""
self._memory_config = memory_config
# Per-user/agent memory cache: keyed by (user_id, agent_name) tuple (None = global)
# Value: (memory_data, file_mtime)
self._memory_cache: dict[tuple[str | None, str | None], tuple[dict[str, Any], float | None]] = {}
@@ -83,11 +90,11 @@ class FileMemoryStorage(MemoryStorage):
def _get_memory_file_path(self, agent_name: str | None = None, *, user_id: str | None = None) -> Path:
"""Get the path to the memory file."""
config = self._memory_config
if user_id is not None:
if agent_name is not None:
self._validate_agent_name(agent_name)
return get_paths().user_agent_memory_file(user_id, agent_name)
config = get_memory_config()
if config.storage_path and Path(config.storage_path).is_absolute():
return Path(config.storage_path)
return get_paths().user_memory_file(user_id)
@@ -95,7 +102,6 @@ class FileMemoryStorage(MemoryStorage):
if agent_name is not None:
self._validate_agent_name(agent_name)
return get_paths().agent_memory_file(agent_name)
config = get_memory_config()
if config.storage_path:
p = Path(config.storage_path)
return p if p.is_absolute() else get_paths().base_dir / p
@@ -116,20 +122,16 @@ class FileMemoryStorage(MemoryStorage):
logger.warning("Failed to load memory file: %s", e)
return create_empty_memory()
@staticmethod
def _cache_key(agent_name: str | None = None, *, user_id: str | None = None) -> tuple[str | None, str | None]:
return (user_id, agent_name)
def load(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Load memory data (cached with file modification time check)."""
file_path = self._get_memory_file_path(agent_name, user_id=user_id)
cache_key = self._cache_key(agent_name, user_id=user_id)
try:
current_mtime = file_path.stat().st_mtime if file_path.exists() else None
except OSError:
current_mtime = None
cache_key = (user_id, agent_name)
with self._cache_lock:
cached = self._memory_cache.get(cache_key)
if cached is not None and cached[1] == current_mtime:
@@ -146,13 +148,13 @@ class FileMemoryStorage(MemoryStorage):
"""Reload memory data from file, forcing cache invalidation."""
file_path = self._get_memory_file_path(agent_name, user_id=user_id)
memory_data = self._load_memory_from_file(agent_name, user_id=user_id)
cache_key = self._cache_key(agent_name, user_id=user_id)
try:
mtime = file_path.stat().st_mtime if file_path.exists() else None
except OSError:
mtime = None
cache_key = (user_id, agent_name)
with self._cache_lock:
self._memory_cache[cache_key] = (memory_data, mtime)
return memory_data
@@ -160,7 +162,6 @@ class FileMemoryStorage(MemoryStorage):
def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:
"""Save memory data to file and update cache."""
file_path = self._get_memory_file_path(agent_name, user_id=user_id)
cache_key = self._cache_key(agent_name, user_id=user_id)
try:
file_path.parent.mkdir(parents=True, exist_ok=True)
@@ -180,6 +181,7 @@ class FileMemoryStorage(MemoryStorage):
except OSError:
mtime = None
cache_key = (user_id, agent_name)
with self._cache_lock:
self._memory_cache[cache_key] = (memory_data, mtime)
logger.info("Memory saved to %s", file_path)
@@ -189,23 +191,31 @@ class FileMemoryStorage(MemoryStorage):
return False
_storage_instance: MemoryStorage | None = None
# Instances keyed by (storage_class_path, id(memory_config)) so tests can
# construct isolated storages and multi-client setups with different configs
# don't collide on a single process-wide singleton.
_storage_instances: dict[tuple[str, int], MemoryStorage] = {}
_storage_lock = threading.Lock()
def get_memory_storage() -> MemoryStorage:
"""Get the configured memory storage instance."""
global _storage_instance
if _storage_instance is not None:
return _storage_instance
def get_memory_storage(memory_config: MemoryConfig) -> MemoryStorage:
"""Get the configured memory storage instance.
Caches one instance per ``(storage_class, memory_config)`` pair. In
single-config deployments this collapses to one instance; in multi-client
or test scenarios each config gets its own storage.
"""
key = (memory_config.storage_class, id(memory_config))
existing = _storage_instances.get(key)
if existing is not None:
return existing
with _storage_lock:
if _storage_instance is not None:
return _storage_instance
config = get_memory_config()
storage_class_path = config.storage_class
existing = _storage_instances.get(key)
if existing is not None:
return existing
storage_class_path = memory_config.storage_class
try:
module_path, class_name = storage_class_path.rsplit(".", 1)
import importlib
@@ -219,13 +229,14 @@ def get_memory_storage() -> MemoryStorage:
if not issubclass(storage_class, MemoryStorage):
raise TypeError(f"Configured memory storage '{storage_class_path}' is not a subclass of MemoryStorage")
_storage_instance = storage_class()
instance = storage_class(memory_config)
except Exception as e:
logger.error(
"Failed to load memory storage %s, falling back to FileMemoryStorage: %s",
storage_class_path,
e,
)
_storage_instance = FileMemoryStorage()
instance = FileMemoryStorage(memory_config)
return _storage_instance
_storage_instances[key] = instance
return instance
@@ -5,12 +5,19 @@ from __future__ import annotations
from deerflow.agents.memory.message_processing import detect_correction, detect_reinforcement, filter_messages_for_memory
from deerflow.agents.memory.queue import get_memory_queue
from deerflow.agents.middlewares.summarization_middleware import SummarizationEvent
from deerflow.config.memory_config import get_memory_config
from deerflow.config.app_config import AppConfig
def memory_flush_hook(event: SummarizationEvent) -> None:
"""Flush messages about to be summarized into the memory queue."""
if not get_memory_config().enabled or not event.thread_id:
"""Flush messages about to be summarized into the memory queue.
Reads ``AppConfig`` from disk on every invocation. This hook is fired by
``SummarizationMiddleware`` which has no ergonomic way to thread an
explicit ``app_config`` through; ``AppConfig.from_file()`` is a pure load
so the cost is acceptable for this rare pre-summarization callback.
"""
app_config = AppConfig.from_file()
if not app_config.memory.enabled or not event.thread_id:
return
filtered_messages = filter_messages_for_memory(list(event.messages_to_summarize))
@@ -21,7 +28,7 @@ def memory_flush_hook(event: SummarizationEvent) -> None:
correction_detected = detect_correction(filtered_messages)
reinforcement_detected = not correction_detected and detect_reinforcement(filtered_messages)
queue = get_memory_queue()
queue = get_memory_queue(app_config)
queue.add_nowait(
thread_id=event.thread_id,
messages=filtered_messages,
@@ -21,7 +21,8 @@ from deerflow.agents.memory.storage import (
get_memory_storage,
utc_now_iso_z,
)
from deerflow.config.memory_config import get_memory_config
from deerflow.config.app_config import AppConfig
from deerflow.config.memory_config import MemoryConfig
from deerflow.models import create_chat_model
logger = logging.getLogger(__name__)
@@ -38,45 +39,33 @@ def _create_empty_memory() -> dict[str, Any]:
return create_empty_memory()
def _save_memory_to_file(memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:
"""Backward-compatible wrapper around the configured memory storage save path."""
return get_memory_storage().save(memory_data, agent_name, user_id=user_id)
def _save_memory_to_file(memory_config: MemoryConfig, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:
"""Save via the configured memory storage."""
return get_memory_storage(memory_config).save(memory_data, agent_name, user_id=user_id)
def get_memory_data(agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
def get_memory_data(memory_config: MemoryConfig, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Get the current memory data via storage provider."""
return get_memory_storage().load(agent_name, user_id=user_id)
return get_memory_storage(memory_config).load(agent_name, user_id=user_id)
def reload_memory_data(agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
def reload_memory_data(memory_config: MemoryConfig, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Reload memory data via storage provider."""
return get_memory_storage().reload(agent_name, user_id=user_id)
return get_memory_storage(memory_config).reload(agent_name, user_id=user_id)
def import_memory_data(memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Persist imported memory data via storage provider.
Args:
memory_data: Full memory payload to persist.
agent_name: If provided, imports into per-agent memory.
user_id: If provided, scopes memory to a specific user.
Returns:
The saved memory data after storage normalization.
Raises:
OSError: If persisting the imported memory fails.
"""
storage = get_memory_storage()
def import_memory_data(memory_config: MemoryConfig, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Persist imported memory data via storage provider."""
storage = get_memory_storage(memory_config)
if not storage.save(memory_data, agent_name, user_id=user_id):
raise OSError("Failed to save imported memory data")
return storage.load(agent_name, user_id=user_id)
def clear_memory_data(agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
def clear_memory_data(memory_config: MemoryConfig, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Clear all stored memory data and persist an empty structure."""
cleared_memory = create_empty_memory()
if not _save_memory_to_file(cleared_memory, agent_name, user_id=user_id):
if not _save_memory_to_file(memory_config, cleared_memory, agent_name, user_id=user_id):
raise OSError("Failed to save cleared memory data")
return cleared_memory
@@ -89,6 +78,7 @@ def _validate_confidence(confidence: float) -> float:
def create_memory_fact(
memory_config: MemoryConfig,
content: str,
category: str = "context",
confidence: float = 0.5,
@@ -104,7 +94,7 @@ def create_memory_fact(
normalized_category = category.strip() or "context"
validated_confidence = _validate_confidence(confidence)
now = utc_now_iso_z()
memory_data = get_memory_data(agent_name, user_id=user_id)
memory_data = get_memory_data(memory_config, agent_name, user_id=user_id)
updated_memory = dict(memory_data)
facts = list(memory_data.get("facts", []))
facts.append(
@@ -119,15 +109,15 @@ def create_memory_fact(
)
updated_memory["facts"] = facts
if not _save_memory_to_file(updated_memory, agent_name, user_id=user_id):
if not _save_memory_to_file(memory_config, updated_memory, agent_name, user_id=user_id):
raise OSError("Failed to save memory data after creating fact")
return updated_memory
def delete_memory_fact(fact_id: str, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
def delete_memory_fact(memory_config: MemoryConfig, fact_id: str, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Delete a fact by its id and persist the updated memory data."""
memory_data = get_memory_data(agent_name, user_id=user_id)
memory_data = get_memory_data(memory_config, agent_name, user_id=user_id)
facts = memory_data.get("facts", [])
updated_facts = [fact for fact in facts if fact.get("id") != fact_id]
if len(updated_facts) == len(facts):
@@ -136,13 +126,14 @@ def delete_memory_fact(fact_id: str, agent_name: str | None = None, *, user_id:
updated_memory = dict(memory_data)
updated_memory["facts"] = updated_facts
if not _save_memory_to_file(updated_memory, agent_name, user_id=user_id):
if not _save_memory_to_file(memory_config, updated_memory, agent_name, user_id=user_id):
raise OSError(f"Failed to save memory data after deleting fact '{fact_id}'")
return updated_memory
def update_memory_fact(
memory_config: MemoryConfig,
fact_id: str,
content: str | None = None,
category: str | None = None,
@@ -152,7 +143,7 @@ def update_memory_fact(
user_id: str | None = None,
) -> dict[str, Any]:
"""Update an existing fact and persist the updated memory data."""
memory_data = get_memory_data(agent_name, user_id=user_id)
memory_data = get_memory_data(memory_config, agent_name, user_id=user_id)
updated_memory = dict(memory_data)
updated_facts: list[dict[str, Any]] = []
found = False
@@ -179,7 +170,7 @@ def update_memory_fact(
updated_memory["facts"] = updated_facts
if not _save_memory_to_file(updated_memory, agent_name, user_id=user_id):
if not _save_memory_to_file(memory_config, updated_memory, agent_name, user_id=user_id):
raise OSError(f"Failed to save memory data after updating fact '{fact_id}'")
return updated_memory
@@ -304,19 +295,25 @@ def _fact_content_key(content: Any) -> str | None:
class MemoryUpdater:
"""Updates memory using LLM based on conversation context."""
def __init__(self, model_name: str | None = None):
def __init__(self, app_config: AppConfig, model_name: str | None = None):
"""Initialize the memory updater.
Args:
app_config: Application config (the updater needs both ``memory``
section for behavior and the full config for ``create_chat_model``).
model_name: Optional model name to use. If None, uses config or default.
"""
self._app_config = app_config
self._model_name = model_name
@property
def _memory_config(self) -> MemoryConfig:
return self._app_config.memory
def _get_model(self):
"""Get the model for memory updates."""
config = get_memory_config()
model_name = self._model_name or config.model_name
return create_chat_model(name=model_name, thinking_enabled=False)
model_name = self._model_name or self._memory_config.model_name
return create_chat_model(name=model_name, thinking_enabled=False, app_config=self._app_config)
def _build_correction_hint(
self,
@@ -349,13 +346,14 @@ class MemoryUpdater:
agent_name: str | None,
correction_detected: bool,
reinforcement_detected: bool,
user_id: str | None = None,
) -> tuple[dict[str, Any], str] | None:
"""Load memory and build the update prompt for a conversation."""
config = get_memory_config()
config = self._memory_config
if not config.enabled or not messages:
return None
current_memory = get_memory_data(agent_name)
current_memory = get_memory_data(config, agent_name, user_id=user_id)
conversation_text = format_conversation_for_update(messages)
if not conversation_text.strip():
return None
@@ -377,6 +375,7 @@ class MemoryUpdater:
response_content: Any,
thread_id: str | None,
agent_name: str | None,
user_id: str | None = None,
) -> bool:
"""Parse the model response, apply updates, and persist memory."""
response_text = _extract_text(response_content).strip()
@@ -390,7 +389,7 @@ class MemoryUpdater:
# cannot corrupt the still-cached original object reference.
updated_memory = self._apply_updates(copy.deepcopy(current_memory), update_data, thread_id)
updated_memory = _strip_upload_mentions_from_memory(updated_memory)
return get_memory_storage().save(updated_memory, agent_name)
return get_memory_storage(self._memory_config).save(updated_memory, agent_name, user_id=user_id)
async def aupdate_memory(
self,
@@ -399,6 +398,7 @@ class MemoryUpdater:
agent_name: str | None = None,
correction_detected: bool = False,
reinforcement_detected: bool = False,
user_id: str | None = None,
) -> bool:
"""Update memory asynchronously based on conversation messages."""
try:
@@ -408,6 +408,7 @@ class MemoryUpdater:
agent_name=agent_name,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
user_id=user_id,
)
if prepared is None:
return False
@@ -421,6 +422,7 @@ class MemoryUpdater:
response_content=response.content,
thread_id=thread_id,
agent_name=agent_name,
user_id=user_id,
)
except json.JSONDecodeError as e:
logger.warning("Failed to parse LLM response for memory update: %s", e)
@@ -451,15 +453,78 @@ class MemoryUpdater:
Returns:
True if update was successful, False otherwise.
"""
return _run_async_update_sync(
self.aupdate_memory(
messages=messages,
thread_id=thread_id,
agent_name=agent_name,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
config = self._memory_config
if not config.enabled:
return False
if not messages:
return False
try:
# Get current memory
current_memory = get_memory_data(config, agent_name, user_id=user_id)
# Format conversation for prompt
conversation_text = format_conversation_for_update(messages)
if not conversation_text.strip():
return False
# Build prompt
correction_hint = ""
if correction_detected:
correction_hint = (
"IMPORTANT: Explicit correction signals were detected in this conversation. "
"Pay special attention to what the agent got wrong, what the user corrected, "
"and record the correct approach as a fact with category "
'"correction" and confidence >= 0.95 when appropriate.'
)
if reinforcement_detected:
reinforcement_hint = (
"IMPORTANT: Positive reinforcement signals were detected in this conversation. "
"The user explicitly confirmed the agent's approach was correct or helpful. "
"Record the confirmed approach, style, or preference as a fact with category "
'"preference" or "behavior" and confidence >= 0.9 when appropriate.'
)
correction_hint = (correction_hint + "\n" + reinforcement_hint).strip() if correction_hint else reinforcement_hint
prompt = MEMORY_UPDATE_PROMPT.format(
current_memory=json.dumps(current_memory, indent=2),
conversation=conversation_text,
correction_hint=correction_hint,
)
)
# Call LLM
model = self._get_model()
response = model.invoke(prompt)
response_text = _extract_text(response.content).strip()
# Parse response
# Remove markdown code blocks if present
if response_text.startswith("```"):
lines = response_text.split("\n")
response_text = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])
update_data = json.loads(response_text)
# Apply updates
updated_memory = self._apply_updates(current_memory, update_data, thread_id)
# Strip file-upload mentions from all summaries before saving.
# Uploaded files are session-scoped and won't exist in future sessions,
# so recording upload events in long-term memory causes the agent to
# try (and fail) to locate those files in subsequent conversations.
updated_memory = _strip_upload_mentions_from_memory(updated_memory)
# Save
return get_memory_storage(config).save(updated_memory, agent_name, user_id=user_id)
except json.JSONDecodeError as e:
logger.warning("Failed to parse LLM response for memory update: %s", e)
return False
except Exception as e:
logger.exception("Memory update failed: %s", e)
return False
def _apply_updates(
self,
@@ -477,7 +542,7 @@ class MemoryUpdater:
Returns:
Updated memory data.
"""
config = get_memory_config()
config = self._memory_config
now = utc_now_iso_z()
# Update user sections
@@ -20,7 +20,7 @@ from langchain.agents.middleware.types import (
from langchain_core.messages import AIMessage
from langgraph.errors import GraphBubbleUp
from deerflow.config import get_app_config
from deerflow.config.app_config import AppConfig
logger = logging.getLogger(__name__)
@@ -78,7 +78,7 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
# Load Circuit Breaker configs from app config if available, fall back to defaults
try:
app_config = get_app_config()
app_config = AppConfig.from_file()
self.circuit_failure_threshold = app_config.circuit_breaker.failure_threshold
self.circuit_recovery_timeout_sec = app_config.circuit_breaker.recovery_timeout_sec
except (FileNotFoundError, RuntimeError):
@@ -25,6 +25,8 @@ from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.runtime import Runtime
from deerflow.config.deer_flow_context import DeerFlowContext
logger = logging.getLogger(__name__)
# Defaults — can be overridden via constructor
@@ -181,12 +183,9 @@ class LoopDetectionMiddleware(AgentMiddleware[AgentState]):
self._tool_freq: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
self._tool_freq_warned: dict[str, set[str]] = defaultdict(set)
def _get_thread_id(self, runtime: Runtime) -> str:
def _get_thread_id(self, runtime: Runtime[DeerFlowContext]) -> str:
"""Extract thread_id from runtime context for per-thread tracking."""
thread_id = runtime.context.get("thread_id") if runtime.context else None
if thread_id:
return thread_id
return "default"
return runtime.context.thread_id or "default"
def _evict_if_needed(self) -> None:
"""Evict least recently used threads if over the limit.
@@ -367,11 +366,11 @@ class LoopDetectionMiddleware(AgentMiddleware[AgentState]):
return None
@override
def after_model(self, state: AgentState, runtime: Runtime) -> dict | None:
def after_model(self, state: AgentState, runtime: Runtime[DeerFlowContext]) -> dict | None:
return self._apply(state, runtime)
@override
async def aafter_model(self, state: AgentState, runtime: Runtime) -> dict | None:
async def aafter_model(self, state: AgentState, runtime: Runtime[DeerFlowContext]) -> dict | None:
return self._apply(state, runtime)
def reset(self, thread_id: str | None = None) -> None:
@@ -5,12 +5,11 @@ from typing import override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langgraph.config import get_config
from langgraph.runtime import Runtime
from deerflow.agents.memory.message_processing import detect_correction, detect_reinforcement, filter_messages_for_memory
from deerflow.agents.memory.queue import get_memory_queue
from deerflow.config.memory_config import get_memory_config
from deerflow.config.deer_flow_context import DeerFlowContext
from deerflow.runtime.user_context import get_effective_user_id
logger = logging.getLogger(__name__)
@@ -44,7 +43,7 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
self._agent_name = agent_name
@override
def after_agent(self, state: MemoryMiddlewareState, runtime: Runtime) -> dict | None:
def after_agent(self, state: MemoryMiddlewareState, runtime: Runtime[DeerFlowContext]) -> dict | None:
"""Queue conversation for memory update after agent completes.
Args:
@@ -54,15 +53,11 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
Returns:
None (no state changes needed from this middleware).
"""
config = get_memory_config()
if not config.enabled:
memory_config = runtime.context.app_config.memory
if not memory_config.enabled:
return None
# Get thread ID from runtime context first, then fall back to LangGraph's configurable metadata
thread_id = runtime.context.get("thread_id") if runtime.context else None
if thread_id is None:
config_data = get_config()
thread_id = config_data.get("configurable", {}).get("thread_id")
thread_id = runtime.context.thread_id
if not thread_id:
logger.debug("No thread_id in context, skipping memory update")
return None
@@ -91,7 +86,7 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
# threading.Timer fires on a different thread where ContextVar values are not
# propagated, so we must store user_id explicitly in ConversationContext.
user_id = get_effective_user_id()
queue = get_memory_queue()
queue = get_memory_queue(runtime.context.app_config)
queue.add(
thread_id=thread_id,
messages=filtered_messages,
@@ -4,11 +4,10 @@ from typing import NotRequired, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.config import get_config
from langgraph.runtime import Runtime
from deerflow.agents.thread_state import ThreadDataState
from deerflow.config.deer_flow_context import DeerFlowContext
from deerflow.config.paths import Paths, get_paths
from deerflow.runtime.user_context import get_effective_user_id
@@ -79,14 +78,10 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
return self._get_thread_paths(thread_id, user_id=user_id)
@override
def before_agent(self, state: ThreadDataMiddlewareState, runtime: Runtime) -> dict | None:
context = runtime.context or {}
thread_id = context.get("thread_id")
if thread_id is None:
config = get_config()
thread_id = config.get("configurable", {}).get("thread_id")
def before_agent(self, state: ThreadDataMiddlewareState, runtime: Runtime[DeerFlowContext]) -> dict | None:
thread_id = runtime.context.thread_id
if thread_id is None:
if not thread_id:
raise ValueError("Thread ID is required in runtime context or config.configurable")
user_id = get_effective_user_id()
@@ -9,7 +9,9 @@ from langchain.agents.middleware import AgentMiddleware
from langgraph.config import get_config
from langgraph.runtime import Runtime
from deerflow.config.title_config import get_title_config
from deerflow.config.app_config import AppConfig
from deerflow.config.deer_flow_context import DeerFlowContext
from deerflow.config.title_config import TitleConfig
from deerflow.models import create_chat_model
logger = logging.getLogger(__name__)
@@ -45,10 +47,9 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
return ""
def _should_generate_title(self, state: TitleMiddlewareState) -> bool:
def _should_generate_title(self, state: TitleMiddlewareState, title_config: TitleConfig) -> bool:
"""Check if we should generate a title for this thread."""
config = get_title_config()
if not config.enabled:
if not title_config.enabled:
return False
# Check if thread already has a title in state
@@ -67,12 +68,11 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
# Generate title after first complete exchange
return len(user_messages) == 1 and len(assistant_messages) >= 1
def _build_title_prompt(self, state: TitleMiddlewareState) -> tuple[str, str]:
def _build_title_prompt(self, state: TitleMiddlewareState, title_config: TitleConfig) -> tuple[str, str]:
"""Extract user/assistant messages and build the title prompt.
Returns (prompt_string, user_msg) so callers can use user_msg as fallback.
"""
config = get_title_config()
messages = state.get("messages", [])
user_msg_content = next((m.content for m in messages if m.type == "human"), "")
@@ -81,8 +81,8 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
user_msg = self._normalize_content(user_msg_content)
assistant_msg = self._strip_think_tags(self._normalize_content(assistant_msg_content))
prompt = config.prompt_template.format(
max_words=config.max_words,
prompt = title_config.prompt_template.format(
max_words=title_config.max_words,
user_msg=user_msg[:500],
assistant_msg=assistant_msg[:500],
)
@@ -92,17 +92,15 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
"""Remove <think>...</think> blocks emitted by reasoning models (e.g. minimax, DeepSeek-R1)."""
return re.sub(r"<think>[\s\S]*?</think>", "", text, flags=re.IGNORECASE).strip()
def _parse_title(self, content: object) -> str:
def _parse_title(self, content: object, title_config: TitleConfig) -> str:
"""Normalize model output into a clean title string."""
config = get_title_config()
title_content = self._normalize_content(content)
title_content = self._strip_think_tags(title_content)
title = title_content.strip().strip('"').strip("'")
return title[: config.max_chars] if len(title) > config.max_chars else title
return title[: title_config.max_chars] if len(title) > title_config.max_chars else title
def _fallback_title(self, user_msg: str) -> str:
config = get_title_config()
fallback_chars = min(config.max_chars, 50)
def _fallback_title(self, user_msg: str, title_config: TitleConfig) -> str:
fallback_chars = min(title_config.max_chars, 50)
if len(user_msg) > fallback_chars:
return user_msg[:fallback_chars].rstrip() + "..."
return user_msg if user_msg else "New Conversation"
@@ -118,43 +116,42 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
except Exception:
parent = {}
config = {**parent}
config["run_name"] = "title_agent"
config["tags"] = [*(config.get("tags") or []), "middleware:title"]
return config
def _generate_title_result(self, state: TitleMiddlewareState) -> dict | None:
def _generate_title_result(self, state: TitleMiddlewareState, title_config: TitleConfig) -> dict | None:
"""Generate a local fallback title without blocking on an LLM call."""
if not self._should_generate_title(state):
if not self._should_generate_title(state, title_config):
return None
_, user_msg = self._build_title_prompt(state)
return {"title": self._fallback_title(user_msg)}
_, user_msg = self._build_title_prompt(state, title_config)
return {"title": self._fallback_title(user_msg, title_config)}
async def _agenerate_title_result(self, state: TitleMiddlewareState) -> dict | None:
async def _agenerate_title_result(self, state: TitleMiddlewareState, app_config: AppConfig) -> dict | None:
"""Generate a title asynchronously and fall back locally on failure."""
if not self._should_generate_title(state):
title_config = app_config.title
if not self._should_generate_title(state, title_config):
return None
config = get_title_config()
prompt, user_msg = self._build_title_prompt(state)
prompt, user_msg = self._build_title_prompt(state, title_config)
try:
if config.model_name:
model = create_chat_model(name=config.model_name, thinking_enabled=False)
if title_config.model_name:
model = create_chat_model(name=title_config.model_name, thinking_enabled=False, app_config=app_config)
else:
model = create_chat_model(thinking_enabled=False)
model = create_chat_model(thinking_enabled=False, app_config=app_config)
response = await model.ainvoke(prompt, config=self._get_runnable_config())
title = self._parse_title(response.content)
title = self._parse_title(response.content, title_config)
if title:
return {"title": title}
except Exception:
logger.debug("Failed to generate async title; falling back to local title", exc_info=True)
return {"title": self._fallback_title(user_msg)}
return {"title": self._fallback_title(user_msg, title_config)}
@override
def after_model(self, state: TitleMiddlewareState, runtime: Runtime) -> dict | None:
return self._generate_title_result(state)
def after_model(self, state: TitleMiddlewareState, runtime: Runtime[DeerFlowContext]) -> dict | None:
return self._generate_title_result(state, runtime.context.app_config.title)
@override
async def aafter_model(self, state: TitleMiddlewareState, runtime: Runtime) -> dict | None:
return await self._agenerate_title_result(state)
async def aafter_model(self, state: TitleMiddlewareState, runtime: Runtime[DeerFlowContext]) -> dict | None:
return await self._agenerate_title_result(state, runtime.context.app_config)
@@ -1,8 +1,10 @@
"""Tool error handling middleware and shared runtime middleware builders."""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from typing import override
from typing import TYPE_CHECKING, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
@@ -11,6 +13,9 @@ from langgraph.errors import GraphBubbleUp
from langgraph.prebuilt.tool_node import ToolCallRequest
from langgraph.types import Command
if TYPE_CHECKING:
from deerflow.config.app_config import AppConfig
logger = logging.getLogger(__name__)
_MISSING_TOOL_CALL_ID = "missing_tool_call_id"
@@ -67,6 +72,7 @@ class ToolErrorHandlingMiddleware(AgentMiddleware[AgentState]):
def _build_runtime_middlewares(
*,
app_config: "AppConfig",
include_uploads: bool,
include_dangling_tool_call_patch: bool,
lazy_init: bool = True,
@@ -94,9 +100,7 @@ def _build_runtime_middlewares(
middlewares.append(LLMErrorHandlingMiddleware())
# Guardrail middleware (if configured)
from deerflow.config.guardrails_config import get_guardrails_config
guardrails_config = get_guardrails_config()
guardrails_config = app_config.guardrails
if guardrails_config.enabled and guardrails_config.provider:
import inspect
@@ -125,9 +129,10 @@ def _build_runtime_middlewares(
return middlewares
def build_lead_runtime_middlewares(*, lazy_init: bool = True) -> list[AgentMiddleware]:
def build_lead_runtime_middlewares(*, app_config: "AppConfig", lazy_init: bool = True) -> list[AgentMiddleware]:
"""Middlewares shared by lead agent runtime before lead-only middlewares."""
return _build_runtime_middlewares(
app_config=app_config,
include_uploads=True,
include_dangling_tool_call_patch=True,
lazy_init=lazy_init,
@@ -9,6 +9,7 @@ from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.runtime import Runtime
from deerflow.config.deer_flow_context import DeerFlowContext
from deerflow.config.paths import Paths, get_paths
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.utils.file_conversion import extract_outline
@@ -185,7 +186,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
return files if files else None
@override
def before_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict | None:
def before_agent(self, state: UploadsMiddlewareState, runtime: Runtime[DeerFlowContext]) -> dict | None:
"""Inject uploaded files information before agent execution.
New files come from the current message's additional_kwargs.files.
@@ -214,14 +215,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
return None
# Resolve uploads directory for existence checks
thread_id = (runtime.context or {}).get("thread_id")
if thread_id is None:
try:
from langgraph.config import get_config
thread_id = get_config().get("configurable", {}).get("thread_id")
except RuntimeError:
pass # get_config() raises outside a runnable context (e.g. unit tests)
thread_id = runtime.context.thread_id
uploads_dir = self._paths.sandbox_uploads_dir(thread_id, user_id=get_effective_user_id()) if thread_id else None
# Get newly uploaded files from the current message's additional_kwargs.files