mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-10 09:25:57 +00:00
feat(subagents): extend deferred MCP tool loading to subagents (#3432)
* feat(subagents): extend deferred MCP tool loading to subagents (#3341) Subagents now reuse the lead agent's deferred-tool path: when tool_search.enabled, MCP tool schemas are withheld from the model and surfaced by name in <available-deferred-tools>, fetched on demand via the generated tool_search helper. DeferredToolFilterMiddleware deterministically rewrites request.tools to hide the deferred schemas (the prompt section is discovery only, not enforcement). Consolidates the assembly into deerflow.tools.builtins.tool_search, now the single home for both assemble_deferred_tools (centralized fail-closed guard, replacing the lead-only private _assemble_deferred) and the relocated get_deferred_tools_prompt_section. Shared by every build path: lead agent, embedded client, and subagent executor. tool_search is appended after the subagent's name-level tool policy and is treated as infrastructure: its catalog is built from the already policy-filtered list, so it can never surface a tool the policy denied. Follow-up to #3370. Fixes #3341. * test(subagents): assert the real middleware builder emits a working deferred filter (#3341) The existing recipe test hand-constructs DeferredToolFilterMiddleware, so it cannot catch a regression in how build_subagent_runtime_middlewares (the call executor._create_agent actually makes) wires the deferred setup into the filter. Add a test that sources the filter from the real builder given a real setup and runs it through a graph: a wrong catalog hash would silently stop promotion, a dropped filter would stop hiding — both now caught. Running the full real middleware stack is intentionally avoided (the other runtime middlewares need sandbox/thread infra to execute, which would make the test flaky); their attachment + ordering before Safety stays locked in test_tool_error_handling_middleware.py. * test(subagents): keep executor tests config-free in CI * chore: trigger ci * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -14,6 +14,7 @@ the real implementation in isolation.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import importlib
|
||||
import sys
|
||||
import threading
|
||||
from datetime import datetime
|
||||
@@ -39,6 +40,21 @@ _MOCKED_MODULE_NAMES = [
|
||||
]
|
||||
|
||||
|
||||
def _default_app_config():
|
||||
return SimpleNamespace(tool_search=SimpleNamespace(enabled=False))
|
||||
|
||||
|
||||
def _patch_default_get_app_config(executor_module):
|
||||
executor_module.get_app_config = _default_app_config
|
||||
return executor_module
|
||||
|
||||
|
||||
def _clear_stale_executor_package_attr() -> None:
|
||||
subagents_pkg = sys.modules.get("deerflow.subagents")
|
||||
if subagents_pkg is not None and hasattr(subagents_pkg, "executor"):
|
||||
delattr(subagents_pkg, "executor")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _setup_executor_classes():
|
||||
"""Set up mocked modules and import real executor classes.
|
||||
@@ -53,6 +69,7 @@ def _setup_executor_classes():
|
||||
# Remove mocked executor if exists (from conftest.py)
|
||||
if "deerflow.subagents.executor" in sys.modules:
|
||||
del sys.modules["deerflow.subagents.executor"]
|
||||
_clear_stale_executor_package_attr()
|
||||
|
||||
# Set up mocks
|
||||
for name in _MOCKED_MODULE_NAMES:
|
||||
@@ -71,6 +88,14 @@ def _setup_executor_classes():
|
||||
SubagentStatus,
|
||||
)
|
||||
|
||||
executor_module = sys.modules["deerflow.subagents.executor"]
|
||||
|
||||
# Most tests in this module patch _create_agent and exercise executor
|
||||
# control flow only. Keep those tests hermetic: CI checkouts do not include
|
||||
# the gitignored config.yaml, and deferral-specific tests override this
|
||||
# default explicitly.
|
||||
_patch_default_get_app_config(executor_module)
|
||||
|
||||
# Store classes in a dict to yield
|
||||
classes = {
|
||||
"AIMessage": AIMessage,
|
||||
@@ -287,6 +312,7 @@ class TestAgentConstruction:
|
||||
"app_config": app_config,
|
||||
"model_name": "parent-model",
|
||||
"lazy_init": True,
|
||||
"deferred_setup": None,
|
||||
}
|
||||
assert captured["agent"]["model"] is model
|
||||
assert captured["agent"]["middleware"] is middlewares
|
||||
@@ -359,7 +385,7 @@ class TestAgentConstruction:
|
||||
thread_id="test-thread",
|
||||
)
|
||||
|
||||
state, _filtered_tools = await executor._build_initial_state("Do the task")
|
||||
state, _final_tools, _deferred_setup = await executor._build_initial_state("Do the task")
|
||||
|
||||
messages = state["messages"]
|
||||
# Should have exactly 2 messages: one combined SystemMessage + one HumanMessage
|
||||
@@ -397,7 +423,7 @@ class TestAgentConstruction:
|
||||
thread_id="test-thread",
|
||||
)
|
||||
|
||||
state, _filtered_tools = await executor._build_initial_state("Do the task")
|
||||
state, _final_tools, _deferred_setup = await executor._build_initial_state("Do the task")
|
||||
|
||||
messages = state["messages"]
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
@@ -439,7 +465,7 @@ class TestAgentConstruction:
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
executor = SubagentExecutor(config=config, tools=[], thread_id="test-thread")
|
||||
|
||||
state, _filtered_tools = await executor._build_initial_state("Do the task")
|
||||
state, _final_tools, _deferred_setup = await executor._build_initial_state("Do the task")
|
||||
|
||||
messages = state["messages"]
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
@@ -449,6 +475,192 @@ class TestAgentConstruction:
|
||||
assert "Skill content" in messages[0].content
|
||||
assert isinstance(messages[1], HumanMessage)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_build_initial_state_defers_mcp_tools_when_tool_search_enabled(
|
||||
self,
|
||||
classes,
|
||||
base_config,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
"""tool_search enabled + a surviving MCP tool: _build_initial_state appends
|
||||
the tool_search tool, withholds the MCP schema, and injects the
|
||||
<available-deferred-tools> section into the SystemMessage."""
|
||||
from langchain_core.tools import tool as as_tool
|
||||
|
||||
from deerflow.subagents import executor as executor_module
|
||||
from deerflow.tools.mcp_metadata import tag_mcp_tool
|
||||
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys.modules["deerflow.skills.storage"],
|
||||
"get_or_new_skill_storage",
|
||||
lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []),
|
||||
)
|
||||
monkeypatch.setattr(executor_module, "get_app_config", lambda: SimpleNamespace(tool_search=SimpleNamespace(enabled=True)))
|
||||
|
||||
@as_tool
|
||||
def mcp_calc(expression: str) -> str:
|
||||
"Evaluate arithmetic."
|
||||
return expression
|
||||
|
||||
executor = SubagentExecutor(config=base_config, tools=[tag_mcp_tool(mcp_calc)], thread_id="test-thread")
|
||||
|
||||
state, final_tools, deferred_setup = await executor._build_initial_state("Do the task")
|
||||
|
||||
assert "tool_search" in [t.name for t in final_tools]
|
||||
assert deferred_setup.deferred_names == frozenset({"mcp_calc"})
|
||||
|
||||
system_message = state["messages"][0]
|
||||
assert "<available-deferred-tools>" in system_message.content
|
||||
assert "mcp_calc" in system_message.content
|
||||
# The base system_prompt is still present alongside the injected section.
|
||||
assert base_config.system_prompt in system_message.content
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_build_initial_state_no_deferral_when_tool_search_disabled(
|
||||
self,
|
||||
classes,
|
||||
base_config,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
"""tool_search disabled: no tool_search tool, no section - pure no-op even
|
||||
with an MCP-tagged tool present."""
|
||||
from langchain_core.tools import tool as as_tool
|
||||
|
||||
from deerflow.subagents import executor as executor_module
|
||||
from deerflow.tools.mcp_metadata import tag_mcp_tool
|
||||
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys.modules["deerflow.skills.storage"],
|
||||
"get_or_new_skill_storage",
|
||||
lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []),
|
||||
)
|
||||
monkeypatch.setattr(executor_module, "get_app_config", lambda: SimpleNamespace(tool_search=SimpleNamespace(enabled=False)))
|
||||
|
||||
@as_tool
|
||||
def mcp_calc(expression: str) -> str:
|
||||
"Evaluate arithmetic."
|
||||
return expression
|
||||
|
||||
executor = SubagentExecutor(config=base_config, tools=[tag_mcp_tool(mcp_calc)], thread_id="test-thread")
|
||||
|
||||
state, final_tools, deferred_setup = await executor._build_initial_state("Do the task")
|
||||
|
||||
assert "tool_search" not in [t.name for t in final_tools]
|
||||
assert deferred_setup.deferred_names == frozenset()
|
||||
assert "<available-deferred-tools>" not in state["messages"][0].content
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_build_initial_state_deferral_respects_tool_policy_and_tool_search_is_infra(
|
||||
self,
|
||||
classes,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
"""Adversarial-review follow-up (#3341): tool_search is appended AFTER the
|
||||
subagent tool-policy filter, mirroring the lead's intentional decision
|
||||
(test_tool_search_appended_after_policy_but_never_exposes_denied_tool).
|
||||
Lock the safe-by-construction property:
|
||||
|
||||
- an MCP tool denied by ``disallowed_tools`` never enters the deferred
|
||||
catalog, so tool_search can never promote/expose it;
|
||||
- tool_search itself is infrastructure: naming it in ``disallowed_tools``
|
||||
does not remove it, because its catalog derives from the already-
|
||||
filtered list and carries no access the policy didn't already grant.
|
||||
"""
|
||||
from langchain_core.tools import tool as as_tool
|
||||
|
||||
from deerflow.subagents import executor as executor_module
|
||||
from deerflow.tools.mcp_metadata import tag_mcp_tool
|
||||
|
||||
SubagentConfig = classes["SubagentConfig"]
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
|
||||
monkeypatch.setattr(
|
||||
sys.modules["deerflow.skills.storage"],
|
||||
"get_or_new_skill_storage",
|
||||
lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []),
|
||||
)
|
||||
monkeypatch.setattr(executor_module, "get_app_config", lambda: SimpleNamespace(tool_search=SimpleNamespace(enabled=True)))
|
||||
|
||||
@as_tool
|
||||
def active_tool(x: str) -> str:
|
||||
"active"
|
||||
return x
|
||||
|
||||
@as_tool
|
||||
def mcp_allowed(x: str) -> str:
|
||||
"allowed mcp tool"
|
||||
return x
|
||||
|
||||
@as_tool
|
||||
def mcp_denied(x: str) -> str:
|
||||
"denied mcp tool"
|
||||
return x
|
||||
|
||||
config = SubagentConfig(
|
||||
name="test-agent",
|
||||
description="Test agent",
|
||||
system_prompt="You are a test agent.",
|
||||
max_turns=10,
|
||||
timeout_seconds=60,
|
||||
disallowed_tools=["mcp_denied", "tool_search"],
|
||||
)
|
||||
executor = SubagentExecutor(
|
||||
config=config,
|
||||
tools=[active_tool, tag_mcp_tool(mcp_allowed), tag_mcp_tool(mcp_denied)],
|
||||
thread_id="test-thread",
|
||||
)
|
||||
|
||||
_state, final_tools, deferred_setup = await executor._build_initial_state("Do the task")
|
||||
|
||||
names = {t.name for t in final_tools}
|
||||
# The policy-denied MCP tool is gone and never reaches the catalog.
|
||||
assert "mcp_denied" not in names
|
||||
assert "mcp_denied" not in deferred_setup.deferred_names
|
||||
assert deferred_setup.deferred_names == frozenset({"mcp_allowed"})
|
||||
# tool_search is infra: present despite being named in disallowed_tools.
|
||||
assert "tool_search" in names
|
||||
|
||||
def test_create_agent_threads_deferred_setup_to_middlewares(
|
||||
self,
|
||||
classes,
|
||||
base_config,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
"""A deferred setup passed to _create_agent flows into the subagent
|
||||
middleware factory (so DeferredToolFilterMiddleware can attach)."""
|
||||
from deerflow.subagents import executor as executor_module
|
||||
from deerflow.tools.builtins.tool_search import DeferredToolSetup
|
||||
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
app_config = SimpleNamespace(models=[SimpleNamespace(name="default-model")])
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def fake_build_subagent_runtime_middlewares(**kwargs):
|
||||
captured["middlewares"] = kwargs
|
||||
return [object()]
|
||||
|
||||
monkeypatch.setattr(executor_module, "create_chat_model", lambda **kwargs: object())
|
||||
monkeypatch.setattr(executor_module, "create_agent", lambda **kwargs: object())
|
||||
monkeypatch.setitem(
|
||||
sys.modules,
|
||||
"deerflow.agents.middlewares.tool_error_handling_middleware",
|
||||
_module(
|
||||
"deerflow.agents.middlewares.tool_error_handling_middleware",
|
||||
build_subagent_runtime_middlewares=fake_build_subagent_runtime_middlewares,
|
||||
),
|
||||
)
|
||||
|
||||
deferred_setup = DeferredToolSetup(object(), frozenset({"mcp_calc"}), "hash123")
|
||||
executor = SubagentExecutor(config=base_config, tools=[], app_config=app_config, parent_model="parent-model")
|
||||
|
||||
executor._create_agent(tools=[], deferred_setup=deferred_setup)
|
||||
|
||||
assert captured["middlewares"]["deferred_setup"] is deferred_setup
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Async Execution Path Tests
|
||||
@@ -692,7 +904,7 @@ class TestAsyncExecutionPath:
|
||||
if system_messages:
|
||||
assert initial_messages[0] is system_messages[0], "SystemMessage must be the first message in the conversation"
|
||||
# The consolidated SystemMessage must carry both the system_prompt
|
||||
# and all skill content — nothing should be split across two messages.
|
||||
# and all skill content; nothing should be split across two messages.
|
||||
assert base_config.system_prompt in system_messages[0].content
|
||||
assert "Skill instruction text" in system_messages[0].content
|
||||
|
||||
@@ -1128,11 +1340,9 @@ class TestThreadSafety:
|
||||
@pytest.fixture
|
||||
def executor_module(self, _setup_executor_classes):
|
||||
"""Import the executor module with real classes."""
|
||||
import importlib
|
||||
executor = importlib.import_module("deerflow.subagents.executor")
|
||||
|
||||
from deerflow.subagents import executor
|
||||
|
||||
return importlib.reload(executor)
|
||||
return _patch_default_get_app_config(importlib.reload(executor))
|
||||
|
||||
def test_multiple_executors_in_parallel(self, classes, base_config, msg):
|
||||
"""Test multiple executors running in parallel via thread pool."""
|
||||
@@ -1254,11 +1464,9 @@ class TestCleanupBackgroundTask:
|
||||
def executor_module(self, _setup_executor_classes):
|
||||
"""Import the executor module with real classes."""
|
||||
# Re-import to get the real module with cleanup_background_task
|
||||
import importlib
|
||||
executor = importlib.import_module("deerflow.subagents.executor")
|
||||
|
||||
from deerflow.subagents import executor
|
||||
|
||||
return importlib.reload(executor)
|
||||
return _patch_default_get_app_config(importlib.reload(executor))
|
||||
|
||||
def test_cleanup_removes_terminal_completed_task(self, executor_module, classes):
|
||||
"""Test that cleanup removes a COMPLETED task."""
|
||||
@@ -1399,11 +1607,9 @@ class TestCooperativeCancellation:
|
||||
@pytest.fixture
|
||||
def executor_module(self, _setup_executor_classes):
|
||||
"""Import the executor module with real classes."""
|
||||
import importlib
|
||||
executor = importlib.import_module("deerflow.subagents.executor")
|
||||
|
||||
from deerflow.subagents import executor
|
||||
|
||||
return importlib.reload(executor)
|
||||
return _patch_default_get_app_config(importlib.reload(executor))
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_aexecute_cancelled_before_streaming(self, classes, base_config, mock_agent, msg):
|
||||
|
||||
Reference in New Issue
Block a user