mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-21 15:36:48 +00:00
7de9b5828b
* fix(tools): introduce Runtime type alias to eliminate Pydantic serialization warning
Add deerflow/tools/types.py with:
Runtime = ToolRuntime[dict[str, Any], ThreadState]
Replace every runtime: ToolRuntime[ContextT, ThreadState] and
runtime: ToolRuntime[dict[str, Any], ThreadState] annotation in
sandbox/tools.py, present_file_tool.py, task_tool.py, view_image_tool.py,
and skill_manage_tool.py with the new Runtime alias.
The unbound ContextT TypeVar (default None) caused
PydanticSerializationUnexpectedValue warnings on every tool call because
LangChain's BaseTool._parse_input calls model_dump() on the auto-generated
args_schema while DeerFlow passes a dict as runtime context.
Binding the context to dict[str, Any] aligns Pydantic's serialization
expectations with reality and removes the noise from all run modes.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(tools): extend Runtime alias to setup_agent and update_agent tools
Replace bare ToolRuntime annotations in setup_agent_tool.py and
update_agent_tool.py with the shared Runtime alias introduced in the
previous commit, and add both tools to the Pydantic serialization
warning regression test (13 cases total).
Co-authored-by: Cursor <cursoragent@cursor.com>
* test(tools): loosen Pydantic warning filter to avoid version-specific format
Replace the brittle "field_name='context'" substring check with a looser
"context" match so the assertion stays valid if Pydantic changes its
internal warning format across versions.
Co-authored-by: Cursor <cursoragent@cursor.com>
* test(tools): simplify warning filter and clean up docstring
Remove the "context" substring condition from the Pydantic warning
filter — asserting that no PydanticSerializationUnexpectedValue fires
at all is both simpler and more comprehensive, since the test payload
contains only the tool's own args plus runtime.
Also update the module docstring to remove the version-specific warning
format example that was inconsistent with the looser filter.
Co-authored-by: Cursor <cursoragent@cursor.com>
---------
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
308 lines
14 KiB
Python
308 lines
14 KiB
Python
"""Task tool for delegating work to subagents."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from dataclasses import replace
|
|
from typing import TYPE_CHECKING, Annotated, Any, cast
|
|
|
|
from langchain.tools import InjectedToolCallId, tool
|
|
from langgraph.config import get_stream_writer
|
|
|
|
from deerflow.config import get_app_config
|
|
from deerflow.sandbox.security import LOCAL_BASH_SUBAGENT_DISABLED_MESSAGE, is_host_bash_allowed
|
|
from deerflow.subagents import SubagentExecutor, get_available_subagent_names, get_subagent_config
|
|
from deerflow.subagents.config import resolve_subagent_model_name
|
|
from deerflow.subagents.executor import (
|
|
SubagentStatus,
|
|
cleanup_background_task,
|
|
get_background_task_result,
|
|
request_cancel_background_task,
|
|
)
|
|
from deerflow.tools.types import Runtime
|
|
|
|
if TYPE_CHECKING:
|
|
from deerflow.config.app_config import AppConfig
|
|
|
|
# Module-level logger used for this tool's lifecycle messages (task start,
# status changes, failures, polling timeouts and deferred cleanup).
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_runtime_app_config(runtime: Any) -> "AppConfig | None":
|
|
context = getattr(runtime, "context", None)
|
|
if isinstance(context, dict):
|
|
app_config = context.get("app_config")
|
|
if app_config is not None:
|
|
return cast("AppConfig", app_config)
|
|
return None
|
|
|
|
|
|
def _merge_skill_allowlists(parent: list[str] | None, child: list[str] | None) -> list[str] | None:
|
|
"""Return the effective subagent skill allowlist under the parent policy."""
|
|
if parent is None:
|
|
return child
|
|
if child is None:
|
|
return list(parent)
|
|
|
|
parent_set = set(parent)
|
|
return [skill for skill in child if skill in parent_set]
|
|
|
|
|
|
# Strong references to fire-and-forget cleanup tasks. The event loop keeps only
# weak references to tasks, so a task created with asyncio.create_task() and
# not stored anywhere may be garbage-collected before it finishes.
_deferred_cleanup_tasks: set[asyncio.Task[None]] = set()


def _schedule_deferred_cleanup(task_id: str, trace_id: str | None, max_cleanup_polls: int) -> None:
    """Schedule cleanup of a background subagent task once it stops running.

    Creates a task on the running event loop that re-reads the background task
    result every 5 seconds:

    - the result is gone: stop, nothing to clean up;
    - the result reports a terminal status or a ``completed_at`` value: call
      ``cleanup_background_task`` and stop;
    - ``max_cleanup_polls`` polls are exceeded: log a warning and give up.

    The created task is kept in ``_deferred_cleanup_tasks`` until it finishes,
    so it cannot be garbage-collected mid-run.

    Args:
        task_id: Background task identifier to clean up.
        trace_id: Trace identifier used only for log messages.
        max_cleanup_polls: Poll budget before giving up.
    """
    terminal_statuses = {
        SubagentStatus.COMPLETED,
        SubagentStatus.FAILED,
        SubagentStatus.CANCELLED,
        SubagentStatus.TIMED_OUT,
    }

    async def cleanup_when_done() -> None:
        cleanup_poll_count = 0
        while True:
            result = get_background_task_result(task_id)
            if result is None:
                # Already cleaned up (or never registered); nothing to do.
                return

            if result.status in terminal_statuses or getattr(result, "completed_at", None) is not None:
                cleanup_background_task(task_id)
                return

            if cleanup_poll_count > max_cleanup_polls:
                logger.warning(f"[trace={trace_id}] Deferred cleanup for task {task_id} timed out after {cleanup_poll_count} polls")
                return

            await asyncio.sleep(5)
            cleanup_poll_count += 1

    def on_cleanup_done(cleanup_task: asyncio.Task[None]) -> None:
        # Release the strong reference first so the task can be collected.
        _deferred_cleanup_tasks.discard(cleanup_task)
        if cleanup_task.cancelled():
            return

        exc = cleanup_task.exception()
        if exc is not None:
            logger.error(f"[trace={trace_id}] Deferred cleanup failed for task {task_id}: {exc}")

    logger.debug(f"[trace={trace_id}] Scheduling deferred cleanup for cancelled task {task_id}")
    cleanup_task = asyncio.create_task(cleanup_when_done())
    _deferred_cleanup_tasks.add(cleanup_task)
    cleanup_task.add_done_callback(on_cleanup_done)


# The docstring of task_tool is parsed into the tool description and argument
# descriptions shown to the model (parse_docstring=True). Keep implementation
# notes in comments, not in the docstring.
@tool("task", parse_docstring=True)
async def task_tool(
    runtime: Runtime,
    description: str,
    prompt: str,
    subagent_type: str,
    tool_call_id: Annotated[str, InjectedToolCallId],
    max_turns: int | None = None,
) -> str:
    """Delegate a task to a specialized subagent that runs in its own context.

    Subagents help you:
    - Preserve context by keeping exploration and implementation separate
    - Handle complex multi-step tasks autonomously
    - Execute commands or operations in isolated contexts

    Built-in subagent types:
    - **general-purpose**: A capable agent for complex, multi-step tasks that require
      both exploration and action. Use when the task requires complex reasoning,
      multiple dependent steps, or would benefit from isolated context.
    - **bash**: Command execution specialist for running bash commands. This is only
      available when host bash is explicitly allowed or when using an isolated shell
      sandbox such as `AioSandboxProvider`.

    Additional custom subagent types may be defined in config.yaml under
    `subagents.custom_agents`. Each custom type can have its own system prompt,
    tools, skills, model, and timeout configuration. If an unknown subagent_type
    is provided, the error message will list all available types.

    When to use this tool:
    - Complex tasks requiring multiple steps or tools
    - Tasks that produce verbose output
    - When you want to isolate context from the main conversation
    - Parallel research or exploration tasks

    When NOT to use this tool:
    - Simple, single-step operations (use tools directly)
    - Tasks requiring user interaction or clarification

    Args:
        description: A short (3-5 word) description of the task for logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST.
        prompt: The task description for the subagent. Be specific and clear about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND.
        subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD.
        max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max.
    """
    # Implementation overview:
    #   1. Resolve the subagent config (with per-run app_config when provided).
    #   2. Derive overrides (max_turns, skill allowlist) from the parent run.
    #   3. Start the subagent in the background and poll it every 5 seconds.
    #   4. Stream task_* events through the LangGraph stream writer.
    #   5. Return a string summary for the calling model.
    # On cancellation the background task is asked to stop. Its cleanup is
    # deferred to a tracked asyncio task.
    runtime_app_config = _get_runtime_app_config(runtime)
    available_subagent_names = get_available_subagent_names(app_config=runtime_app_config) if runtime_app_config is not None else get_available_subagent_names()

    # Get subagent configuration
    config = get_subagent_config(subagent_type, app_config=runtime_app_config) if runtime_app_config is not None else get_subagent_config(subagent_type)
    if config is None:
        available = ", ".join(available_subagent_names)
        return f"Error: Unknown subagent type '{subagent_type}'. Available: {available}"
    if subagent_type == "bash":
        host_bash_allowed = is_host_bash_allowed(runtime_app_config) if runtime_app_config is not None else is_host_bash_allowed()
        if not host_bash_allowed:
            return f"Error: {LOCAL_BASH_SUBAGENT_DISABLED_MESSAGE}"

    # Build config overrides
    overrides: dict[str, Any] = {}

    # Skills are loaded by SubagentExecutor per-session (aligned with Codex's pattern:
    # each subagent loads its own skills based on config, injected as conversation items).
    # No longer appended to system_prompt here.

    if max_turns is not None:
        overrides["max_turns"] = max_turns

    # Extract parent context from runtime
    sandbox_state = None
    thread_data = None
    thread_id = None
    parent_model = None
    trace_id = None
    metadata: dict[str, Any] = {}

    if runtime is not None:
        sandbox_state = runtime.state.get("sandbox")
        thread_data = runtime.state.get("thread_data")
        thread_id = runtime.context.get("thread_id") if runtime.context else None

        # `or {}` also covers keys that are present but explicitly None, which
        # would otherwise raise AttributeError on the chained .get().
        runtime_config = runtime.config or {}
        if thread_id is None:
            thread_id = (runtime_config.get("configurable") or {}).get("thread_id")

        # Try to get parent model from run metadata
        metadata = runtime_config.get("metadata") or {}
        parent_model = metadata.get("model_name")

        # Get or generate trace_id for distributed tracing
        trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8]

        # A subagent may never use more skills than its parent is allowed to.
        parent_available_skills = metadata.get("available_skills")
        if parent_available_skills is not None:
            overrides["skills"] = _merge_skill_allowlists(list(parent_available_skills), config.skills)

    if overrides:
        config = replace(config, **overrides)

    # Get available tools (excluding task tool to prevent nesting)
    # Lazy import to avoid circular dependency
    from deerflow.tools import get_available_tools

    # Inherit parent agent's tool_groups so subagents respect the same restrictions
    parent_tool_groups = metadata.get("tool_groups")
    resolved_app_config = runtime_app_config
    if config.model == "inherit" and parent_model is None and resolved_app_config is None:
        # Nothing to inherit from the parent run: fall back to the global config.
        resolved_app_config = get_app_config()
    effective_model = resolve_subagent_model_name(config, parent_model, app_config=resolved_app_config)

    # Subagents should not have subagent tools enabled (prevent recursive nesting)
    available_tools_kwargs: dict[str, Any] = {
        "model_name": effective_model,
        "groups": parent_tool_groups,
        "subagent_enabled": False,
    }
    if resolved_app_config is not None:
        available_tools_kwargs["app_config"] = resolved_app_config
    tools = get_available_tools(**available_tools_kwargs)

    # Create executor
    executor_kwargs: dict[str, Any] = {
        "config": config,
        "tools": tools,
        "parent_model": parent_model,
        "sandbox_state": sandbox_state,
        "thread_data": thread_data,
        "thread_id": thread_id,
        "trace_id": trace_id,
    }
    if resolved_app_config is not None:
        executor_kwargs["app_config"] = resolved_app_config
    executor = SubagentExecutor(**executor_kwargs)

    # Start background execution (always async to prevent blocking)
    # Use tool_call_id as task_id for better traceability
    task_id = executor.execute_async(prompt, task_id=tool_call_id)

    # Poll for task completion in backend (removes need for LLM to poll)
    poll_count = 0
    last_status = None
    last_message_count = 0  # Track how many AI messages we've already sent
    # Polling timeout: execution timeout + 60s buffer, checked every 5s
    max_poll_count = (config.timeout_seconds + 60) // 5

    logger.info(f"[trace={trace_id}] Started background task {task_id} (subagent={subagent_type}, timeout={config.timeout_seconds}s, polling_limit={max_poll_count} polls)")

    writer = get_stream_writer()
    # Send Task Started message
    writer({"type": "task_started", "task_id": task_id, "description": description})

    try:
        while True:
            result = get_background_task_result(task_id)

            if result is None:
                logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks")
                writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"})
                cleanup_background_task(task_id)
                return f"Error: Task {task_id} disappeared from background tasks"

            # Log status changes for debugging
            if result.status != last_status:
                logger.info(f"[trace={trace_id}] Task {task_id} status: {result.status.value}")
                last_status = result.status

            # Check for new AI messages and send task_running events
            ai_messages = result.ai_messages or []
            current_message_count = len(ai_messages)
            if current_message_count > last_message_count:
                # Send task_running event for each new message
                for i in range(last_message_count, current_message_count):
                    message = ai_messages[i]
                    writer(
                        {
                            "type": "task_running",
                            "task_id": task_id,
                            "message": message,
                            "message_index": i + 1,  # 1-based index for display
                            "total_messages": current_message_count,
                        }
                    )
                    logger.info(f"[trace={trace_id}] Task {task_id} sent message #{i + 1}/{current_message_count}")
                last_message_count = current_message_count

            # Check if task completed, failed, or timed out
            if result.status == SubagentStatus.COMPLETED:
                writer({"type": "task_completed", "task_id": task_id, "result": result.result})
                logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
                cleanup_background_task(task_id)
                return f"Task Succeeded. Result: {result.result}"
            elif result.status == SubagentStatus.FAILED:
                writer({"type": "task_failed", "task_id": task_id, "error": result.error})
                logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
                cleanup_background_task(task_id)
                return f"Task failed. Error: {result.error}"
            elif result.status == SubagentStatus.CANCELLED:
                writer({"type": "task_cancelled", "task_id": task_id, "error": result.error})
                logger.info(f"[trace={trace_id}] Task {task_id} cancelled: {result.error}")
                cleanup_background_task(task_id)
                return "Task cancelled by user."
            elif result.status == SubagentStatus.TIMED_OUT:
                writer({"type": "task_timed_out", "task_id": task_id, "error": result.error})
                logger.warning(f"[trace={trace_id}] Task {task_id} timed out: {result.error}")
                cleanup_background_task(task_id)
                return f"Task timed out. Error: {result.error}"

            # Still running, wait before next poll
            await asyncio.sleep(5)
            poll_count += 1

            # Polling timeout as a safety net (in case thread pool timeout doesn't work)
            # Set to execution timeout + 60s buffer, in 5s poll intervals
            # This catches edge cases where the background task gets stuck
            # Note: We don't call cleanup_background_task here because the task may
            # still be running in the background. The cleanup will happen when the
            # executor completes and sets a terminal status.
            if poll_count > max_poll_count:
                timeout_minutes = config.timeout_seconds // 60
                logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)")
                writer({"type": "task_timed_out", "task_id": task_id})
                return f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}"
    except asyncio.CancelledError:
        # Signal the background subagent thread to stop cooperatively.
        # Without this, the thread (running in ThreadPoolExecutor with its
        # own event loop via asyncio.run) would continue executing even
        # after the parent task is cancelled.
        request_cancel_background_task(task_id)
        # The thread may not have stopped yet, so cleanup is deferred to a
        # separately tracked task instead of being done here.
        _schedule_deferred_cleanup(task_id, trace_id, max_poll_count)
        raise