Compare commits

12 Commits

Author SHA1 Message Date
rayhpeng 4383d96583 fix(gateway): merge context field into configurable for langgraph-compat runs (#1699)
The langgraph-compat layer dropped the DeerFlow-specific `context` field
from run requests, causing agent config (subagent_enabled, is_plan_mode,
thinking_enabled, etc.) to fall back to defaults. Add `context` to
RunCreateRequest and merge allowlisted keys into config.configurable in
start_run, with existing configurable values taking precedence.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 17:01:01 +08:00
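
The merge rule described in 4383d96583 can be sketched as follows. This is a minimal illustration, not the gateway code itself: `merge_context` is a hypothetical helper name, and the allowlist is copied from the `_CONTEXT_CONFIGURABLE_KEYS` set shown in the diff. The point it shows is that `setdefault` lets values already present in `configurable` win over `context`, and that non-allowlisted keys such as `thread_id` are dropped.

```python
from typing import Any, Optional

# Allowlist copied from the diff; the constant name here is illustrative.
CONTEXT_CONFIGURABLE_KEYS = {
    "model_name",
    "mode",
    "thinking_enabled",
    "reasoning_effort",
    "is_plan_mode",
    "subagent_enabled",
    "max_concurrent_subagents",
}


def merge_context(config: dict[str, Any], context: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical helper: copy allowlisted context keys into config['configurable']."""
    if not context:
        return config
    configurable = config.setdefault("configurable", {})
    for key in CONTEXT_CONFIGURABLE_KEYS:
        if key in context:
            # setdefault keeps any value the caller already put in configurable.
            configurable.setdefault(key, context[key])
    return config


config = {"configurable": {"thread_id": "t-1", "thinking_enabled": False}}
context = {"thinking_enabled": True, "is_plan_mode": True, "thread_id": "other"}
merged = merge_context(config, context)
```

Here `thinking_enabled` stays `False` because it was already set in `configurable`, `is_plan_mode` is filled in from `context`, and the `thread_id` in `context` is ignored.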
LYU Yichen 3e461d9d08 fix: use safe docker bind mount syntax for sandbox mounts (#1655)
Docker's -v host:container syntax is ambiguous for Windows drive-letter
paths (e.g. D:/...) because ':' is both the drive separator and the
volume separator, causing mount failures on Windows hosts.

Introduce _format_container_mount() which uses '--mount type=bind,...'
for Docker (unambiguous on all platforms) and keeps '-v' for Apple
Container runtime which does not support the --mount flag yet.

Adds unit tests covering Windows paths, read-only mounts, and Apple
Container pass-through.

Made-with: Cursor
2026-04-01 11:42:12 +08:00
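
To illustrate the ambiguity that 3e461d9d08 describes, the sketch below splits a `-v` style spec on `:` and compares it with the key=value form used by `--mount`. Both functions are hypothetical and deliberately naive; they are not Docker's actual parser, only a way to see why a drive letter adds an extra colon-separated field.

```python
def naive_volume_fields(spec: str) -> list[str]:
    """Split a '-v host:container[:options]' spec on ':' (naive, for illustration)."""
    return spec.split(":")


def parse_mount_spec(spec: str) -> dict[str, str]:
    """Split a '--mount' spec into key/value pairs; bare flags map to ''."""
    return {key: value for key, _, value in (item.partition("=") for item in spec.split(","))}


# A POSIX host path yields host, container, options.
posix_fields = naive_volume_fields("/host/data:/mnt/data:ro")

# A Windows drive-letter path also yields three fields, but the first one
# is only the drive letter, so the fields no longer line up.
windows_fields = naive_volume_fields("D:/data:/mnt/data")

# The key=value form never splits on ':', so the drive letter survives.
mount_fields = parse_mount_spec("type=bind,src=D:/data,dst=/mnt/data,readonly")
```

With the key=value form, `src` keeps the full `D:/data` path regardless of how many colons it contains.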
JeffJiang cf43584d24 fix(artifact): enhance artifact content loading to include URL for non-write files (#1678) 2026-04-01 11:38:55 +08:00
d 🔹 6ff60f2af1 fix(gateway): forward assistant_id as agent_name in build_run_config (#1667)
* fix(gateway): forward assistant_id as agent_name in build_run_config

Fixes #1644

When the LangGraph Platform-compatible /runs endpoint receives a custom
assistant_id (e.g. 'finalis'), the Gateway's build_run_config() silently
ignored it — configurable['agent_name'] was never set, so make_lead_agent
fell through to the default lead agent and SOUL.md was never loaded.

Root cause (introduced in #1403):
  resolve_agent_factory() correctly falls back to make_lead_agent for all
  assistant_id values, but build_run_config() had no assistant_id parameter
  and never injected configurable['agent_name'].  The full call chain:

    POST /runs (assistant_id='finalis')
      → resolve_agent_factory('finalis')   # returns make_lead_agent ✓
      → build_run_config(thread_id, ...)   # no agent_name injected ✗
        → make_lead_agent(config)
          → cfg.get('agent_name') → None
            → load_agent_soul(None) → base SOUL.md (doesn't exist) → None

Fix:
- Add keyword-only `assistant_id` parameter to build_run_config().
- When assistant_id is set and differs from 'lead_agent', inject it as
  configurable['agent_name'] (matching the channel manager's existing
  _resolve_run_params() logic for IM channels).
- Honour an explicit configurable['agent_name'] in the request body;
  assistant_id mapping only fills the gap when it is absent.
- Remove stale log-only branch from resolve_agent_factory(); update
  docstring to explain the factory/configurable split.

Tests added (test_gateway_services.py):
- Custom assistant_id injects configurable['agent_name']
- 'lead_agent' assistant_id does NOT inject agent_name
- None assistant_id does NOT inject agent_name
- Explicit configurable['agent_name'] in request is not overwritten
- resolve_agent_factory returns make_lead_agent for all inputs

* style: format with ruff

* fix: validate and normalize assistant_id to prevent path traversal

Addresses Copilot review: strip/lowercase/replace underscores and
reject names that don't match [a-z0-9-]+, consistent with
ChannelManager._normalize_custom_agent_name().

---------

Co-authored-by: voidborne-d <voidborne-d@users.noreply.github.com>
2026-04-01 11:15:56 +08:00
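
The normalization and validation step from 6ff60f2af1 can be sketched in isolation. The function name `agent_name_from_assistant_id` is hypothetical; the strip/lowercase/underscore-to-hyphen steps and the `[a-z0-9-]+` pattern follow the `build_run_config` diff. It returns `None` where the gateway would leave `agent_name` unset.

```python
import re
from typing import Optional

DEFAULT_ASSISTANT_ID = "lead_agent"


def agent_name_from_assistant_id(assistant_id: Optional[str]) -> Optional[str]:
    """Hypothetical helper: map an assistant_id to a safe agent_name, or None."""
    if not assistant_id or assistant_id == DEFAULT_ASSISTANT_ID:
        return None  # default agent: nothing to inject
    normalized = assistant_id.strip().lower().replace("_", "-")
    # Rejecting anything outside [a-z0-9-] rules out '/', '.', and '..',
    # so the name cannot escape the agents/<name>/ directory.
    if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized):
        raise ValueError(f"Invalid assistant_id {assistant_id!r}")
    return normalized


custom = agent_name_from_assistant_id(" Finalis_Bot ")
default = agent_name_from_assistant_id("lead_agent")

try:
    agent_name_from_assistant_id("../etc/passwd")
    traversal_rejected = False
except ValueError:
    traversal_rejected = True
```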
Matt Van Horn a3bfea631c fix(sandbox): serialize concurrent exec_command calls in AioSandbox (#1435)
* fix(sandbox): serialize concurrent exec_command calls in AioSandbox

The AIO sandbox container maintains a single persistent shell session
that corrupts when multiple exec_command requests arrive concurrently
(e.g. when ToolNode issues parallel tool_calls). The corrupted session
returns 'ErrorObservation' strings as output, cascading into subsequent
commands.

Add a threading.Lock to AioSandbox to serialize shell commands. As a
secondary defense, detect ErrorObservation in output and retry with a
fresh session ID.

Fixes #1433

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(sandbox): address Copilot review findings

- Fix shell injection in list_dir: use shlex.quote(path) to escape
  user-provided paths in the find command
- Narrow ErrorObservation retry condition from broad substring match
  to the specific corruption signature to prevent false retries
- Improve test_lock_prevents_concurrent_execution: use threading.Barrier
  to ensure all workers contend for the lock simultaneously
- Improve test_list_dir_uses_lock: assert lock.locked() is True during
  exec_command to verify lock acquisition

* style: auto-format with ruff

---------

Co-authored-by: Matt Van Horn <455140+mvanhorn@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 22:33:35 +08:00
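
The two defenses in a3bfea631c (a lock around every shell call, plus one retry on a fresh session when the corruption signature appears) can be sketched without the real sandbox client. `SerializedShell` and `flaky_exec` are hypothetical stand-ins; only the signature string and the `id=` keyword on retry are taken from the diff.

```python
import threading
import uuid

# Corruption signature copied from the diff.
ERROR_OBSERVATION_SIGNATURE = "'ErrorObservation' object has no attribute 'exit_code'"


class SerializedShell:
    """Hypothetical wrapper: one command at a time, one retry on corruption."""

    def __init__(self, exec_command):
        # exec_command is an assumed callable(command, **kwargs) -> str.
        self._exec = exec_command
        self._lock = threading.Lock()

    def run(self, command: str) -> str:
        with self._lock:  # serialize access to the single persistent session
            output = self._exec(command)
            if ERROR_OBSERVATION_SIGNATURE in output:
                # Retry once on a fresh session id instead of the shared one.
                output = self._exec(command, id=str(uuid.uuid4()))
            return output or "(no output)"


calls = []


def flaky_exec(command, **kwargs):
    """Fake backend: the first call returns the corruption signature."""
    calls.append(kwargs)
    if len(calls) == 1:
        return ERROR_OBSERVATION_SIGNATURE
    return f"ok: {command}"


shell = SerializedShell(flaky_exec)
result = shell.run("echo hello")
```

Matching on the full signature rather than the bare word `ErrorObservation` mirrors the review follow-up in the same commit, which narrowed the retry condition to avoid false retries.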
Admire aae59a8ba8 fix: surface configured sandbox mounts to agents (#1638)
* fix: surface configured sandbox mounts to agents

* fix: address PR review feedback

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-03-31 22:22:30 +08:00
Admire 3ff15423d6 fix Windows Docker sandbox path mounting (#1634)
* fix windows docker sandbox paths

* fix windows sandbox mount validation

* fix backend checks for windows sandbox path PR
2026-03-31 22:19:27 +08:00
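
The path-style preservation that the `_join_host_path` helper in the diff below performs can be sketched on its own. `join_preserving_style` is a hypothetical name, and this sketch swaps `Path` for `PurePosixPath` in the non-Windows branch so the result is the same on any host; the detection regex is copied from the diff.

```python
import re
from pathlib import PurePosixPath, PureWindowsPath


def join_preserving_style(base: str, *parts: str) -> str:
    """Hypothetical helper: join segments without changing the base path's style."""
    looks_windows = (
        re.match(r"^[A-Za-z]:[\\/]", base) is not None  # drive letter, e.g. C:\ or D:/
        or base.startswith("\\\\")                      # UNC path
        or "\\" in base                                 # any backslash separator
    )
    if looks_windows:
        return str(PureWindowsPath(base, *parts))
    return str(PurePosixPath(base, *parts))


windows_joined = join_preserving_style(r"C:\repo\.deer-flow", "threads", "t1")
posix_joined = join_preserving_style("/srv/deer-flow", "threads", "t1")
```

Using the pure path classes means no filesystem access takes place, so a Windows-style host path can be joined correctly even when the code runs inside a Linux container.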
SHIYAO ZHANG c2f7be37b3 fix(tools): move sandbox.tools import in view_image_tool to break circular import (#1674)
view_image_tool.py had a top-level import of deerflow.sandbox.tools, which
created a circular dependency chain:

  sandbox.tools
    -> deerflow.agents.thread_state (triggers agents/__init__.py)
      -> agents/factory.py
        -> tools/builtins/__init__.py
          -> view_image_tool.py
            -> deerflow.sandbox.tools  <-- circular!

This caused ImportError when any test directly imported sandbox.tools,
making test_sandbox_tools_security.py fail to collect since #1522.

Fix: move the sandbox.tools import inside the view_image_tool function body.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 22:05:23 +08:00
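
The fix in c2f7be37b3 relies on a general Python property: an import inside a function body runs at call time, after both modules have finished loading. The sketch below reproduces that with a two-module cycle written to a temporary directory. All `demo_*` module names are hypothetical, and the cycle is shortened from the five-step chain in the commit message.

```python
import importlib
import sys
import tempfile
from pathlib import Path

MODULES = {
    # Eager pair: the tool module imports from the half-initialized tools module.
    "demo_tools_eager": "import demo_view_eager\n\ndef get_thread_data():\n    return 'thread-data'\n",
    "demo_view_eager": "from demo_tools_eager import get_thread_data\n",
    # Lazy pair: the same dependency, but resolved inside the function body.
    "demo_tools_lazy": "import demo_view_lazy\n\ndef get_thread_data():\n    return 'thread-data'\n",
    "demo_view_lazy": (
        "def view_image():\n"
        "    from demo_tools_lazy import get_thread_data\n"
        "    return get_thread_data()\n"
    ),
}

with tempfile.TemporaryDirectory() as tmp:
    for name, source in MODULES.items():
        Path(tmp, f"{name}.py").write_text(source)
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        try:
            importlib.import_module("demo_tools_eager")
            eager_result = "imported"
        except ImportError:
            # get_thread_data is not defined yet when demo_view_eager asks for it.
            eager_result = "ImportError"
        importlib.import_module("demo_tools_lazy")
        lazy_result = importlib.import_module("demo_view_lazy").view_image()
    finally:
        sys.path.remove(tmp)
```

The trade-off is that the import cost and any import error move from module load time to the first call of the function.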
tryag 09a9209724 fix: improve Windows compatibility in dependency check (#1550)
* fix: improve Windows compatibility in dependency check

* fix: tolerate stdio reconfigure failures in check script

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-03-31 21:54:41 +08:00
Rosemary1812 b356a13da5 fix(frontend): improve network error message for agent name check (#1605)
* fix(frontend): distinguish CORS errors from generic name check failures

* fix(frontend): improve network error message for agent name check

* Fix network error message in zh-CN locale

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-03-31 21:14:05 +08:00
1316151417 ac9a6ee6a2 fix(langgraph): correct config.yaml mount path in docker-compose (#1679)
Co-authored-by: zhoujie172 <zhoujie172@ke.com>
2026-03-31 19:40:49 +08:00
ZHANG Ning 64e0f5329a fix: remove LANGSMITH_TRACING override that ignores .env value (#1640)
The `environment` section in docker-compose.yaml set
`LANGSMITH_TRACING=${LANGSMITH_TRACING:-false}`, which always resolves
to `false` because Docker Compose evaluates `${}` substitutions from
the host shell environment, not from `env_file`.

Since `environment` entries take precedence over `env_file`, setting
`LANGSMITH_TRACING=true` in `.env` had no effect — tracing stayed
disabled despite following the documented instructions.

Remove the explicit `LANGSMITH_TRACING` from `environment` so the
value from `.env` (loaded via `env_file`) is used as intended.
2026-03-31 09:42:33 +08:00
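
The interaction described in 64e0f5329a can be modeled with a toy resolver. This is only a sketch of the two rules the commit message states (substitution of `${VAR:-default}` from the host shell environment, and `environment` entries overriding `env_file` entries); `substitute` and `container_env` are hypothetical functions, not Docker Compose's actual resolution logic, which consults more sources than this.

```python
import re


def substitute(value: str, shell_env: dict[str, str]) -> str:
    """Resolve ${VAR} and ${VAR:-default} using only the host shell environment."""

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2) or ""
        # ':-' falls back to the default when the variable is unset or empty.
        return shell_env.get(name) or default

    return re.sub(r"\$\{(\w+)(?::-([^}]*))?\}", replace, value)


def container_env(environment: dict[str, str], env_file: dict[str, str], shell_env: dict[str, str]) -> dict[str, str]:
    """Start from env_file values, then let environment entries override them."""
    resolved = dict(env_file)
    for key, raw in environment.items():
        resolved[key] = substitute(raw, shell_env)
    return resolved


env_file = {"LANGSMITH_TRACING": "true"}  # what the user wrote in .env
shell_env = {}                            # variable not exported in the host shell

before = container_env({"LANGSMITH_TRACING": "${LANGSMITH_TRACING:-false}"}, env_file, shell_env)
after = container_env({}, env_file, shell_env)
```

Under this model the override resolves to `false` and masks the `.env` value, while removing the `environment` entry lets `true` through.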
30 changed files with 783 additions and 85 deletions
@@ -38,6 +38,7 @@ class RunCreateRequest(BaseModel):
command: dict[str, Any] | None = Field(default=None, description="LangGraph Command")
metadata: dict[str, Any] | None = Field(default=None, description="Run metadata")
config: dict[str, Any] | None = Field(default=None, description="RunnableConfig overrides")
context: dict[str, Any] | None = Field(default=None, description="DeerFlow context overrides (model_name, thinking_enabled, etc.)")
webhook: str | None = Field(default=None, description="Completion callback URL")
checkpoint_id: str | None = Field(default=None, description="Resume from checkpoint")
checkpoint: dict[str, Any] | None = Field(default=None, description="Full checkpoint object")
+65 -7
@@ -10,6 +10,7 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from typing import Any
@@ -93,20 +94,56 @@ def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
return raw_input
_DEFAULT_ASSISTANT_ID = "lead_agent"
def resolve_agent_factory(assistant_id: str | None):
"""Resolve the agent factory callable from config."""
"""Resolve the agent factory callable from config.
Custom agents are implemented as ``lead_agent`` + an ``agent_name``
injected into ``configurable`` — see :func:`build_run_config`. All
``assistant_id`` values therefore map to the same factory; the routing
happens inside ``make_lead_agent`` when it reads ``cfg["agent_name"]``.
"""
from deerflow.agents.lead_agent.agent import make_lead_agent
if assistant_id and assistant_id != "lead_agent":
logger.info("assistant_id=%s requested; falling back to lead_agent", assistant_id)
return make_lead_agent
def build_run_config(thread_id: str, request_config: dict[str, Any] | None, metadata: dict[str, Any] | None) -> dict[str, Any]:
"""Build a RunnableConfig dict for the agent."""
configurable = {"thread_id": thread_id}
def build_run_config(
thread_id: str,
request_config: dict[str, Any] | None,
metadata: dict[str, Any] | None,
*,
assistant_id: str | None = None,
) -> dict[str, Any]:
"""Build a RunnableConfig dict for the agent.
When *assistant_id* refers to a custom agent (anything other than
``"lead_agent"`` / ``None``), the name is forwarded as
``configurable["agent_name"]``. ``make_lead_agent`` reads this key to
load the matching ``agents/<name>/SOUL.md`` and per-agent config —
without it the agent silently runs as the default lead agent.
This mirrors the channel manager's ``_resolve_run_params`` logic so that
the LangGraph Platform-compatible HTTP API and the IM channel path behave
identically.
"""
configurable: dict[str, Any] = {"thread_id": thread_id}
if request_config:
configurable.update(request_config.get("configurable", {}))
# Inject custom agent name when the caller specified a non-default assistant.
# Honour an explicit configurable["agent_name"] in the request if already set.
if assistant_id and assistant_id != _DEFAULT_ASSISTANT_ID and "agent_name" not in configurable:
# Normalize the same way ChannelManager does: strip, lowercase,
# replace underscores with hyphens, then validate to prevent path
# traversal and invalid agent directory lookups.
normalized = assistant_id.strip().lower().replace("_", "-")
if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized):
raise ValueError(f"Invalid assistant_id {assistant_id!r}: must contain only letters, digits, and hyphens after normalization.")
configurable["agent_name"] = normalized
config: dict[str, Any] = {"configurable": configurable, "recursion_limit": 100}
if request_config:
for k, v in request_config.items():
@@ -233,7 +270,28 @@ async def start_run(
agent_factory = resolve_agent_factory(body.assistant_id)
graph_input = normalize_input(body.input)
config = build_run_config(thread_id, body.config, body.metadata)
config = build_run_config(thread_id, body.config, body.metadata, assistant_id=body.assistant_id)
# Merge DeerFlow-specific context overrides into configurable.
# The ``context`` field is a custom extension for the langgraph-compat layer
# that carries agent configuration (model_name, thinking_enabled, etc.).
# Only agent-relevant keys are forwarded; unknown keys (e.g. thread_id) are ignored.
context = getattr(body, "context", None)
if context:
_CONTEXT_CONFIGURABLE_KEYS = {
"model_name",
"mode",
"thinking_enabled",
"reasoning_effort",
"is_plan_mode",
"subagent_enabled",
"max_concurrent_subagents",
}
configurable = config.setdefault("configurable", {})
for key in _CONTEXT_CONFIGURABLE_KEYS:
if key in context:
configurable.setdefault(key, context[key])
stream_modes = normalize_stream_modes(body.stream_mode)
task = asyncio.create_task(
+2
@@ -257,6 +257,8 @@ sandbox:
read_only: false
```
When you configure `sandbox.mounts`, DeerFlow exposes those `container_path` values in the agent prompt so the agent can discover and operate on mounted directories directly instead of assuming everything must live under `/mnt/user-data`.
### Skills
Configure the skills directory for specialized workflows:
@@ -477,6 +477,28 @@ def _build_acp_section() -> str:
)
def _build_custom_mounts_section() -> str:
"""Build a prompt section for explicitly configured sandbox mounts."""
try:
from deerflow.config import get_app_config
mounts = get_app_config().sandbox.mounts or []
except Exception:
logger.exception("Failed to load configured sandbox mounts for the lead-agent prompt")
return ""
if not mounts:
return ""
lines = []
for mount in mounts:
access = "read-only" if mount.read_only else "read-write"
lines.append(f"- Custom mount: `{mount.container_path}` - Host directory mapped into the sandbox ({access})")
mounts_list = "\n".join(lines)
return f"\n**Custom Mounted Directories:**\n{mounts_list}\n- If the user needs files outside `/mnt/user-data`, use these absolute container paths directly when they match the requested directory"
def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagents: int = 3, *, agent_name: str | None = None, available_skills: set[str] | None = None) -> str:
# Get memory context
memory_context = _get_memory_context(agent_name)
@@ -511,6 +533,8 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen
# Build ACP agent section only if ACP agents are configured
acp_section = _build_acp_section()
custom_mounts_section = _build_custom_mounts_section()
acp_and_mounts_section = "\n".join(section for section in (acp_section, custom_mounts_section) if section)
# Format the prompt with dynamic skills and memory
prompt = SYSTEM_PROMPT_TEMPLATE.format(
@@ -522,7 +546,7 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen
subagent_section=subagent_section,
subagent_reminder=subagent_reminder,
subagent_thinking=subagent_thinking,
acp_section=acp_section,
acp_section=acp_and_mounts_section,
)
return prompt + f"\n<current_date>{datetime.now().strftime('%Y-%m-%d, %A')}</current_date>"
@@ -1,5 +1,8 @@
import base64
import logging
import shlex
import threading
import uuid
from agent_sandbox import Sandbox as AioSandboxClient
@@ -7,11 +10,15 @@ from deerflow.sandbox.sandbox import Sandbox
logger = logging.getLogger(__name__)
_ERROR_OBSERVATION_SIGNATURE = "'ErrorObservation' object has no attribute 'exit_code'"
class AioSandbox(Sandbox):
"""Sandbox implementation using the agent-infra/sandbox Docker container.
This sandbox connects to a running AIO sandbox container via HTTP API.
A threading lock serializes shell commands to prevent concurrent requests
from corrupting the container's single persistent session (see #1433).
"""
def __init__(self, id: str, base_url: str, home_dir: str | None = None):
@@ -26,6 +33,7 @@ class AioSandbox(Sandbox):
self._base_url = base_url
self._client = AioSandboxClient(base_url=base_url, timeout=600)
self._home_dir = home_dir
self._lock = threading.Lock()
@property
def base_url(self) -> str:
@@ -42,19 +50,34 @@ class AioSandbox(Sandbox):
def execute_command(self, command: str) -> str:
"""Execute a shell command in the sandbox.
Uses a lock to serialize concurrent requests. The AIO sandbox
container maintains a single persistent shell session that
corrupts when hit with concurrent exec_command calls (returns
``ErrorObservation`` instead of real output). If corruption is
detected despite the lock (e.g. multiple processes sharing a
sandbox), the command is retried on a fresh session.
Args:
command: The command to execute.
Returns:
The output of the command.
"""
try:
result = self._client.shell.exec_command(command=command)
output = result.data.output if result.data else ""
return output if output else "(no output)"
except Exception as e:
logger.error(f"Failed to execute command in sandbox: {e}")
return f"Error: {e}"
with self._lock:
try:
result = self._client.shell.exec_command(command=command)
output = result.data.output if result.data else ""
if output and _ERROR_OBSERVATION_SIGNATURE in output:
logger.warning("ErrorObservation detected in sandbox output, retrying with a fresh session")
fresh_id = str(uuid.uuid4())
result = self._client.shell.exec_command(command=command, id=fresh_id)
output = result.data.output if result.data else ""
return output if output else "(no output)"
except Exception as e:
logger.error(f"Failed to execute command in sandbox: {e}")
return f"Error: {e}"
def read_file(self, path: str) -> str:
"""Read the content of a file in the sandbox.
@@ -82,17 +105,16 @@ class AioSandbox(Sandbox):
Returns:
The contents of the directory.
"""
try:
# Use shell command to list directory with depth limit
# The -L flag limits the depth for the tree command
result = self._client.shell.exec_command(command=f"find {path} -maxdepth {max_depth} -type f -o -type d 2>/dev/null | head -500")
output = result.data.output if result.data else ""
if output:
return [line.strip() for line in output.strip().split("\n") if line.strip()]
return []
except Exception as e:
logger.error(f"Failed to list directory in sandbox: {e}")
return []
with self._lock:
try:
result = self._client.shell.exec_command(command=f"find {shlex.quote(path)} -maxdepth {max_depth} -type f -o -type d 2>/dev/null | head -500")
output = result.data.output if result.data else ""
if output:
return [line.strip() for line in output.strip().split("\n") if line.strip()]
return []
except Exception as e:
logger.error(f"Failed to list directory in sandbox: {e}")
return []
def write_file(self, path: str, content: str, append: bool = False) -> None:
"""Write content to a file in the sandbox.
@@ -26,7 +26,7 @@ except ImportError: # pragma: no cover - Windows fallback
import msvcrt
from deerflow.config import get_app_config
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, Paths, get_paths
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from deerflow.sandbox.sandbox import Sandbox
from deerflow.sandbox.sandbox_provider import SandboxProvider
@@ -214,17 +214,13 @@ class AioSandboxProvider(SandboxProvider):
paths = get_paths()
paths.ensure_thread_dirs(thread_id)
# host_paths resolves to the host-side base dir when DEER_FLOW_HOST_BASE_DIR
# is set, otherwise falls back to the container's own base dir (native mode).
host_paths = Paths(base_dir=paths.host_base_dir)
return [
(str(host_paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
(str(host_paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
(str(host_paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
(paths.host_sandbox_work_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
(paths.host_sandbox_uploads_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
(paths.host_sandbox_outputs_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
# ACP workspace: read-only inside the sandbox (lead agent reads results;
# the ACP subprocess writes from the host side, not from within the container).
(str(host_paths.acp_workspace_dir(thread_id)), "/mnt/acp-workspace", True),
(paths.host_acp_workspace_dir(thread_id), "/mnt/acp-workspace", True),
]
@staticmethod
@@ -18,6 +18,26 @@ from .sandbox_info import SandboxInfo
logger = logging.getLogger(__name__)
def _format_container_mount(runtime: str, host_path: str, container_path: str, read_only: bool) -> list[str]:
"""Format a bind-mount argument for the selected runtime.
Docker's ``-v host:container`` syntax is ambiguous for Windows drive-letter
paths like ``D:/...`` because ``:`` is both the drive separator and the
volume separator. Use ``--mount type=bind,...`` for Docker to avoid that
parsing ambiguity. Apple Container keeps using ``-v``.
"""
if runtime == "docker":
mount_spec = f"type=bind,src={host_path},dst={container_path}"
if read_only:
mount_spec += ",readonly"
return ["--mount", mount_spec]
mount_spec = f"{host_path}:{container_path}"
if read_only:
mount_spec += ":ro"
return ["-v", mount_spec]
class LocalContainerBackend(SandboxBackend):
"""Backend that manages sandbox containers locally using Docker or Apple Container.
@@ -246,18 +266,26 @@ class LocalContainerBackend(SandboxBackend):
# Config-level volume mounts
for mount in self._config_mounts:
mount_spec = f"{mount.host_path}:{mount.container_path}"
if mount.read_only:
mount_spec += ":ro"
cmd.extend(["-v", mount_spec])
cmd.extend(
_format_container_mount(
self._runtime,
mount.host_path,
mount.container_path,
mount.read_only,
)
)
# Extra mounts (thread-specific, skills, etc.)
if extra_mounts:
for host_path, container_path, read_only in extra_mounts:
mount_spec = f"{host_path}:{container_path}"
if read_only:
mount_spec += ":ro"
cmd.extend(["-v", mount_spec])
cmd.extend(
_format_container_mount(
self._runtime,
host_path,
container_path,
read_only,
)
)
cmd.append(self._image)
@@ -1,7 +1,7 @@
import os
import re
import shutil
from pathlib import Path
from pathlib import Path, PureWindowsPath
# Virtual path prefix seen by agents inside the sandbox
VIRTUAL_PATH_PREFIX = "/mnt/user-data"
@@ -9,6 +9,41 @@ VIRTUAL_PATH_PREFIX = "/mnt/user-data"
_SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
def _validate_thread_id(thread_id: str) -> str:
"""Validate a thread ID before using it in filesystem paths."""
if not _SAFE_THREAD_ID_RE.match(thread_id):
raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
return thread_id
def _join_host_path(base: str, *parts: str) -> str:
"""Join host filesystem path segments while preserving native style.
Docker Desktop on Windows expects bind mount sources to stay in Windows
path form (for example ``C:\\repo\\backend\\.deer-flow``). Using
``Path(base) / ...`` on a POSIX host can accidentally rewrite those paths
with mixed separators, so this helper preserves the original style.
"""
if not parts:
return base
if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base:
result = PureWindowsPath(base)
for part in parts:
result /= part
return str(result)
result = Path(base)
for part in parts:
result /= part
return str(result)
def join_host_path(base: str, *parts: str) -> str:
"""Join host filesystem path segments while preserving native style."""
return _join_host_path(base, *parts)
class Paths:
"""
Centralized path configuration for DeerFlow application data.
@@ -54,6 +89,12 @@ class Paths:
return Path(env)
return self.base_dir
def _host_base_dir_str(self) -> str:
"""Return the host base dir as a raw string for bind mounts."""
if env := os.getenv("DEER_FLOW_HOST_BASE_DIR"):
return env
return str(self.base_dir)
@property
def base_dir(self) -> Path:
"""Root directory for all application data."""
@@ -103,9 +144,7 @@ class Paths:
ValueError: If `thread_id` contains unsafe characters (path separators
or `..`) that could cause directory traversal.
"""
if not _SAFE_THREAD_ID_RE.match(thread_id):
raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
return self.base_dir / "threads" / thread_id
return self.base_dir / "threads" / _validate_thread_id(thread_id)
def sandbox_work_dir(self, thread_id: str) -> Path:
"""
@@ -150,6 +189,30 @@ class Paths:
"""
return self.thread_dir(thread_id) / "user-data"
def host_thread_dir(self, thread_id: str) -> str:
"""Host path for a thread directory, preserving Windows path syntax."""
return _join_host_path(self._host_base_dir_str(), "threads", _validate_thread_id(thread_id))
def host_sandbox_user_data_dir(self, thread_id: str) -> str:
"""Host path for a thread's user-data root."""
return _join_host_path(self.host_thread_dir(thread_id), "user-data")
def host_sandbox_work_dir(self, thread_id: str) -> str:
"""Host path for the workspace mount source."""
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "workspace")
def host_sandbox_uploads_dir(self, thread_id: str) -> str:
"""Host path for the uploads mount source."""
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "uploads")
def host_sandbox_outputs_dir(self, thread_id: str) -> str:
"""Host path for the outputs mount source."""
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "outputs")
def host_acp_workspace_dir(self, thread_id: str) -> str:
"""Host path for the ACP workspace mount source."""
return _join_host_path(self.host_thread_dir(thread_id), "acp-workspace")
def ensure_thread_dirs(self, thread_id: str) -> None:
"""Create all standard sandbox directories for a thread.
@@ -81,11 +81,9 @@ class RunManager:
async def list_by_thread(self, thread_id: str) -> list[RunRecord]:
"""Return all runs for a given thread, newest first."""
async with self._lock:
return sorted(
(r for r in self._runs.values() if r.thread_id == thread_id),
key=lambda r: r.created_at,
reverse=True,
)
# Dict insertion order matches creation order, so reversing it gives
# us deterministic newest-first results even when timestamps tie.
return [r for r in reversed(self._runs.values()) if r.thread_id == thread_id]
async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
"""Transition a run to a new status."""
@@ -37,6 +37,7 @@ You have access to the sandbox environment:
- User uploads: `/mnt/user-data/uploads`
- User workspace: `/mnt/user-data/workspace`
- Output files: `/mnt/user-data/outputs`
- Deployment-configured custom mounts may also be available at other absolute container paths; use them directly when the task references those mounted directories
</working_directory>
""",
tools=["bash", "ls", "read_file", "write_file", "str_replace"], # Sandbox tools only
@@ -38,6 +38,7 @@ You have access to the same sandbox environment as the parent agent:
- User uploads: `/mnt/user-data/uploads`
- User workspace: `/mnt/user-data/workspace`
- Output files: `/mnt/user-data/outputs`
- Deployment-configured custom mounts may also be available at other absolute container paths; use them directly when the task references those mounted directories
</working_directory>
""",
tools=None, # Inherit all tools from parent
@@ -9,7 +9,6 @@ from langgraph.types import Command
from langgraph.typing import ContextT
from deerflow.agents.thread_state import ThreadState
from deerflow.sandbox.tools import get_thread_data, replace_virtual_path
@tool("view_image", parse_docstring=True)
@@ -32,6 +31,8 @@ def view_image_tool(
Args:
image_path: Absolute path to the image file. Common formats supported: jpg, jpeg, png, webp.
"""
from deerflow.sandbox.tools import get_thread_data, replace_virtual_path
# Replace virtual path with actual path
# /mnt/user-data/* paths are mapped to thread-specific directories
thread_data = get_thread_data(runtime)
+133
@@ -0,0 +1,133 @@
"""Tests for AioSandbox concurrent command serialization (#1433)."""
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
@pytest.fixture()
def sandbox():
"""Create an AioSandbox with a mocked client."""
with patch("deerflow.community.aio_sandbox.aio_sandbox.AioSandboxClient"):
from deerflow.community.aio_sandbox.aio_sandbox import AioSandbox
sb = AioSandbox(id="test-sandbox", base_url="http://localhost:8080")
return sb
class TestExecuteCommandSerialization:
"""Verify that concurrent exec_command calls are serialized."""
def test_lock_prevents_concurrent_execution(self, sandbox):
"""Concurrent threads should not overlap inside execute_command."""
call_log = []
barrier = threading.Barrier(3)
def slow_exec(command, **kwargs):
call_log.append(("enter", command))
import time
time.sleep(0.05)
call_log.append(("exit", command))
return SimpleNamespace(data=SimpleNamespace(output=f"ok: {command}"))
sandbox._client.shell.exec_command = slow_exec
def worker(cmd):
barrier.wait() # ensure all threads contend for the lock simultaneously
sandbox.execute_command(cmd)
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"cmd-{i}",))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
# Verify serialization: each "enter" should be followed by its own
# "exit" before the next "enter" (no interleaving).
enters = [i for i, (action, _) in enumerate(call_log) if action == "enter"]
exits = [i for i, (action, _) in enumerate(call_log) if action == "exit"]
assert len(enters) == 3
assert len(exits) == 3
for e_idx, x_idx in zip(enters, exits):
assert x_idx == e_idx + 1, f"Interleaved execution detected: {call_log}"
class TestErrorObservationRetry:
"""Verify ErrorObservation detection and fresh-session retry."""
def test_retry_on_error_observation(self, sandbox):
"""When output contains ErrorObservation, retry with a fresh session."""
call_count = 0
def mock_exec(command, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return SimpleNamespace(data=SimpleNamespace(output="'ErrorObservation' object has no attribute 'exit_code'"))
return SimpleNamespace(data=SimpleNamespace(output="success"))
sandbox._client.shell.exec_command = mock_exec
result = sandbox.execute_command("echo hello")
assert result == "success"
assert call_count == 2
def test_retry_passes_fresh_session_id(self, sandbox):
"""The retry call should include a new session id kwarg."""
calls = []
def mock_exec(command, **kwargs):
calls.append(kwargs)
if len(calls) == 1:
return SimpleNamespace(data=SimpleNamespace(output="'ErrorObservation' object has no attribute 'exit_code'"))
return SimpleNamespace(data=SimpleNamespace(output="ok"))
sandbox._client.shell.exec_command = mock_exec
sandbox.execute_command("test")
assert len(calls) == 2
assert "id" not in calls[0]
assert "id" in calls[1]
assert len(calls[1]["id"]) == 36 # UUID format
def test_no_retry_on_clean_output(self, sandbox):
"""Normal output should not trigger a retry."""
call_count = 0
def mock_exec(command, **kwargs):
nonlocal call_count
call_count += 1
return SimpleNamespace(data=SimpleNamespace(output="all good"))
sandbox._client.shell.exec_command = mock_exec
result = sandbox.execute_command("echo hello")
assert result == "all good"
assert call_count == 1
class TestListDirSerialization:
"""Verify that list_dir also acquires the lock."""
def test_list_dir_uses_lock(self, sandbox):
"""list_dir should hold the lock during execution."""
lock_was_held = []
original_exec = MagicMock(return_value=SimpleNamespace(data=SimpleNamespace(output="/a\n/b")))
def tracking_exec(command, **kwargs):
lock_was_held.append(sandbox._lock.locked())
return original_exec(command, **kwargs)
sandbox._client.shell.exec_command = tracking_exec
result = sandbox.list_dir("/test")
assert result == ["/a", "/b"]
assert lock_was_held == [True], "list_dir must hold the lock during exec_command"
@@ -0,0 +1,28 @@
from deerflow.community.aio_sandbox.local_backend import _format_container_mount
def test_format_container_mount_uses_mount_syntax_for_docker_windows_paths():
args = _format_container_mount("docker", "D:/deer-flow/backend/.deer-flow/threads", "/mnt/threads", False)
assert args == [
"--mount",
"type=bind,src=D:/deer-flow/backend/.deer-flow/threads,dst=/mnt/threads",
]
def test_format_container_mount_marks_docker_readonly_mounts():
args = _format_container_mount("docker", "/host/path", "/mnt/path", True)
assert args == [
"--mount",
"type=bind,src=/host/path,dst=/mnt/path,readonly",
]
def test_format_container_mount_keeps_volume_syntax_for_apple_container():
args = _format_container_mount("container", "/host/path", "/mnt/path", True)
assert args == [
"-v",
"/host/path:/mnt/path:ro",
]
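The three tests above pin down the argument lists the helper is expected to produce. A minimal sketch of a function that would satisfy them follows; it is inferred only from the tests and the commit message, the name `format_container_mount_sketch` is hypothetical, and the real `_format_container_mount` in `local_backend` may be structured differently.

```python
def format_container_mount_sketch(
    runtime: str, host_path: str, container_path: str, read_only: bool
) -> list[str]:
    """Hypothetical stand-in for _format_container_mount, inferred from the tests."""
    if runtime == "docker":
        # --mount takes comma-separated key=value pairs, so the colon in a
        # Windows drive letter (D:/...) is never read as a field separator.
        spec = f"type=bind,src={host_path},dst={container_path}"
        if read_only:
            spec += ",readonly"
        return ["--mount", spec]
    # Assumption: any other runtime (the tests use "container" for Apple
    # Container) keeps the colon-separated -v syntax.
    spec = f"{host_path}:{container_path}"
    if read_only:
        spec += ":ro"
    return ["-v", spec]
```

Returning a list of arguments rather than a single string lets the caller extend a subprocess argument list directly, without shell quoting.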
+32 -1
@@ -5,7 +5,7 @@ from unittest.mock import MagicMock, patch
import pytest
from deerflow.config.paths import Paths
from deerflow.config.paths import Paths, join_host_path
# ── ensure_thread_dirs ───────────────────────────────────────────────────────
@@ -31,6 +31,13 @@ def test_ensure_thread_dirs_acp_workspace_is_world_writable(tmp_path):
assert mode == oct(0o777)
def test_host_thread_dir_rejects_invalid_thread_id(tmp_path):
paths = Paths(base_dir=tmp_path)
with pytest.raises(ValueError, match="Invalid thread_id"):
paths.host_thread_dir("../escape")
# ── _get_thread_mounts ───────────────────────────────────────────────────────
@@ -75,6 +82,30 @@ def test_get_thread_mounts_includes_user_data_dirs(tmp_path, monkeypatch):
assert "/mnt/user-data/outputs" in container_paths
def test_join_host_path_preserves_windows_drive_letter_style():
base = r"C:\Users\demo\deer-flow\backend\.deer-flow"
joined = join_host_path(base, "threads", "thread-9", "user-data", "outputs")
assert joined == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-9\user-data\outputs"
def test_get_thread_mounts_preserves_windows_host_path_style(tmp_path, monkeypatch):
"""Docker bind mount sources must keep Windows-style paths intact."""
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
monkeypatch.setenv("DEER_FLOW_HOST_BASE_DIR", r"C:\Users\demo\deer-flow\backend\.deer-flow")
monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path))
mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-10")
container_paths = {container_path: host_path for host_path, container_path, _ in mounts}
assert container_paths["/mnt/user-data/workspace"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\workspace"
assert container_paths["/mnt/user-data/uploads"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\uploads"
assert container_paths["/mnt/user-data/outputs"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\outputs"
assert container_paths["/mnt/acp-workspace"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\acp-workspace"
def test_discover_or_create_only_unlocks_when_lock_succeeds(tmp_path, monkeypatch):
"""Unlock should not run if exclusive locking itself fails."""
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
@@ -11,9 +11,16 @@ import pytest
REPO_ROOT = Path(__file__).resolve().parents[2]
SCRIPT_PATH = REPO_ROOT / "scripts" / "docker.sh"
BASH_EXECUTABLE = which("bash") or r"C:\Program Files\Git\bin\bash.exe"
BASH_CANDIDATES = [
Path(r"C:\Program Files\Git\bin\bash.exe"),
Path(which("bash")) if which("bash") else None,
]
BASH_EXECUTABLE = next(
(str(path) for path in BASH_CANDIDATES if path is not None and path.exists() and "WindowsApps" not in str(path)),
None,
)
if not Path(BASH_EXECUTABLE).exists():
if BASH_EXECUTABLE is None:
pytestmark = pytest.mark.skip(reason="bash is required for docker.sh detection tests")
@@ -21,13 +28,14 @@ def _detect_mode_with_config(config_content: str) -> str:
"""Write config content into a temp project root and execute detect_sandbox_mode."""
with tempfile.TemporaryDirectory() as tmpdir:
tmp_root = Path(tmpdir)
(tmp_root / "config.yaml").write_text(config_content)
(tmp_root / "config.yaml").write_text(config_content, encoding="utf-8")
command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmp_root}' && detect_sandbox_mode"
output = subprocess.check_output(
[BASH_EXECUTABLE, "-lc", command],
text=True,
encoding="utf-8",
).strip()
return output
@@ -37,7 +45,11 @@ def test_detect_mode_defaults_to_local_when_config_missing():
"""No config file should default to local mode."""
with tempfile.TemporaryDirectory() as tmpdir:
command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmpdir}' && detect_sandbox_mode"
output = subprocess.check_output([BASH_EXECUTABLE, "-lc", command], text=True).strip()
output = subprocess.check_output(
[BASH_EXECUTABLE, "-lc", command],
text=True,
encoding="utf-8",
).strip()
assert output == "local"
+184
@@ -100,3 +100,187 @@ def test_build_run_config_with_overrides():
assert config["configurable"]["model_name"] == "gpt-4"
assert config["tags"] == ["test"]
assert config["metadata"]["user"] == "alice"
# ---------------------------------------------------------------------------
# Regression tests for issue #1644:
# assistant_id not mapped to agent_name → custom agent SOUL.md never loaded
# ---------------------------------------------------------------------------
def test_build_run_config_custom_agent_injects_agent_name():
"""Custom assistant_id must be forwarded as configurable['agent_name'].
Regression test for #1644: when the LangGraph Platform-compatible
/runs endpoint receives a custom assistant_id (e.g. 'finalis'), the
Gateway must inject configurable['agent_name'] so that make_lead_agent
loads the correct agents/finalis/SOUL.md.
"""
from app.gateway.services import build_run_config
config = build_run_config("thread-1", None, None, assistant_id="finalis")
assert config["configurable"]["agent_name"] == "finalis", "Custom assistant_id must be forwarded as configurable['agent_name'] so that make_lead_agent loads the correct SOUL.md"
def test_build_run_config_lead_agent_no_agent_name():
"""'lead_agent' assistant_id must NOT inject configurable['agent_name']."""
from app.gateway.services import build_run_config
config = build_run_config("thread-1", None, None, assistant_id="lead_agent")
assert "agent_name" not in config["configurable"]
def test_build_run_config_none_assistant_id_no_agent_name():
"""None assistant_id must NOT inject configurable['agent_name']."""
from app.gateway.services import build_run_config
config = build_run_config("thread-1", None, None, assistant_id=None)
assert "agent_name" not in config["configurable"]
def test_build_run_config_explicit_agent_name_not_overwritten():
"""An explicit configurable['agent_name'] in the request must take precedence."""
from app.gateway.services import build_run_config
config = build_run_config(
"thread-1",
{"configurable": {"agent_name": "explicit-agent"}},
None,
assistant_id="other-agent",
)
assert config["configurable"]["agent_name"] == "explicit-agent", "An explicit configurable['agent_name'] in the request body must not be overwritten by the assistant_id mapping"
def test_resolve_agent_factory_returns_make_lead_agent():
"""resolve_agent_factory always returns make_lead_agent regardless of assistant_id."""
from app.gateway.services import resolve_agent_factory
from deerflow.agents.lead_agent.agent import make_lead_agent
assert resolve_agent_factory(None) is make_lead_agent
assert resolve_agent_factory("lead_agent") is make_lead_agent
assert resolve_agent_factory("finalis") is make_lead_agent
assert resolve_agent_factory("custom-agent-123") is make_lead_agent
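The regression tests for #1644 describe a small mapping rule: a custom `assistant_id` becomes `configurable['agent_name']`, while `None` and `'lead_agent'` inject nothing, and an explicit `agent_name` wins. A sketch of that rule in isolation is shown here; the helper name `inject_agent_name` and the constant `DEFAULT_ASSISTANT_ID` are hypothetical, and the actual logic lives inside `build_run_config`, which may differ in detail.

```python
from typing import Any, Optional

# Assumption: "lead_agent" is the only assistant_id treated as the default agent.
DEFAULT_ASSISTANT_ID = "lead_agent"


def inject_agent_name(configurable: dict[str, Any], assistant_id: Optional[str]) -> dict[str, Any]:
    """Forward a custom assistant_id as agent_name without overwriting an explicit one."""
    if assistant_id and assistant_id != DEFAULT_ASSISTANT_ID:
        # setdefault keeps an agent_name that the request already supplied.
        configurable.setdefault("agent_name", assistant_id)
    return configurable
```

Using `setdefault` rather than plain assignment is what gives the explicit request value precedence over the `assistant_id` mapping.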
# ---------------------------------------------------------------------------
# Regression tests for issue #1699:
# context field in langgraph-compat requests not merged into configurable
# ---------------------------------------------------------------------------
def test_run_create_request_accepts_context():
"""RunCreateRequest must accept the ``context`` field without dropping it."""
from app.gateway.routers.thread_runs import RunCreateRequest
body = RunCreateRequest(
input={"messages": [{"role": "user", "content": "hi"}]},
context={
"model_name": "deepseek-v3",
"thinking_enabled": True,
"is_plan_mode": True,
"subagent_enabled": True,
"thread_id": "some-thread-id",
},
)
assert body.context is not None
assert body.context["model_name"] == "deepseek-v3"
assert body.context["is_plan_mode"] is True
assert body.context["subagent_enabled"] is True
def test_run_create_request_context_defaults_to_none():
"""RunCreateRequest without context should default to None (backward compat)."""
from app.gateway.routers.thread_runs import RunCreateRequest
body = RunCreateRequest(input=None)
assert body.context is None
def test_context_merges_into_configurable():
"""Context values must be merged into config['configurable'] by start_run.
Since start_run is async and requires many dependencies, we test the
merging logic directly by simulating what start_run does.
"""
from app.gateway.services import build_run_config
# Simulate the context merging logic from start_run
config = build_run_config("thread-1", None, None)
context = {
"model_name": "deepseek-v3",
"mode": "ultra",
"reasoning_effort": "high",
"thinking_enabled": True,
"is_plan_mode": True,
"subagent_enabled": True,
"max_concurrent_subagents": 5,
"thread_id": "should-be-ignored",
}
_CONTEXT_CONFIGURABLE_KEYS = {
"model_name",
"mode",
"thinking_enabled",
"reasoning_effort",
"is_plan_mode",
"subagent_enabled",
"max_concurrent_subagents",
}
configurable = config.setdefault("configurable", {})
for key in _CONTEXT_CONFIGURABLE_KEYS:
if key in context:
configurable.setdefault(key, context[key])
assert config["configurable"]["model_name"] == "deepseek-v3"
assert config["configurable"]["thinking_enabled"] is True
assert config["configurable"]["is_plan_mode"] is True
assert config["configurable"]["subagent_enabled"] is True
assert config["configurable"]["max_concurrent_subagents"] == 5
assert config["configurable"]["reasoning_effort"] == "high"
assert config["configurable"]["mode"] == "ultra"
# thread_id from context should NOT override the one from build_run_config
assert config["configurable"]["thread_id"] == "thread-1"
# Non-allowlisted keys should not appear
assert "thread_id" not in {k for k in context if k in _CONTEXT_CONFIGURABLE_KEYS}
def test_context_does_not_override_existing_configurable():
"""Values already in config.configurable must NOT be overridden by context.
This ensures that explicit configurable values from the ``config`` field
take precedence over the ``context`` field.
"""
from app.gateway.services import build_run_config
config = build_run_config(
"thread-1",
{"configurable": {"model_name": "gpt-4", "is_plan_mode": False}},
None,
)
context = {
"model_name": "deepseek-v3",
"is_plan_mode": True,
"subagent_enabled": True,
}
_CONTEXT_CONFIGURABLE_KEYS = {
"model_name",
"mode",
"thinking_enabled",
"reasoning_effort",
"is_plan_mode",
"subagent_enabled",
"max_concurrent_subagents",
}
configurable = config.setdefault("configurable", {})
for key in _CONTEXT_CONFIGURABLE_KEYS:
if key in context:
configurable.setdefault(key, context[key])
# Existing values must NOT be overridden
assert config["configurable"]["model_name"] == "gpt-4"
assert config["configurable"]["is_plan_mode"] is False
# New values should be added
assert config["configurable"]["subagent_enabled"] is True
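Both tests above repeat the allowlist merge inline because `start_run` is hard to call directly. The same logic could be expressed as a standalone helper, sketched below; `merge_context_into_configurable` is a hypothetical name, the key set is copied from the tests, and whether the Gateway factors it out this way is an assumption.

```python
from typing import Any, Optional

# Allowlist copied from the tests above; keys outside it (e.g. thread_id) are ignored.
CONTEXT_CONFIGURABLE_KEYS = frozenset(
    {
        "model_name",
        "mode",
        "thinking_enabled",
        "reasoning_effort",
        "is_plan_mode",
        "subagent_enabled",
        "max_concurrent_subagents",
    }
)


def merge_context_into_configurable(
    config: dict[str, Any], context: Optional[dict[str, Any]]
) -> dict[str, Any]:
    """Copy allowlisted context keys into config['configurable'] without overriding."""
    configurable = config.setdefault("configurable", {})
    for key in CONTEXT_CONFIGURABLE_KEYS:
        if context and key in context:
            # Existing configurable values take precedence over context values.
            configurable.setdefault(key, context[key])
    return config
```

A helper of this shape would let the tests exercise the production code path instead of a copy of it, so the allowlist could not drift between the two.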
+46
@@ -0,0 +1,46 @@
from types import SimpleNamespace
from deerflow.agents.lead_agent import prompt as prompt_module
def test_build_custom_mounts_section_returns_empty_when_no_mounts(monkeypatch):
config = SimpleNamespace(sandbox=SimpleNamespace(mounts=[]))
monkeypatch.setattr("deerflow.config.get_app_config", lambda: config)
assert prompt_module._build_custom_mounts_section() == ""
def test_build_custom_mounts_section_lists_configured_mounts(monkeypatch):
mounts = [
SimpleNamespace(container_path="/home/user/shared", read_only=False),
SimpleNamespace(container_path="/mnt/reference", read_only=True),
]
config = SimpleNamespace(sandbox=SimpleNamespace(mounts=mounts))
monkeypatch.setattr("deerflow.config.get_app_config", lambda: config)
section = prompt_module._build_custom_mounts_section()
assert "**Custom Mounted Directories:**" in section
assert "`/home/user/shared`" in section
assert "read-write" in section
assert "`/mnt/reference`" in section
assert "read-only" in section
def test_apply_prompt_template_includes_custom_mounts(monkeypatch):
mounts = [SimpleNamespace(container_path="/home/user/shared", read_only=False)]
config = SimpleNamespace(
sandbox=SimpleNamespace(mounts=mounts),
skills=SimpleNamespace(container_path="/mnt/skills"),
)
monkeypatch.setattr("deerflow.config.get_app_config", lambda: config)
monkeypatch.setattr(prompt_module, "load_skills", lambda enabled_only=True: [])
monkeypatch.setattr(prompt_module, "get_deferred_tools_prompt_section", lambda: "")
monkeypatch.setattr(prompt_module, "_build_acp_section", lambda: "")
monkeypatch.setattr(prompt_module, "_get_memory_context", lambda agent_name=None: "")
monkeypatch.setattr(prompt_module, "get_agent_soul", lambda agent_name=None: "")
prompt = prompt_module.apply_prompt_template()
assert "`/home/user/shared`" in prompt
assert "Custom Mounted Directories" in prompt
+12
@@ -86,6 +86,18 @@ async def test_list_by_thread(manager: RunManager):
assert runs[1].run_id == r1.run_id
@pytest.mark.anyio
async def test_list_by_thread_is_stable_when_timestamps_tie(manager: RunManager, monkeypatch: pytest.MonkeyPatch):
"""Newest-first ordering should not depend on timestamp precision."""
monkeypatch.setattr("deerflow.runtime.runs.manager._now_iso", lambda: "2026-01-01T00:00:00+00:00")
r1 = await manager.create("thread-1")
r2 = await manager.create("thread-1")
runs = await manager.list_by_thread("thread-1")
assert [run.run_id for run in runs] == [r2.run_id, r1.run_id]
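The test above requires newest-first ordering to hold even when two runs share the same timestamp. One way to get that property is to break ties with a monotonically increasing creation sequence. The sketch below illustrates the idea only; `RunRecordSketch`, `seq`, and `newest_first` are hypothetical names, and the diff does not show how `RunManager.list_by_thread` actually orders runs.

```python
from dataclasses import dataclass, field
from itertools import count

# Process-wide counter; each new record gets the next value in creation order.
_sequence = count()


@dataclass
class RunRecordSketch:
    run_id: str
    created_at: str  # ISO-8601 string, assumed to share one format and UTC offset
    seq: int = field(default_factory=lambda: next(_sequence))


def newest_first(runs: list[RunRecordSketch]) -> list[RunRecordSketch]:
    """Sort by timestamp, then by creation sequence, newest first."""
    return sorted(runs, key=lambda run: (run.created_at, run.seq), reverse=True)
```

Because `seq` is part of the sort key, two runs created within the same timestamp tick still come back in reverse creation order.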
@pytest.mark.anyio
async def test_has_inflight(manager: RunManager):
"""has_inflight should be True when a run is pending or running."""
+3
@@ -397,6 +397,9 @@ sandbox:
# # - host_path: /path/on/host
# # container_path: /home/user/shared
# # read_only: false
# #
# # # DeerFlow will surface configured container_path values to the agent,
# # # so it can directly read/write mounted directories such as /home/user/shared
#
# # Optional: Environment variables to inject into the sandbox container
# # Values starting with $ will be resolved from host environment variables
+5 -7
@@ -121,8 +121,8 @@ services:
container_name: deer-flow-langgraph
command: sh -c "cd /app/backend && uv run langgraph dev --no-browser --allow-blocking --no-reload --host 0.0.0.0 --port 2024 --n-jobs-per-worker 10"
volumes:
- ${DEER_FLOW_CONFIG_PATH}:/app/config.yaml:ro
- ${DEER_FLOW_EXTENSIONS_CONFIG_PATH}:/app/extensions_config.json:ro
- ${DEER_FLOW_CONFIG_PATH}:/app/backend/config.yaml:ro
- ${DEER_FLOW_EXTENSIONS_CONFIG_PATH}:/app/backend/extensions_config.json:ro
- ${DEER_FLOW_HOME}:/app/backend/.deer-flow
- ../skills:/app/skills:ro
- ../backend/.langgraph_api:/app/backend/.langgraph_api
@@ -144,14 +144,12 @@ services:
environment:
- CI=true
- DEER_FLOW_HOME=/app/backend/.deer-flow
- DEER_FLOW_CONFIG_PATH=/app/config.yaml
- DEER_FLOW_EXTENSIONS_CONFIG_PATH=/app/extensions_config.json
- DEER_FLOW_CONFIG_PATH=/app/backend/config.yaml
- DEER_FLOW_EXTENSIONS_CONFIG_PATH=/app/backend/extensions_config.json
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
# Disable LangSmith tracing LANGSMITH_API_KEY is not required.
# Set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
- LANGSMITH_TRACING=${LANGSMITH_TRACING:-false}
# LangSmith tracing: set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable.
env_file:
- ../.env
extra_hosts:
+36 -3
@@ -31,6 +31,7 @@ from __future__ import annotations
import logging
import os
import re
import time
from contextlib import asynccontextmanager
@@ -39,7 +40,7 @@ from fastapi import FastAPI, HTTPException
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.client.rest import ApiException
from pydantic import BaseModel
from pydantic import BaseModel, Field
# Suppress only the InsecureRequestWarning from urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -59,6 +60,7 @@ SANDBOX_IMAGE = os.environ.get(
)
SKILLS_HOST_PATH = os.environ.get("SKILLS_HOST_PATH", "/skills")
THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads")
SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$"
# Path to the kubeconfig *inside* the provisioner container.
# Typically the host's ~/.kube/config is mounted here.
@@ -69,6 +71,36 @@ KUBECONFIG_PATH = os.environ.get("KUBECONFIG_PATH", "/root/.kube/config")
# is ``host.docker.internal``; on Linux it may be the host's LAN IP.
NODE_HOST = os.environ.get("NODE_HOST", "host.docker.internal")
def join_host_path(base: str, *parts: str) -> str:
"""Join host filesystem path segments while preserving native style."""
if not parts:
return base
if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base:
from pathlib import PureWindowsPath
result = PureWindowsPath(base)
for part in parts:
result /= part
return str(result)
from pathlib import Path
result = Path(base)
for part in parts:
result /= part
return str(result)
def _validate_thread_id(thread_id: str) -> str:
if not re.match(SAFE_THREAD_ID_PATTERN, thread_id):
raise ValueError(
"Invalid thread_id: only alphanumeric characters, hyphens, and underscores are allowed."
)
return thread_id
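The two helpers above work together: the join keeps the host's path style, and the validation keeps a crafted `thread_id` from walking out of the threads directory. The sketch below is a condensed copy for illustration. It deliberately uses `PurePosixPath` in place of `Path` so the result does not depend on the machine running it, and the `_sketch` names are hypothetical.

```python
import re
from pathlib import PurePosixPath, PureWindowsPath

SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$"


def join_host_path_sketch(base: str, *parts: str) -> str:
    """Join segments using Windows rules when the base looks like a Windows path."""
    if not parts:
        return base
    looks_windows = (
        bool(re.match(r"^[A-Za-z]:[\\/]", base)) or base.startswith("\\\\") or "\\" in base
    )
    path_cls = PureWindowsPath if looks_windows else PurePosixPath
    return str(path_cls(base).joinpath(*parts))


def is_safe_thread_id(thread_id: str) -> bool:
    """Mirror of the allowlist check: letters, digits, hyphens, underscores only."""
    return re.match(SAFE_THREAD_ID_PATTERN, thread_id) is not None


windows_joined = join_host_path_sketch(r"C:\Users\demo\.deer-flow", "threads", "thread-9")
posix_joined = join_host_path_sketch("/home/demo/.deer-flow", "threads", "thread-9")
# pathlib does not collapse "..", so an unvalidated id would escape the base directory.
escaped = join_host_path_sketch("/threads", "../escape", "user-data")
```

Here `escaped` is `/threads/../escape/user-data`, which is why the pattern check has to run before the join. Note also that `str()` of a `PureWindowsPath` always uses backslashes, so a drive-letter base written with forward slashes comes back with backslash separators.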
# ── K8s client setup ────────────────────────────────────────────────────
core_v1: k8s_client.CoreV1Api | None = None
@@ -186,7 +218,7 @@ app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan)
class CreateSandboxRequest(BaseModel):
sandbox_id: str
thread_id: str
thread_id: str = Field(pattern=SAFE_THREAD_ID_PATTERN)
class SandboxResponse(BaseModel):
@@ -213,6 +245,7 @@ def _sandbox_url(node_port: int) -> str:
def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
"""Construct a Pod manifest for a single sandbox."""
thread_id = _validate_thread_id(thread_id)
return k8s_client.V1Pod(
metadata=k8s_client.V1ObjectMeta(
name=_pod_name(sandbox_id),
@@ -298,7 +331,7 @@ def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
k8s_client.V1Volume(
name="user-data",
host_path=k8s_client.V1HostPathVolumeSource(
path=f"{THREADS_HOST_PATH}/{thread_id}/user-data",
path=join_host_path(THREADS_HOST_PATH, thread_id, "user-data"),
type="DirectoryOrCreate",
),
),
@@ -80,13 +80,9 @@ export default function NewAgentPage() {
setNameError(t.agents.nameStepAlreadyExistsError);
return;
}
} catch (error) {
if (error instanceof AgentNameCheckError) {
setNameError(
error.reason === "backend_unreachable"
? t.agents.nameStepCheckError
: error.message,
);
} catch (err) {
if (err instanceof TypeError && err.message === "Failed to fetch") {
setNameError(t.agents.nameStepNetworkError);
} else {
setNameError(t.agents.nameStepCheckError);
}
@@ -107,6 +103,7 @@ export default function NewAgentPage() {
t.agents.nameStepBootstrapMessage,
t.agents.nameStepInvalidError,
t.agents.nameStepAlreadyExistsError,
t.agents.nameStepNetworkError,
t.agents.nameStepCheckError,
]);
@@ -83,7 +83,7 @@ export function ArtifactFileDetail({
const isSupportPreview = useMemo(() => {
return language === "html" || language === "markdown";
}, [language]);
const { content } = useArtifactContent({
const { content, url } = useArtifactContent({
threadId,
filepath: filepathFromProps,
enabled: isCodeFile && !isWriteFile,
@@ -240,7 +240,9 @@ export function ArtifactFileDetail({
(language === "markdown" || language === "html") && (
<ArtifactFilePreview
content={displayContent}
isWriteFile={isWriteFile}
language={language ?? "text"}
url={url}
/>
)}
{isCodeFile && viewMode === "code" && (
@@ -263,10 +265,14 @@ export function ArtifactFileDetail({
export function ArtifactFilePreview({
content,
isWriteFile,
language,
url,
}: {
content: string;
isWriteFile: boolean;
language: string;
url?: string;
}) {
if (language === "markdown") {
return (
@@ -286,8 +292,8 @@ export function ArtifactFilePreview({
<iframe
className="size-full"
title="Artifact preview"
srcDoc={content}
sandbox="allow-scripts allow-forms"
{...(isWriteFile ? { srcDoc: content } : url ? { src: url } : {})}
/>
);
}
+6 -1
@@ -34,5 +34,10 @@ export function useArtifactContent({
// Cache artifact content for 5 minutes to avoid repeated fetches (especially for .skill ZIP extraction)
staleTime: 5 * 60 * 1000,
});
return { content: isWriteFile ? content : data, isLoading, error };
return {
content: isWriteFile ? content : data?.content,
url: isWriteFile ? undefined : data?.url,
isLoading,
error,
};
}
+1 -1
@@ -20,7 +20,7 @@ export async function loadArtifactContent({
const url = urlOfArtifact({ filepath: enhancedFilepath, threadId, isMock });
const response = await fetch(url);
const text = await response.text();
return text;
return { content: text, url };
}
export function loadArtifactContentFromToolCall({
+3 -2
@@ -194,8 +194,9 @@ export const enUS: Translations = {
nameStepInvalidError:
"Invalid name — use only letters, digits, and hyphens",
nameStepAlreadyExistsError: "An agent with this name already exists",
nameStepCheckError:
"Could not reach the DeerFlow backend to verify name availability. Start the backend or set NEXT_PUBLIC_BACKEND_BASE_URL, then try again.",
nameStepNetworkError:
"Network request failed — check your network or backend connection",
nameStepCheckError: "Could not verify name availability — please try again",
nameStepBootstrapMessage:
"The new custom agent name is {name}. Let's bootstrap it's **SOUL**.",
agentCreated: "Agent created!",
+1
@@ -133,6 +133,7 @@ export interface Translations {
nameStepContinue: string;
nameStepInvalidError: string;
nameStepAlreadyExistsError: string;
nameStepNetworkError: string;
nameStepCheckError: string;
nameStepBootstrapMessage: string;
agentCreated: string;
+2 -2
@@ -183,8 +183,8 @@ export const zhCN: Translations = {
nameStepContinue: "继续",
nameStepInvalidError: "名称无效,只允许字母、数字和连字符",
nameStepAlreadyExistsError: "已存在同名智能体",
nameStepCheckError:
"无法连接 DeerFlow 后端来验证名称是否可用。请先启动后端,或配置 NEXT_PUBLIC_BACKEND_BASE_URL,然后再重试",
nameStepNetworkError: "网络请求失败,请检查网络或后端连接",
nameStepCheckError: "无法验证名称可用性,请稍后重试",
nameStepBootstrapMessage:
"新智能体的名称是 {name},现在开始为它生成 **SOUL**。",
agentCreated: "智能体已创建!",
+15 -2
@@ -9,6 +9,17 @@ import sys
from typing import Optional
def configure_stdio() -> None:
"""Prefer UTF-8 output so Unicode status markers render on Windows."""
for stream_name in ("stdout", "stderr"):
stream = getattr(sys, stream_name, None)
if hasattr(stream, "reconfigure"):
try:
stream.reconfigure(encoding="utf-8", errors="replace")
except (OSError, ValueError):
continue
def run_command(command: list[str]) -> Optional[str]:
"""Run a command and return trimmed stdout, or None on failure."""
try:
@@ -29,6 +40,7 @@ def parse_node_major(version_text: str) -> Optional[int]:
def main() -> int:
configure_stdio()
print("==========================================")
print(" Checking Required Dependencies")
print("==========================================")
@@ -61,8 +73,9 @@ def main() -> int:
print()
print("Checking pnpm...")
if shutil.which("pnpm"):
pnpm_version = run_command(["pnpm", "-v"])
pnpm_executable = shutil.which("pnpm.cmd") or shutil.which("pnpm")
if pnpm_executable:
pnpm_version = run_command([pnpm_executable, "-v"])
if pnpm_version:
print(f" ✓ pnpm {pnpm_version}")
else: