Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4383d96583 | |||
| 3e461d9d08 | |||
| cf43584d24 | |||
| 6ff60f2af1 | |||
| a3bfea631c | |||
| aae59a8ba8 | |||
| 3ff15423d6 | |||
| c2f7be37b3 | |||
| 09a9209724 | |||
| b356a13da5 | |||
| ac9a6ee6a2 | |||
| 64e0f5329a |
@@ -38,6 +38,7 @@ class RunCreateRequest(BaseModel):
|
||||
command: dict[str, Any] | None = Field(default=None, description="LangGraph Command")
|
||||
metadata: dict[str, Any] | None = Field(default=None, description="Run metadata")
|
||||
config: dict[str, Any] | None = Field(default=None, description="RunnableConfig overrides")
|
||||
context: dict[str, Any] | None = Field(default=None, description="DeerFlow context overrides (model_name, thinking_enabled, etc.)")
|
||||
webhook: str | None = Field(default=None, description="Completion callback URL")
|
||||
checkpoint_id: str | None = Field(default=None, description="Resume from checkpoint")
|
||||
checkpoint: dict[str, Any] | None = Field(default=None, description="Full checkpoint object")
|
||||
|
||||
@@ -10,6 +10,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
@@ -93,20 +94,56 @@ def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
|
||||
return raw_input
|
||||
|
||||
|
||||
_DEFAULT_ASSISTANT_ID = "lead_agent"
|
||||
|
||||
|
||||
def resolve_agent_factory(assistant_id: str | None):
|
||||
"""Resolve the agent factory callable from config."""
|
||||
"""Resolve the agent factory callable from config.
|
||||
|
||||
Custom agents are implemented as ``lead_agent`` + an ``agent_name``
|
||||
injected into ``configurable`` — see :func:`build_run_config`. All
|
||||
``assistant_id`` values therefore map to the same factory; the routing
|
||||
happens inside ``make_lead_agent`` when it reads ``cfg["agent_name"]``.
|
||||
"""
|
||||
from deerflow.agents.lead_agent.agent import make_lead_agent
|
||||
|
||||
if assistant_id and assistant_id != "lead_agent":
|
||||
logger.info("assistant_id=%s requested; falling back to lead_agent", assistant_id)
|
||||
return make_lead_agent
|
||||
|
||||
|
||||
def build_run_config(thread_id: str, request_config: dict[str, Any] | None, metadata: dict[str, Any] | None) -> dict[str, Any]:
|
||||
"""Build a RunnableConfig dict for the agent."""
|
||||
configurable = {"thread_id": thread_id}
|
||||
def build_run_config(
|
||||
thread_id: str,
|
||||
request_config: dict[str, Any] | None,
|
||||
metadata: dict[str, Any] | None,
|
||||
*,
|
||||
assistant_id: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Build a RunnableConfig dict for the agent.
|
||||
|
||||
When *assistant_id* refers to a custom agent (anything other than
|
||||
``"lead_agent"`` / ``None``), the name is forwarded as
|
||||
``configurable["agent_name"]``. ``make_lead_agent`` reads this key to
|
||||
load the matching ``agents/<name>/SOUL.md`` and per-agent config —
|
||||
without it the agent silently runs as the default lead agent.
|
||||
|
||||
This mirrors the channel manager's ``_resolve_run_params`` logic so that
|
||||
the LangGraph Platform-compatible HTTP API and the IM channel path behave
|
||||
identically.
|
||||
"""
|
||||
configurable: dict[str, Any] = {"thread_id": thread_id}
|
||||
if request_config:
|
||||
configurable.update(request_config.get("configurable", {}))
|
||||
|
||||
# Inject custom agent name when the caller specified a non-default assistant.
|
||||
# Honour an explicit configurable["agent_name"] in the request if already set.
|
||||
if assistant_id and assistant_id != _DEFAULT_ASSISTANT_ID and "agent_name" not in configurable:
|
||||
# Normalize the same way ChannelManager does: strip, lowercase,
|
||||
# replace underscores with hyphens, then validate to prevent path
|
||||
# traversal and invalid agent directory lookups.
|
||||
normalized = assistant_id.strip().lower().replace("_", "-")
|
||||
if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized):
|
||||
raise ValueError(f"Invalid assistant_id {assistant_id!r}: must contain only letters, digits, and hyphens after normalization.")
|
||||
configurable["agent_name"] = normalized
|
||||
|
||||
config: dict[str, Any] = {"configurable": configurable, "recursion_limit": 100}
|
||||
if request_config:
|
||||
for k, v in request_config.items():
|
||||
@@ -233,7 +270,28 @@ async def start_run(
|
||||
|
||||
agent_factory = resolve_agent_factory(body.assistant_id)
|
||||
graph_input = normalize_input(body.input)
|
||||
config = build_run_config(thread_id, body.config, body.metadata)
|
||||
config = build_run_config(thread_id, body.config, body.metadata, assistant_id=body.assistant_id)
|
||||
|
||||
# Merge DeerFlow-specific context overrides into configurable.
|
||||
# The ``context`` field is a custom extension for the langgraph-compat layer
|
||||
# that carries agent configuration (model_name, thinking_enabled, etc.).
|
||||
# Only agent-relevant keys are forwarded; unknown keys (e.g. thread_id) are ignored.
|
||||
context = getattr(body, "context", None)
|
||||
if context:
|
||||
_CONTEXT_CONFIGURABLE_KEYS = {
|
||||
"model_name",
|
||||
"mode",
|
||||
"thinking_enabled",
|
||||
"reasoning_effort",
|
||||
"is_plan_mode",
|
||||
"subagent_enabled",
|
||||
"max_concurrent_subagents",
|
||||
}
|
||||
configurable = config.setdefault("configurable", {})
|
||||
for key in _CONTEXT_CONFIGURABLE_KEYS:
|
||||
if key in context:
|
||||
configurable.setdefault(key, context[key])
|
||||
|
||||
stream_modes = normalize_stream_modes(body.stream_mode)
|
||||
|
||||
task = asyncio.create_task(
|
||||
|
||||
@@ -257,6 +257,8 @@ sandbox:
|
||||
read_only: false
|
||||
```
|
||||
|
||||
When you configure `sandbox.mounts`, DeerFlow exposes those `container_path` values in the agent prompt so the agent can discover and operate on mounted directories directly instead of assuming everything must live under `/mnt/user-data`.
|
||||
|
||||
### Skills
|
||||
|
||||
Configure the skills directory for specialized workflows:
|
||||
|
||||
@@ -477,6 +477,28 @@ def _build_acp_section() -> str:
|
||||
)
|
||||
|
||||
|
||||
def _build_custom_mounts_section() -> str:
    """Render the prompt fragment that advertises configured sandbox mounts.

    Reads ``sandbox.mounts`` from the application config and lists each
    mount's ``container_path`` together with its access mode.

    Returns:
        The formatted section, or an empty string when no mounts are
        configured or when loading the configuration raised (the failure is
        logged, never propagated).
    """

    def _describe(mount) -> str:
        # One bullet per mount; the access label mirrors ``read_only``.
        access = "read-write"
        if mount.read_only:
            access = "read-only"
        return f"- Custom mount: `{mount.container_path}` - Host directory mapped into the sandbox ({access})"

    try:
        from deerflow.config import get_app_config

        configured = get_app_config().sandbox.mounts or []
    except Exception:
        # Best effort: a config problem must not prevent the prompt from being built.
        logger.exception("Failed to load configured sandbox mounts for the lead-agent prompt")
        return ""

    if not configured:
        return ""

    mounts_list = "\n".join(map(_describe, configured))
    return f"\n**Custom Mounted Directories:**\n{mounts_list}\n- If the user needs files outside `/mnt/user-data`, use these absolute container paths directly when they match the requested directory"
|
||||
|
||||
|
||||
def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagents: int = 3, *, agent_name: str | None = None, available_skills: set[str] | None = None) -> str:
|
||||
# Get memory context
|
||||
memory_context = _get_memory_context(agent_name)
|
||||
@@ -511,6 +533,8 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen
|
||||
|
||||
# Build ACP agent section only if ACP agents are configured
|
||||
acp_section = _build_acp_section()
|
||||
custom_mounts_section = _build_custom_mounts_section()
|
||||
acp_and_mounts_section = "\n".join(section for section in (acp_section, custom_mounts_section) if section)
|
||||
|
||||
# Format the prompt with dynamic skills and memory
|
||||
prompt = SYSTEM_PROMPT_TEMPLATE.format(
|
||||
@@ -522,7 +546,7 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen
|
||||
subagent_section=subagent_section,
|
||||
subagent_reminder=subagent_reminder,
|
||||
subagent_thinking=subagent_thinking,
|
||||
acp_section=acp_section,
|
||||
acp_section=acp_and_mounts_section,
|
||||
)
|
||||
|
||||
return prompt + f"\n<current_date>{datetime.now().strftime('%Y-%m-%d, %A')}</current_date>"
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import base64
|
||||
import logging
|
||||
import shlex
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from agent_sandbox import Sandbox as AioSandboxClient
|
||||
|
||||
@@ -7,11 +10,15 @@ from deerflow.sandbox.sandbox import Sandbox
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_ERROR_OBSERVATION_SIGNATURE = "'ErrorObservation' object has no attribute 'exit_code'"
|
||||
|
||||
|
||||
class AioSandbox(Sandbox):
|
||||
"""Sandbox implementation using the agent-infra/sandbox Docker container.
|
||||
|
||||
This sandbox connects to a running AIO sandbox container via HTTP API.
|
||||
A threading lock serializes shell commands to prevent concurrent requests
|
||||
from corrupting the container's single persistent session (see #1433).
|
||||
"""
|
||||
|
||||
def __init__(self, id: str, base_url: str, home_dir: str | None = None):
|
||||
@@ -26,6 +33,7 @@ class AioSandbox(Sandbox):
|
||||
self._base_url = base_url
|
||||
self._client = AioSandboxClient(base_url=base_url, timeout=600)
|
||||
self._home_dir = home_dir
|
||||
self._lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def base_url(self) -> str:
|
||||
@@ -42,19 +50,34 @@ class AioSandbox(Sandbox):
|
||||
def execute_command(self, command: str) -> str:
|
||||
"""Execute a shell command in the sandbox.
|
||||
|
||||
Uses a lock to serialize concurrent requests. The AIO sandbox
|
||||
container maintains a single persistent shell session that
|
||||
corrupts when hit with concurrent exec_command calls (returns
|
||||
``ErrorObservation`` instead of real output). If corruption is
|
||||
detected despite the lock (e.g. multiple processes sharing a
|
||||
sandbox), the command is retried on a fresh session.
|
||||
|
||||
Args:
|
||||
command: The command to execute.
|
||||
|
||||
Returns:
|
||||
The output of the command.
|
||||
"""
|
||||
try:
|
||||
result = self._client.shell.exec_command(command=command)
|
||||
output = result.data.output if result.data else ""
|
||||
return output if output else "(no output)"
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to execute command in sandbox: {e}")
|
||||
return f"Error: {e}"
|
||||
with self._lock:
|
||||
try:
|
||||
result = self._client.shell.exec_command(command=command)
|
||||
output = result.data.output if result.data else ""
|
||||
|
||||
if output and _ERROR_OBSERVATION_SIGNATURE in output:
|
||||
logger.warning("ErrorObservation detected in sandbox output, retrying with a fresh session")
|
||||
fresh_id = str(uuid.uuid4())
|
||||
result = self._client.shell.exec_command(command=command, id=fresh_id)
|
||||
output = result.data.output if result.data else ""
|
||||
|
||||
return output if output else "(no output)"
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to execute command in sandbox: {e}")
|
||||
return f"Error: {e}"
|
||||
|
||||
def read_file(self, path: str) -> str:
|
||||
"""Read the content of a file in the sandbox.
|
||||
@@ -82,17 +105,16 @@ class AioSandbox(Sandbox):
|
||||
Returns:
|
||||
The contents of the directory.
|
||||
"""
|
||||
try:
|
||||
# Use shell command to list directory with depth limit
|
||||
# The -L flag limits the depth for the tree command
|
||||
result = self._client.shell.exec_command(command=f"find {path} -maxdepth {max_depth} -type f -o -type d 2>/dev/null | head -500")
|
||||
output = result.data.output if result.data else ""
|
||||
if output:
|
||||
return [line.strip() for line in output.strip().split("\n") if line.strip()]
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list directory in sandbox: {e}")
|
||||
return []
|
||||
with self._lock:
|
||||
try:
|
||||
result = self._client.shell.exec_command(command=f"find {shlex.quote(path)} -maxdepth {max_depth} -type f -o -type d 2>/dev/null | head -500")
|
||||
output = result.data.output if result.data else ""
|
||||
if output:
|
||||
return [line.strip() for line in output.strip().split("\n") if line.strip()]
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list directory in sandbox: {e}")
|
||||
return []
|
||||
|
||||
def write_file(self, path: str, content: str, append: bool = False) -> None:
|
||||
"""Write content to a file in the sandbox.
|
||||
|
||||
@@ -26,7 +26,7 @@ except ImportError: # pragma: no cover - Windows fallback
|
||||
import msvcrt
|
||||
|
||||
from deerflow.config import get_app_config
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, Paths, get_paths
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
|
||||
from deerflow.sandbox.sandbox import Sandbox
|
||||
from deerflow.sandbox.sandbox_provider import SandboxProvider
|
||||
|
||||
@@ -214,17 +214,13 @@ class AioSandboxProvider(SandboxProvider):
|
||||
paths = get_paths()
|
||||
paths.ensure_thread_dirs(thread_id)
|
||||
|
||||
# host_paths resolves to the host-side base dir when DEER_FLOW_HOST_BASE_DIR
|
||||
# is set, otherwise falls back to the container's own base dir (native mode).
|
||||
host_paths = Paths(base_dir=paths.host_base_dir)
|
||||
|
||||
return [
|
||||
(str(host_paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
|
||||
(str(host_paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
|
||||
(str(host_paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
|
||||
(paths.host_sandbox_work_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
|
||||
(paths.host_sandbox_uploads_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
|
||||
(paths.host_sandbox_outputs_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
|
||||
# ACP workspace: read-only inside the sandbox (lead agent reads results;
|
||||
# the ACP subprocess writes from the host side, not from within the container).
|
||||
(str(host_paths.acp_workspace_dir(thread_id)), "/mnt/acp-workspace", True),
|
||||
(paths.host_acp_workspace_dir(thread_id), "/mnt/acp-workspace", True),
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -18,6 +18,26 @@ from .sandbox_info import SandboxInfo
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_container_mount(runtime: str, host_path: str, container_path: str, read_only: bool) -> list[str]:
|
||||
"""Format a bind-mount argument for the selected runtime.
|
||||
|
||||
Docker's ``-v host:container`` syntax is ambiguous for Windows drive-letter
|
||||
paths like ``D:/...`` because ``:`` is both the drive separator and the
|
||||
volume separator. Use ``--mount type=bind,...`` for Docker to avoid that
|
||||
parsing ambiguity. Apple Container keeps using ``-v``.
|
||||
"""
|
||||
if runtime == "docker":
|
||||
mount_spec = f"type=bind,src={host_path},dst={container_path}"
|
||||
if read_only:
|
||||
mount_spec += ",readonly"
|
||||
return ["--mount", mount_spec]
|
||||
|
||||
mount_spec = f"{host_path}:{container_path}"
|
||||
if read_only:
|
||||
mount_spec += ":ro"
|
||||
return ["-v", mount_spec]
|
||||
|
||||
|
||||
class LocalContainerBackend(SandboxBackend):
|
||||
"""Backend that manages sandbox containers locally using Docker or Apple Container.
|
||||
|
||||
@@ -246,18 +266,26 @@ class LocalContainerBackend(SandboxBackend):
|
||||
|
||||
# Config-level volume mounts
|
||||
for mount in self._config_mounts:
|
||||
mount_spec = f"{mount.host_path}:{mount.container_path}"
|
||||
if mount.read_only:
|
||||
mount_spec += ":ro"
|
||||
cmd.extend(["-v", mount_spec])
|
||||
cmd.extend(
|
||||
_format_container_mount(
|
||||
self._runtime,
|
||||
mount.host_path,
|
||||
mount.container_path,
|
||||
mount.read_only,
|
||||
)
|
||||
)
|
||||
|
||||
# Extra mounts (thread-specific, skills, etc.)
|
||||
if extra_mounts:
|
||||
for host_path, container_path, read_only in extra_mounts:
|
||||
mount_spec = f"{host_path}:{container_path}"
|
||||
if read_only:
|
||||
mount_spec += ":ro"
|
||||
cmd.extend(["-v", mount_spec])
|
||||
cmd.extend(
|
||||
_format_container_mount(
|
||||
self._runtime,
|
||||
host_path,
|
||||
container_path,
|
||||
read_only,
|
||||
)
|
||||
)
|
||||
|
||||
cmd.append(self._image)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from pathlib import Path, PureWindowsPath
|
||||
|
||||
# Virtual path prefix seen by agents inside the sandbox
|
||||
VIRTUAL_PATH_PREFIX = "/mnt/user-data"
|
||||
@@ -9,6 +9,41 @@ VIRTUAL_PATH_PREFIX = "/mnt/user-data"
|
||||
_SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
|
||||
|
||||
|
||||
def _validate_thread_id(thread_id: str) -> str:
|
||||
"""Validate a thread ID before using it in filesystem paths."""
|
||||
if not _SAFE_THREAD_ID_RE.match(thread_id):
|
||||
raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
|
||||
return thread_id
|
||||
|
||||
|
||||
def _join_host_path(base: str, *parts: str) -> str:
|
||||
"""Join host filesystem path segments while preserving native style.
|
||||
|
||||
Docker Desktop on Windows expects bind mount sources to stay in Windows
|
||||
path form (for example ``C:\\repo\\backend\\.deer-flow``). Using
|
||||
``Path(base) / ...`` on a POSIX host can accidentally rewrite those paths
|
||||
with mixed separators, so this helper preserves the original style.
|
||||
"""
|
||||
if not parts:
|
||||
return base
|
||||
|
||||
if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base:
|
||||
result = PureWindowsPath(base)
|
||||
for part in parts:
|
||||
result /= part
|
||||
return str(result)
|
||||
|
||||
result = Path(base)
|
||||
for part in parts:
|
||||
result /= part
|
||||
return str(result)
|
||||
|
||||
|
||||
def join_host_path(base: str, *parts: str) -> str:
    """Join host path segments onto *base*, keeping *base*'s native style.

    Public wrapper that forwards its arguments unchanged to
    ``_join_host_path`` and returns that function's result.
    """
    joined = _join_host_path(base, *parts)
    return joined
|
||||
|
||||
|
||||
class Paths:
|
||||
"""
|
||||
Centralized path configuration for DeerFlow application data.
|
||||
@@ -54,6 +89,12 @@ class Paths:
|
||||
return Path(env)
|
||||
return self.base_dir
|
||||
|
||||
def _host_base_dir_str(self) -> str:
|
||||
"""Return the host base dir as a raw string for bind mounts."""
|
||||
if env := os.getenv("DEER_FLOW_HOST_BASE_DIR"):
|
||||
return env
|
||||
return str(self.base_dir)
|
||||
|
||||
@property
|
||||
def base_dir(self) -> Path:
|
||||
"""Root directory for all application data."""
|
||||
@@ -103,9 +144,7 @@ class Paths:
|
||||
ValueError: If `thread_id` contains unsafe characters (path separators
|
||||
or `..`) that could cause directory traversal.
|
||||
"""
|
||||
if not _SAFE_THREAD_ID_RE.match(thread_id):
|
||||
raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
|
||||
return self.base_dir / "threads" / thread_id
|
||||
return self.base_dir / "threads" / _validate_thread_id(thread_id)
|
||||
|
||||
def sandbox_work_dir(self, thread_id: str) -> Path:
|
||||
"""
|
||||
@@ -150,6 +189,30 @@ class Paths:
|
||||
"""
|
||||
return self.thread_dir(thread_id) / "user-data"
|
||||
|
||||
def host_thread_dir(self, thread_id: str) -> str:
    """Return the host-side directory of *thread_id* as a string.

    The result is ``<host base dir>/threads/<thread_id>``, joined in the
    host base directory's own path style.

    Raises:
        ValueError: If *thread_id* fails ``_validate_thread_id``.
    """
    host_root = self._host_base_dir_str()
    safe_id = _validate_thread_id(thread_id)
    return _join_host_path(host_root, "threads", safe_id)
|
||||
|
||||
def host_sandbox_user_data_dir(self, thread_id: str) -> str:
    """Return the host path of the thread's ``user-data`` root as a string."""
    thread_root = self.host_thread_dir(thread_id)
    return _join_host_path(thread_root, "user-data")
|
||||
|
||||
def host_sandbox_work_dir(self, thread_id: str) -> str:
    """Return the host path used as the source of the workspace mount."""
    user_data_root = self.host_sandbox_user_data_dir(thread_id)
    return _join_host_path(user_data_root, "workspace")
|
||||
|
||||
def host_sandbox_uploads_dir(self, thread_id: str) -> str:
    """Return the host path used as the source of the uploads mount."""
    user_data_root = self.host_sandbox_user_data_dir(thread_id)
    return _join_host_path(user_data_root, "uploads")
|
||||
|
||||
def host_sandbox_outputs_dir(self, thread_id: str) -> str:
    """Return the host path used as the source of the outputs mount."""
    user_data_root = self.host_sandbox_user_data_dir(thread_id)
    return _join_host_path(user_data_root, "outputs")
|
||||
|
||||
def host_acp_workspace_dir(self, thread_id: str) -> str:
    """Return the host path used as the source of the ACP workspace mount."""
    thread_root = self.host_thread_dir(thread_id)
    return _join_host_path(thread_root, "acp-workspace")
|
||||
|
||||
def ensure_thread_dirs(self, thread_id: str) -> None:
|
||||
"""Create all standard sandbox directories for a thread.
|
||||
|
||||
|
||||
@@ -81,11 +81,9 @@ class RunManager:
|
||||
async def list_by_thread(self, thread_id: str) -> list[RunRecord]:
|
||||
"""Return all runs for a given thread, newest first."""
|
||||
async with self._lock:
|
||||
return sorted(
|
||||
(r for r in self._runs.values() if r.thread_id == thread_id),
|
||||
key=lambda r: r.created_at,
|
||||
reverse=True,
|
||||
)
|
||||
# Dict insertion order matches creation order, so reversing it gives
|
||||
# us deterministic newest-first results even when timestamps tie.
|
||||
return [r for r in reversed(self._runs.values()) if r.thread_id == thread_id]
|
||||
|
||||
async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
|
||||
"""Transition a run to a new status."""
|
||||
|
||||
@@ -37,6 +37,7 @@ You have access to the sandbox environment:
|
||||
- User uploads: `/mnt/user-data/uploads`
|
||||
- User workspace: `/mnt/user-data/workspace`
|
||||
- Output files: `/mnt/user-data/outputs`
|
||||
- Deployment-configured custom mounts may also be available at other absolute container paths; use them directly when the task references those mounted directories
|
||||
</working_directory>
|
||||
""",
|
||||
tools=["bash", "ls", "read_file", "write_file", "str_replace"], # Sandbox tools only
|
||||
|
||||
@@ -38,6 +38,7 @@ You have access to the same sandbox environment as the parent agent:
|
||||
- User uploads: `/mnt/user-data/uploads`
|
||||
- User workspace: `/mnt/user-data/workspace`
|
||||
- Output files: `/mnt/user-data/outputs`
|
||||
- Deployment-configured custom mounts may also be available at other absolute container paths; use them directly when the task references those mounted directories
|
||||
</working_directory>
|
||||
""",
|
||||
tools=None, # Inherit all tools from parent
|
||||
|
||||
@@ -9,7 +9,6 @@ from langgraph.types import Command
|
||||
from langgraph.typing import ContextT
|
||||
|
||||
from deerflow.agents.thread_state import ThreadState
|
||||
from deerflow.sandbox.tools import get_thread_data, replace_virtual_path
|
||||
|
||||
|
||||
@tool("view_image", parse_docstring=True)
|
||||
@@ -32,6 +31,8 @@ def view_image_tool(
|
||||
Args:
|
||||
image_path: Absolute path to the image file. Common formats supported: jpg, jpeg, png, webp.
|
||||
"""
|
||||
from deerflow.sandbox.tools import get_thread_data, replace_virtual_path
|
||||
|
||||
# Replace virtual path with actual path
|
||||
# /mnt/user-data/* paths are mapped to thread-specific directories
|
||||
thread_data = get_thread_data(runtime)
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
"""Tests for AioSandbox concurrent command serialization (#1433)."""
|
||||
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture()
def sandbox():
    """Provide an ``AioSandbox`` built while its HTTP client class is mocked.

    The patch is only active during construction; the instance keeps the
    mock client it was created with.
    """
    client_patch = patch("deerflow.community.aio_sandbox.aio_sandbox.AioSandboxClient")
    with client_patch:
        from deerflow.community.aio_sandbox.aio_sandbox import AioSandbox

        return AioSandbox(id="test-sandbox", base_url="http://localhost:8080")
|
||||
|
||||
|
||||
class TestExecuteCommandSerialization:
    """Checks that overlapping ``execute_command`` calls run one at a time."""

    def test_lock_prevents_concurrent_execution(self, sandbox):
        """Three threads released together must not interleave inside exec_command."""
        import time

        call_log = []
        barrier = threading.Barrier(3)

        def slow_exec(command, **kwargs):
            # Record entry and exit around a short sleep so overlap would show
            # up as two consecutive "enter" records.
            call_log.append(("enter", command))
            time.sleep(0.05)
            call_log.append(("exit", command))
            return SimpleNamespace(data=SimpleNamespace(output=f"ok: {command}"))

        sandbox._client.shell.exec_command = slow_exec

        def worker(cmd):
            barrier.wait()  # release all workers at once so they contend for the lock
            sandbox.execute_command(cmd)

        threads = [threading.Thread(target=worker, args=(f"cmd-{i}",)) for i in range(3)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        # Serialized execution means every "enter" is immediately followed by
        # its "exit" in the log.
        enters = [pos for pos, record in enumerate(call_log) if record[0] == "enter"]
        exits = [pos for pos, record in enumerate(call_log) if record[0] == "exit"]
        assert len(enters) == 3
        assert len(exits) == 3
        for e_idx, x_idx in zip(enters, exits):
            assert x_idx == e_idx + 1, f"Interleaved execution detected: {call_log}"
|
||||
|
||||
|
||||
class TestErrorObservationRetry:
    """Covers the fresh-session retry that follows an ErrorObservation reply."""

    def test_retry_on_error_observation(self, sandbox):
        """A first reply carrying the ErrorObservation text triggers one retry."""
        seen_commands = []

        def mock_exec(command, **kwargs):
            seen_commands.append(command)
            if len(seen_commands) == 1:
                payload = "'ErrorObservation' object has no attribute 'exit_code'"
            else:
                payload = "success"
            return SimpleNamespace(data=SimpleNamespace(output=payload))

        sandbox._client.shell.exec_command = mock_exec

        assert sandbox.execute_command("echo hello") == "success"
        assert len(seen_commands) == 2

    def test_retry_passes_fresh_session_id(self, sandbox):
        """Only the retry call carries an ``id`` keyword, and it is UUID-length."""
        recorded_kwargs = []

        def mock_exec(command, **kwargs):
            recorded_kwargs.append(kwargs)
            if len(recorded_kwargs) == 1:
                payload = "'ErrorObservation' object has no attribute 'exit_code'"
            else:
                payload = "ok"
            return SimpleNamespace(data=SimpleNamespace(output=payload))

        sandbox._client.shell.exec_command = mock_exec
        sandbox.execute_command("test")

        assert len(recorded_kwargs) == 2
        first_kwargs, retry_kwargs = recorded_kwargs
        assert "id" not in first_kwargs
        assert "id" in retry_kwargs
        assert len(retry_kwargs["id"]) == 36  # canonical UUID string length

    def test_no_retry_on_clean_output(self, sandbox):
        """Ordinary output is returned after a single exec_command call."""
        seen_commands = []

        def mock_exec(command, **kwargs):
            seen_commands.append(command)
            return SimpleNamespace(data=SimpleNamespace(output="all good"))

        sandbox._client.shell.exec_command = mock_exec

        assert sandbox.execute_command("echo hello") == "all good"
        assert len(seen_commands) == 1
|
||||
|
||||
|
||||
class TestListDirSerialization:
    """Checks that ``list_dir`` runs its shell command under the sandbox lock."""

    def test_list_dir_uses_lock(self, sandbox):
        """The lock must be held at the moment exec_command is invoked."""
        lock_was_held = []
        canned_reply = SimpleNamespace(data=SimpleNamespace(output="/a\n/b"))

        def tracking_exec(command, **kwargs):
            # Sample the lock state from inside the call made by list_dir.
            lock_was_held.append(sandbox._lock.locked())
            return canned_reply

        sandbox._client.shell.exec_command = tracking_exec

        assert sandbox.list_dir("/test") == ["/a", "/b"]
        assert lock_was_held == [True], "list_dir must hold the lock during exec_command"
|
||||
@@ -0,0 +1,28 @@
|
||||
from deerflow.community.aio_sandbox.local_backend import _format_container_mount
|
||||
|
||||
|
||||
def test_format_container_mount_uses_mount_syntax_for_docker_windows_paths():
    """Docker with a drive-letter source must be emitted via ``--mount``."""
    host_dir = "D:/deer-flow/backend/.deer-flow/threads"
    expected = ["--mount", "type=bind,src=D:/deer-flow/backend/.deer-flow/threads,dst=/mnt/threads"]

    assert _format_container_mount("docker", host_dir, "/mnt/threads", False) == expected
|
||||
|
||||
|
||||
def test_format_container_mount_marks_docker_readonly_mounts():
    """A read-only Docker mount gets the ``readonly`` option appended."""
    expected = ["--mount", "type=bind,src=/host/path,dst=/mnt/path,readonly"]

    assert _format_container_mount("docker", "/host/path", "/mnt/path", True) == expected
|
||||
|
||||
|
||||
def test_format_container_mount_keeps_volume_syntax_for_apple_container():
    """The ``container`` runtime keeps ``-v host:container[:ro]`` syntax."""
    expected = ["-v", "/host/path:/mnt/path:ro"]

    assert _format_container_mount("container", "/host/path", "/mnt/path", True) == expected
|
||||
@@ -5,7 +5,7 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from deerflow.config.paths import Paths
|
||||
from deerflow.config.paths import Paths, join_host_path
|
||||
|
||||
# ── ensure_thread_dirs ───────────────────────────────────────────────────────
|
||||
|
||||
@@ -31,6 +31,13 @@ def test_ensure_thread_dirs_acp_workspace_is_world_writable(tmp_path):
|
||||
assert mode == oct(0o777)
|
||||
|
||||
|
||||
def test_host_thread_dir_rejects_invalid_thread_id(tmp_path):
    """A traversal-style thread id must be refused with ``ValueError``."""
    configured_paths = Paths(base_dir=tmp_path)
    expect_rejection = pytest.raises(ValueError, match="Invalid thread_id")

    with expect_rejection:
        configured_paths.host_thread_dir("../escape")
|
||||
|
||||
|
||||
# ── _get_thread_mounts ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -75,6 +82,30 @@ def test_get_thread_mounts_includes_user_data_dirs(tmp_path, monkeypatch):
|
||||
assert "/mnt/user-data/outputs" in container_paths
|
||||
|
||||
|
||||
def test_join_host_path_preserves_windows_drive_letter_style():
    """Joining onto a drive-letter base keeps backslash separators throughout."""
    segments = ("threads", "thread-9", "user-data", "outputs")
    expected = r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-9\user-data\outputs"

    assert join_host_path(r"C:\Users\demo\deer-flow\backend\.deer-flow", *segments) == expected
|
||||
|
||||
|
||||
def test_get_thread_mounts_preserves_windows_host_path_style(tmp_path, monkeypatch):
    """Docker bind mount sources must keep Windows-style paths intact."""
    aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
    monkeypatch.setenv("DEER_FLOW_HOST_BASE_DIR", r"C:\Users\demo\deer-flow\backend\.deer-flow")
    monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path))

    # Index the returned (host, container, read_only) triples by container path.
    host_by_container = {}
    for host_path, container_path, _read_only in aio_mod.AioSandboxProvider._get_thread_mounts("thread-10"):
        host_by_container[container_path] = host_path

    expected = {
        "/mnt/user-data/workspace": r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\workspace",
        "/mnt/user-data/uploads": r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\uploads",
        "/mnt/user-data/outputs": r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\outputs",
        "/mnt/acp-workspace": r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\acp-workspace",
    }
    for container_path, host_path in expected.items():
        assert host_by_container[container_path] == host_path
|
||||
|
||||
|
||||
def test_discover_or_create_only_unlocks_when_lock_succeeds(tmp_path, monkeypatch):
|
||||
"""Unlock should not run if exclusive locking itself fails."""
|
||||
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
|
||||
|
||||
@@ -11,9 +11,16 @@ import pytest
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||
SCRIPT_PATH = REPO_ROOT / "scripts" / "docker.sh"
|
||||
BASH_EXECUTABLE = which("bash") or r"C:\Program Files\Git\bin\bash.exe"
|
||||
BASH_CANDIDATES = [
|
||||
Path(r"C:\Program Files\Git\bin\bash.exe"),
|
||||
Path(which("bash")) if which("bash") else None,
|
||||
]
|
||||
BASH_EXECUTABLE = next(
|
||||
(str(path) for path in BASH_CANDIDATES if path is not None and path.exists() and "WindowsApps" not in str(path)),
|
||||
None,
|
||||
)
|
||||
|
||||
if not Path(BASH_EXECUTABLE).exists():
|
||||
if BASH_EXECUTABLE is None:
|
||||
pytestmark = pytest.mark.skip(reason="bash is required for docker.sh detection tests")
|
||||
|
||||
|
||||
@@ -21,13 +28,14 @@ def _detect_mode_with_config(config_content: str) -> str:
|
||||
"""Write config content into a temp project root and execute detect_sandbox_mode."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp_root = Path(tmpdir)
|
||||
(tmp_root / "config.yaml").write_text(config_content)
|
||||
(tmp_root / "config.yaml").write_text(config_content, encoding="utf-8")
|
||||
|
||||
command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmp_root}' && detect_sandbox_mode"
|
||||
|
||||
output = subprocess.check_output(
|
||||
[BASH_EXECUTABLE, "-lc", command],
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
).strip()
|
||||
|
||||
return output
|
||||
@@ -37,7 +45,11 @@ def test_detect_mode_defaults_to_local_when_config_missing():
|
||||
"""No config file should default to local mode."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmpdir}' && detect_sandbox_mode"
|
||||
output = subprocess.check_output([BASH_EXECUTABLE, "-lc", command], text=True).strip()
|
||||
output = subprocess.check_output(
|
||||
[BASH_EXECUTABLE, "-lc", command],
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
).strip()
|
||||
|
||||
assert output == "local"
|
||||
|
||||
|
||||
@@ -100,3 +100,187 @@ def test_build_run_config_with_overrides():
|
||||
assert config["configurable"]["model_name"] == "gpt-4"
|
||||
assert config["tags"] == ["test"]
|
||||
assert config["metadata"]["user"] == "alice"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression tests for issue #1644:
|
||||
# assistant_id not mapped to agent_name → custom agent SOUL.md never loaded
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_build_run_config_custom_agent_injects_agent_name():
    """A custom assistant_id must surface as configurable['agent_name'].

    Regression guard for #1644: a run created with a custom assistant_id
    (here 'finalis') must carry that id in configurable['agent_name'], which
    is what make_lead_agent uses to pick the agent's SOUL.md.
    """
    from app.gateway.services import build_run_config

    run_config = build_run_config("thread-1", None, None, assistant_id="finalis")
    injected_name = run_config["configurable"]["agent_name"]

    assert injected_name == "finalis", "Custom assistant_id must be forwarded as configurable['agent_name'] so that make_lead_agent loads the correct SOUL.md"
|
||||
|
||||
|
||||
def test_build_run_config_lead_agent_no_agent_name():
    """The default 'lead_agent' id must leave configurable without 'agent_name'."""
    from app.gateway.services import build_run_config

    configurable = build_run_config("thread-1", None, None, assistant_id="lead_agent")["configurable"]

    assert "agent_name" not in configurable
|
||||
|
||||
|
||||
def test_build_run_config_none_assistant_id_no_agent_name():
    """A missing (None) assistant_id must leave configurable without 'agent_name'."""
    from app.gateway.services import build_run_config

    configurable = build_run_config("thread-1", None, None, assistant_id=None)["configurable"]

    assert "agent_name" not in configurable
|
||||
|
||||
|
||||
def test_build_run_config_explicit_agent_name_not_overwritten():
    """A caller-supplied configurable['agent_name'] wins over the assistant_id mapping."""
    from app.gateway.services import build_run_config

    request_config = {"configurable": {"agent_name": "explicit-agent"}}
    run_config = build_run_config("thread-1", request_config, None, assistant_id="other-agent")
    resolved_name = run_config["configurable"]["agent_name"]

    assert resolved_name == "explicit-agent", "An explicit configurable['agent_name'] in the request body must not be overwritten by the assistant_id mapping"
|
||||
|
||||
|
||||
def test_resolve_agent_factory_returns_make_lead_agent():
    """Every assistant_id, including None, resolves to the make_lead_agent factory."""
    from app.gateway.services import resolve_agent_factory
    from deerflow.agents.lead_agent.agent import make_lead_agent

    for assistant_id in (None, "lead_agent", "finalis", "custom-agent-123"):
        assert resolve_agent_factory(assistant_id) is make_lead_agent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression tests for issue #1699:
|
||||
# context field in langgraph-compat requests not merged into configurable
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_create_request_accepts_context():
    """RunCreateRequest keeps a supplied ``context`` mapping and its values."""
    from app.gateway.routers.thread_runs import RunCreateRequest

    context_payload = {
        "model_name": "deepseek-v3",
        "thinking_enabled": True,
        "is_plan_mode": True,
        "subagent_enabled": True,
        "thread_id": "some-thread-id",
    }
    user_message = {"role": "user", "content": "hi"}

    body = RunCreateRequest(input={"messages": [user_message]}, context=context_payload)
    received = body.context

    assert received is not None
    assert received["model_name"] == "deepseek-v3"
    assert received["is_plan_mode"] is True
    assert received["subagent_enabled"] is True
|
||||
|
||||
|
||||
def test_run_create_request_context_defaults_to_none():
    """Omitting ``context`` leaves the field as None (backward compatibility)."""
    from app.gateway.routers.thread_runs import RunCreateRequest

    request_without_context = RunCreateRequest(input=None)

    assert request_without_context.context is None
|
||||
|
||||
|
||||
def test_context_merges_into_configurable():
    """Allowlisted context values end up in config['configurable'].

    start_run is async and needs many dependencies, so this test re-creates
    the merge step locally on top of build_run_config instead of calling it.
    """
    from app.gateway.services import build_run_config

    run_config = build_run_config("thread-1", None, None)

    request_context = {
        "model_name": "deepseek-v3",
        "mode": "ultra",
        "reasoning_effort": "high",
        "thinking_enabled": True,
        "is_plan_mode": True,
        "subagent_enabled": True,
        "max_concurrent_subagents": 5,
        "thread_id": "should-be-ignored",
    }
    allowed_keys = {
        "model_name",
        "mode",
        "thinking_enabled",
        "reasoning_effort",
        "is_plan_mode",
        "subagent_enabled",
        "max_concurrent_subagents",
    }

    # Local stand-in for the merge: copy allowlisted keys without
    # overwriting anything already present in configurable.
    merged = run_config.setdefault("configurable", {})
    for key, value in request_context.items():
        if key in allowed_keys:
            merged.setdefault(key, value)

    merged = run_config["configurable"]
    assert merged["model_name"] == "deepseek-v3"
    assert merged["thinking_enabled"] is True
    assert merged["is_plan_mode"] is True
    assert merged["subagent_enabled"] is True
    assert merged["max_concurrent_subagents"] == 5
    assert merged["reasoning_effort"] == "high"
    assert merged["mode"] == "ultra"
    # thread_id from context should NOT override the one from build_run_config
    assert merged["thread_id"] == "thread-1"
    # thread_id is not among the allowlisted keys taken from the context
    assert "thread_id" not in (request_context.keys() & allowed_keys)
|
||||
|
||||
|
||||
def test_context_does_not_override_existing_configurable():
    """Explicit ``config.configurable`` values take precedence over ``context``.

    Keys already set through the ``config`` field must keep their values;
    only keys that are absent may be filled in from ``context``.
    """
    from app.gateway.services import build_run_config

    explicit_config = {"configurable": {"model_name": "gpt-4", "is_plan_mode": False}}
    run_config = build_run_config("thread-1", explicit_config, None)

    request_context = {
        "model_name": "deepseek-v3",
        "is_plan_mode": True,
        "subagent_enabled": True,
    }
    allowed_keys = {
        "model_name",
        "mode",
        "thinking_enabled",
        "reasoning_effort",
        "is_plan_mode",
        "subagent_enabled",
        "max_concurrent_subagents",
    }

    # Local stand-in for the merge: setdefault never replaces existing keys.
    merged = run_config.setdefault("configurable", {})
    for key, value in request_context.items():
        if key in allowed_keys:
            merged.setdefault(key, value)

    merged = run_config["configurable"]
    # Existing values must NOT be overridden
    assert merged["model_name"] == "gpt-4"
    assert merged["is_plan_mode"] is False
    # New values should be added
    assert merged["subagent_enabled"] is True
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
from types import SimpleNamespace
|
||||
|
||||
from deerflow.agents.lead_agent import prompt as prompt_module
|
||||
|
||||
|
||||
def test_build_custom_mounts_section_returns_empty_when_no_mounts(monkeypatch):
    """With an empty sandbox.mounts list the section renders as an empty string."""
    sandbox = SimpleNamespace(mounts=[])
    app_config = SimpleNamespace(sandbox=sandbox)
    monkeypatch.setattr("deerflow.config.get_app_config", lambda: app_config)

    rendered = prompt_module._build_custom_mounts_section()

    assert rendered == ""
|
||||
|
||||
|
||||
def test_build_custom_mounts_section_lists_configured_mounts(monkeypatch):
    """Each configured mount's container path and access mode appear in the section."""
    writable_mount = SimpleNamespace(container_path="/home/user/shared", read_only=False)
    readonly_mount = SimpleNamespace(container_path="/mnt/reference", read_only=True)
    app_config = SimpleNamespace(sandbox=SimpleNamespace(mounts=[writable_mount, readonly_mount]))
    monkeypatch.setattr("deerflow.config.get_app_config", lambda: app_config)

    rendered = prompt_module._build_custom_mounts_section()

    expected_fragments = (
        "**Custom Mounted Directories:**",
        "`/home/user/shared`",
        "read-write",
        "`/mnt/reference`",
        "read-only",
    )
    for fragment in expected_fragments:
        assert fragment in rendered
|
||||
|
||||
|
||||
def test_apply_prompt_template_includes_custom_mounts(monkeypatch):
    """The full prompt contains the custom-mounts section for a configured mount."""
    shared_mount = SimpleNamespace(container_path="/home/user/shared", read_only=False)
    app_config = SimpleNamespace(
        sandbox=SimpleNamespace(mounts=[shared_mount]),
        skills=SimpleNamespace(container_path="/mnt/skills"),
    )
    monkeypatch.setattr("deerflow.config.get_app_config", lambda: app_config)

    # Stub the other prompt inputs so each contributes nothing to the prompt.
    stubs = {
        "load_skills": lambda enabled_only=True: [],
        "get_deferred_tools_prompt_section": lambda: "",
        "_build_acp_section": lambda: "",
        "_get_memory_context": lambda agent_name=None: "",
        "get_agent_soul": lambda agent_name=None: "",
    }
    for attribute, stub in stubs.items():
        monkeypatch.setattr(prompt_module, attribute, stub)

    rendered_prompt = prompt_module.apply_prompt_template()

    assert "`/home/user/shared`" in rendered_prompt
    assert "Custom Mounted Directories" in rendered_prompt
|
||||
@@ -86,6 +86,18 @@ async def test_list_by_thread(manager: RunManager):
|
||||
assert runs[1].run_id == r1.run_id
|
||||
|
||||
|
||||
@pytest.mark.anyio
async def test_list_by_thread_is_stable_when_timestamps_tie(manager: RunManager, monkeypatch: pytest.MonkeyPatch):
    """Runs created with an identical timestamp are still listed newest-first."""
    frozen_timestamp = "2026-01-01T00:00:00+00:00"
    # Pin the manager's clock helper so both runs get the same timestamp.
    monkeypatch.setattr("deerflow.runtime.runs.manager._now_iso", lambda: "2026-01-01T00:00:00+00:00")

    older = await manager.create("thread-1")
    newer = await manager.create("thread-1")

    listed = await manager.list_by_thread("thread-1")
    listed_ids = [run.run_id for run in listed]

    assert frozen_timestamp == "2026-01-01T00:00:00+00:00"
    assert listed_ids == [newer.run_id, older.run_id]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_has_inflight(manager: RunManager):
|
||||
"""has_inflight should be True when a run is pending or running."""
|
||||
|
||||
@@ -397,6 +397,9 @@ sandbox:
|
||||
# # - host_path: /path/on/host
|
||||
# # container_path: /home/user/shared
|
||||
# # read_only: false
|
||||
# #
|
||||
# # # DeerFlow will surface configured container_path values to the agent,
|
||||
# # # so it can directly read/write mounted directories such as /home/user/shared
|
||||
#
|
||||
# # Optional: Environment variables to inject into the sandbox container
|
||||
# # Values starting with $ will be resolved from host environment variables
|
||||
|
||||
@@ -121,8 +121,8 @@ services:
|
||||
container_name: deer-flow-langgraph
|
||||
command: sh -c "cd /app/backend && uv run langgraph dev --no-browser --allow-blocking --no-reload --host 0.0.0.0 --port 2024 --n-jobs-per-worker 10"
|
||||
volumes:
|
||||
- ${DEER_FLOW_CONFIG_PATH}:/app/config.yaml:ro
|
||||
- ${DEER_FLOW_EXTENSIONS_CONFIG_PATH}:/app/extensions_config.json:ro
|
||||
- ${DEER_FLOW_CONFIG_PATH}:/app/backend/config.yaml:ro
|
||||
- ${DEER_FLOW_EXTENSIONS_CONFIG_PATH}:/app/backend/extensions_config.json:ro
|
||||
- ${DEER_FLOW_HOME}:/app/backend/.deer-flow
|
||||
- ../skills:/app/skills:ro
|
||||
- ../backend/.langgraph_api:/app/backend/.langgraph_api
|
||||
@@ -144,14 +144,12 @@ services:
|
||||
environment:
|
||||
- CI=true
|
||||
- DEER_FLOW_HOME=/app/backend/.deer-flow
|
||||
- DEER_FLOW_CONFIG_PATH=/app/config.yaml
|
||||
- DEER_FLOW_EXTENSIONS_CONFIG_PATH=/app/extensions_config.json
|
||||
- DEER_FLOW_CONFIG_PATH=/app/backend/config.yaml
|
||||
- DEER_FLOW_EXTENSIONS_CONFIG_PATH=/app/backend/extensions_config.json
|
||||
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
|
||||
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
|
||||
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
|
||||
# Disable LangSmith tracing — LANGSMITH_API_KEY is not required.
|
||||
# Set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
|
||||
- LANGSMITH_TRACING=${LANGSMITH_TRACING:-false}
|
||||
# LangSmith tracing: set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
|
||||
env_file:
|
||||
- ../.env
|
||||
extra_hosts:
|
||||
|
||||
@@ -31,6 +31,7 @@ from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
@@ -39,7 +40,7 @@ from fastapi import FastAPI, HTTPException
|
||||
from kubernetes import client as k8s_client
|
||||
from kubernetes import config as k8s_config
|
||||
from kubernetes.client.rest import ApiException
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# Suppress only the InsecureRequestWarning from urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
@@ -59,6 +60,7 @@ SANDBOX_IMAGE = os.environ.get(
|
||||
)
|
||||
SKILLS_HOST_PATH = os.environ.get("SKILLS_HOST_PATH", "/skills")
|
||||
THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads")
|
||||
SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$"
|
||||
|
||||
# Path to the kubeconfig *inside* the provisioner container.
|
||||
# Typically the host's ~/.kube/config is mounted here.
|
||||
@@ -69,6 +71,36 @@ KUBECONFIG_PATH = os.environ.get("KUBECONFIG_PATH", "/root/.kube/config")
|
||||
# is ``host.docker.internal``; on Linux it may be the host's LAN IP.
|
||||
NODE_HOST = os.environ.get("NODE_HOST", "host.docker.internal")
|
||||
|
||||
|
||||
def join_host_path(base: str, *parts: str) -> str:
    """Join host filesystem path segments while preserving the base's style.

    The style is chosen from ``base`` alone, never from the platform this
    process runs on: a base that starts with a drive letter, starts with a
    double backslash, or contains any backslash is joined with Windows
    semantics; every other base is joined with POSIX semantics.

    Args:
        base: Host path to extend.
        *parts: Segments to append, in order.

    Returns:
        ``base`` unchanged when no parts are given, otherwise the joined path
        as a string in the style detected from ``base``.

    Note:
        Joining follows ``pathlib`` rules, so an absolute segment in ``parts``
        replaces everything before it. Validate untrusted segments first.
    """
    if not parts:
        return base

    # Pure path classes only: the concrete ``Path`` would follow the local OS
    # and rewrite a POSIX base with backslashes when run on Windows.
    from pathlib import PurePosixPath, PureWindowsPath

    looks_windows = (
        re.match(r"^[A-Za-z]:[\\/]", base) is not None
        or base.startswith("\\\\")
        or "\\" in base
    )
    path_type = PureWindowsPath if looks_windows else PurePosixPath
    return str(path_type(base, *parts))
|
||||
|
||||
|
||||
def _validate_thread_id(thread_id: str) -> str:
    """Return ``thread_id`` unchanged if it fully matches ``SAFE_THREAD_ID_PATTERN``.

    Args:
        thread_id: Candidate thread identifier.

    Returns:
        The same ``thread_id`` string, for convenient inline use.

    Raises:
        ValueError: If any part of ``thread_id`` falls outside the pattern.
    """
    # fullmatch, not match: under re.match a trailing "$" also matches just
    # before a final newline, so a value such as "abc\n" would be accepted.
    if re.fullmatch(SAFE_THREAD_ID_PATTERN, thread_id) is None:
        raise ValueError(
            "Invalid thread_id: only alphanumeric characters, hyphens, and underscores are allowed."
        )
    return thread_id
|
||||
|
||||
|
||||
# ── K8s client setup ────────────────────────────────────────────────────
|
||||
|
||||
core_v1: k8s_client.CoreV1Api | None = None
|
||||
@@ -186,7 +218,7 @@ app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan)
|
||||
|
||||
class CreateSandboxRequest(BaseModel):
|
||||
sandbox_id: str
|
||||
thread_id: str
|
||||
thread_id: str = Field(pattern=SAFE_THREAD_ID_PATTERN)
|
||||
|
||||
|
||||
class SandboxResponse(BaseModel):
|
||||
@@ -213,6 +245,7 @@ def _sandbox_url(node_port: int) -> str:
|
||||
|
||||
def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
|
||||
"""Construct a Pod manifest for a single sandbox."""
|
||||
thread_id = _validate_thread_id(thread_id)
|
||||
return k8s_client.V1Pod(
|
||||
metadata=k8s_client.V1ObjectMeta(
|
||||
name=_pod_name(sandbox_id),
|
||||
@@ -298,7 +331,7 @@ def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
|
||||
k8s_client.V1Volume(
|
||||
name="user-data",
|
||||
host_path=k8s_client.V1HostPathVolumeSource(
|
||||
path=f"{THREADS_HOST_PATH}/{thread_id}/user-data",
|
||||
path=join_host_path(THREADS_HOST_PATH, thread_id, "user-data"),
|
||||
type="DirectoryOrCreate",
|
||||
),
|
||||
),
|
||||
|
||||
@@ -80,13 +80,9 @@ export default function NewAgentPage() {
|
||||
setNameError(t.agents.nameStepAlreadyExistsError);
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
if (error instanceof AgentNameCheckError) {
|
||||
setNameError(
|
||||
error.reason === "backend_unreachable"
|
||||
? t.agents.nameStepCheckError
|
||||
: error.message,
|
||||
);
|
||||
} catch (err) {
|
||||
if (err instanceof TypeError && err.message === "Failed to fetch") {
|
||||
setNameError(t.agents.nameStepNetworkError);
|
||||
} else {
|
||||
setNameError(t.agents.nameStepCheckError);
|
||||
}
|
||||
@@ -107,6 +103,7 @@ export default function NewAgentPage() {
|
||||
t.agents.nameStepBootstrapMessage,
|
||||
t.agents.nameStepInvalidError,
|
||||
t.agents.nameStepAlreadyExistsError,
|
||||
t.agents.nameStepNetworkError,
|
||||
t.agents.nameStepCheckError,
|
||||
]);
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ export function ArtifactFileDetail({
|
||||
const isSupportPreview = useMemo(() => {
|
||||
return language === "html" || language === "markdown";
|
||||
}, [language]);
|
||||
const { content } = useArtifactContent({
|
||||
const { content, url } = useArtifactContent({
|
||||
threadId,
|
||||
filepath: filepathFromProps,
|
||||
enabled: isCodeFile && !isWriteFile,
|
||||
@@ -240,7 +240,9 @@ export function ArtifactFileDetail({
|
||||
(language === "markdown" || language === "html") && (
|
||||
<ArtifactFilePreview
|
||||
content={displayContent}
|
||||
isWriteFile={isWriteFile}
|
||||
language={language ?? "text"}
|
||||
url={url}
|
||||
/>
|
||||
)}
|
||||
{isCodeFile && viewMode === "code" && (
|
||||
@@ -263,10 +265,14 @@ export function ArtifactFileDetail({
|
||||
|
||||
export function ArtifactFilePreview({
|
||||
content,
|
||||
isWriteFile,
|
||||
language,
|
||||
url,
|
||||
}: {
|
||||
content: string;
|
||||
isWriteFile: boolean;
|
||||
language: string;
|
||||
url?: string;
|
||||
}) {
|
||||
if (language === "markdown") {
|
||||
return (
|
||||
@@ -286,8 +292,8 @@ export function ArtifactFilePreview({
|
||||
<iframe
|
||||
className="size-full"
|
||||
title="Artifact preview"
|
||||
srcDoc={content}
|
||||
sandbox="allow-scripts allow-forms"
|
||||
{...(isWriteFile ? { srcDoc: content } : url ? { src: url } : {})}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -34,5 +34,10 @@ export function useArtifactContent({
|
||||
// Cache artifact content for 5 minutes to avoid repeated fetches (especially for .skill ZIP extraction)
|
||||
staleTime: 5 * 60 * 1000,
|
||||
});
|
||||
return { content: isWriteFile ? content : data, isLoading, error };
|
||||
return {
|
||||
content: isWriteFile ? content : data?.content,
|
||||
url: isWriteFile ? undefined : data?.url,
|
||||
isLoading,
|
||||
error,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ export async function loadArtifactContent({
|
||||
const url = urlOfArtifact({ filepath: enhancedFilepath, threadId, isMock });
|
||||
const response = await fetch(url);
|
||||
const text = await response.text();
|
||||
return text;
|
||||
return { content: text, url };
|
||||
}
|
||||
|
||||
export function loadArtifactContentFromToolCall({
|
||||
|
||||
@@ -194,8 +194,9 @@ export const enUS: Translations = {
|
||||
nameStepInvalidError:
|
||||
"Invalid name — use only letters, digits, and hyphens",
|
||||
nameStepAlreadyExistsError: "An agent with this name already exists",
|
||||
nameStepCheckError:
|
||||
"Could not reach the DeerFlow backend to verify name availability. Start the backend or set NEXT_PUBLIC_BACKEND_BASE_URL, then try again.",
|
||||
nameStepNetworkError:
|
||||
"Network request failed — check your network or backend connection",
|
||||
nameStepCheckError: "Could not verify name availability — please try again",
|
||||
nameStepBootstrapMessage:
|
||||
"The new custom agent name is {name}. Let's bootstrap it's **SOUL**.",
|
||||
agentCreated: "Agent created!",
|
||||
|
||||
@@ -133,6 +133,7 @@ export interface Translations {
|
||||
nameStepContinue: string;
|
||||
nameStepInvalidError: string;
|
||||
nameStepAlreadyExistsError: string;
|
||||
nameStepNetworkError: string;
|
||||
nameStepCheckError: string;
|
||||
nameStepBootstrapMessage: string;
|
||||
agentCreated: string;
|
||||
|
||||
@@ -183,8 +183,8 @@ export const zhCN: Translations = {
|
||||
nameStepContinue: "继续",
|
||||
nameStepInvalidError: "名称无效,只允许字母、数字和连字符",
|
||||
nameStepAlreadyExistsError: "已存在同名智能体",
|
||||
nameStepCheckError:
|
||||
"无法连接 DeerFlow 后端来验证名称是否可用。请先启动后端,或配置 NEXT_PUBLIC_BACKEND_BASE_URL,然后再重试。",
|
||||
nameStepNetworkError: "网络请求失败,请检查网络或后端连接",
|
||||
nameStepCheckError: "无法验证名称可用性,请稍后重试",
|
||||
nameStepBootstrapMessage:
|
||||
"新智能体的名称是 {name},现在开始为它生成 **SOUL**。",
|
||||
agentCreated: "智能体已创建!",
|
||||
|
||||
+15
-2
@@ -9,6 +9,17 @@ import sys
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def configure_stdio() -> None:
    """Switch stdout and stderr to UTF-8 where the stream supports it.

    Streams that are missing or have no ``reconfigure`` method are skipped.
    An ``OSError`` or ``ValueError`` from ``reconfigure`` is ignored, so the
    stream keeps its current settings. Undecodable characters are replaced
    rather than raising.
    """
    for attribute in ("stdout", "stderr"):
        target = getattr(sys, attribute, None)
        if not hasattr(target, "reconfigure"):
            continue
        try:
            target.reconfigure(encoding="utf-8", errors="replace")
        except (OSError, ValueError):
            # Best effort only: leave this stream as it was.
            pass
|
||||
|
||||
|
||||
def run_command(command: list[str]) -> Optional[str]:
|
||||
"""Run a command and return trimmed stdout, or None on failure."""
|
||||
try:
|
||||
@@ -29,6 +40,7 @@ def parse_node_major(version_text: str) -> Optional[int]:
|
||||
|
||||
|
||||
def main() -> int:
|
||||
configure_stdio()
|
||||
print("==========================================")
|
||||
print(" Checking Required Dependencies")
|
||||
print("==========================================")
|
||||
@@ -61,8 +73,9 @@ def main() -> int:
|
||||
|
||||
print()
|
||||
print("Checking pnpm...")
|
||||
if shutil.which("pnpm"):
|
||||
pnpm_version = run_command(["pnpm", "-v"])
|
||||
pnpm_executable = shutil.which("pnpm.cmd") or shutil.which("pnpm")
|
||||
if pnpm_executable:
|
||||
pnpm_version = run_command([pnpm_executable, "-v"])
|
||||
if pnpm_version:
|
||||
print(f" ✓ pnpm {pnpm_version}")
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user