From 380255f722ccc568f37d4b63f3c02bdeeb399465 Mon Sep 17 00:00:00 2001 From: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com> Date: Sun, 17 May 2026 08:26:04 +0800 Subject: [PATCH] fix(sandbox): uphold /mnt/user-data contract at Sandbox API boundary (#2873) (#2881) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(sandbox): uphold /mnt/user-data contract at Sandbox API boundary (#2873) LocalSandboxProvider used a process-wide singleton with no /mnt/user-data mapping, forcing every caller to translate virtual paths via tools.py before invoking the public Sandbox API. AIO already exposes /mnt/user-data natively (per-thread bind mounts), so the same code path behaved differently across implementations — and direct callers like uploads.py:282 / feishu.py:389 only worked thanks to the `uses_thread_data_mounts` workaround flag. Switch the provider to a dual-track cache: keep the `"local"` singleton for legacy acquire(None) callers (backward-compat for existing tests and scripts), and create a per-thread LocalSandbox with id `"local:{tid}"` for acquire(thread_id). Each per-thread instance carries PathMapping entries for /mnt/user-data, its three subdirs, and /mnt/acp-workspace, mirroring how AioSandboxProvider mounts those paths into its container. is_local_sandbox() now recognises both id formats. `_agent_written_paths` becomes per-thread (it was a process-wide set that leaked across threads — a latent isolation bug also fixed by this change). Verified via TDD: a new contract test suite hits the public Sandbox API directly (write/read/list/exec/glob/grep/update + per-thread isolation + lifecycle). 3212 backend tests still pass, ruff is clean. * fix(sandbox): address Copilot review on #2881 Three follow-ups from Copilot's review of the LocalSandboxProvider refactor: 1. Synchronisation: ``acquire`` / ``get`` / ``reset`` mutated the cache without any lock, so concurrent acquire of the same ``thread_id`` could create two ``LocalSandbox`` instances and lose one's ``_agent_written_paths`` state. Add a provider-wide ``threading.Lock`` (matching ``AioSandboxProvider``) and build per-thread mappings outside the lock to avoid holding it during the ``ensure_thread_dirs`` filesystem touch. 2. Memory bound: ``_thread_sandboxes`` grew monotonically. Replace the plain dict with an ``OrderedDict`` LRU capped at ``DEFAULT_MAX_CACHED_THREAD_SANDBOXES`` (256, configurable per provider instance). ``get`` promotes touched threads to the MRU end so an active thread isn't evicted under load. Eviction is graceful: the next ``acquire`` rebuilds a fresh sandbox; only ``_agent_written_paths`` (reverse-resolve hint) is lost. 3. Docs: update ``CLAUDE.md`` to reflect the new per-thread architecture, the LRU cap, and that ``is_local_sandbox`` recognises both id formats. New regression tests: - Concurrent ``acquire("alpha")`` from 8 threads yields a single instance (slow-init injection forces the race window wide open). - Concurrent ``acquire`` of distinct thread_ids yields distinct instances. - The cache evicts the least-recently-used thread once the cap is exceeded. - ``get`` promotes recency so a polled thread survives a later acquire-storm. --- backend/CLAUDE.md | 6 +- .../sandbox/local/local_sandbox_provider.py | 235 ++++++++++- .../harness/deerflow/sandbox/tools.py | 10 +- ...est_local_sandbox_virtual_path_contract.py | 366 ++++++++++++++++++ 4 files changed, 592 insertions(+), 25 deletions(-) create mode 100644 backend/tests/test_local_sandbox_virtual_path_contract.py diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 5e0aebfdb..35607c6fd 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -232,14 +232,14 @@ Proxied through nginx: `/api/langgraph/*` → Gateway LangGraph-compatible runti **Interface**: Abstract `Sandbox` with `execute_command`, `read_file`, `write_file`, `list_dir` **Provider Pattern**: `SandboxProvider` with `acquire`, `get`, `release` lifecycle **Implementations**: -- `LocalSandboxProvider` - Singleton local filesystem execution with path mappings +- `LocalSandboxProvider` - Local filesystem execution. `acquire(thread_id)` returns a per-thread `LocalSandbox` (id `local:{thread_id}`) whose `path_mappings` resolve `/mnt/user-data/{workspace,uploads,outputs}` and `/mnt/acp-workspace` to that thread's host directories, so the public `Sandbox` API honours the `/mnt/user-data` contract uniformly with AIO. `acquire()` / `acquire(None)` keeps the legacy generic singleton (id `local`) for callers without a thread context. Per-thread sandboxes are held in an LRU cache (default 256 entries) guarded by a `threading.Lock`. - `AioSandboxProvider` (`packages/harness/deerflow/community/`) - Docker-based isolation **Virtual Path System**: - Agent sees: `/mnt/user-data/{workspace,uploads,outputs}`, `/mnt/skills` - Physical: `backend/.deer-flow/users/{user_id}/threads/{thread_id}/user-data/...`, `deer-flow/skills/` -- Translation: `replace_virtual_path()` / `replace_virtual_paths_in_command()` -- Detection: `is_local_sandbox()` checks `sandbox_id == "local"` +- Translation: `LocalSandboxProvider` builds per-thread `PathMapping`s for the user-data prefixes at acquire time; `tools.py` keeps `replace_virtual_path()` / `replace_virtual_paths_in_command()` as a defense-in-depth layer (and for path validation). AIO has the directories volume-mounted at the same virtual paths inside its container, so both implementations accept `/mnt/user-data/...` natively. +- Detection: `is_local_sandbox()` accepts both `sandbox_id == "local"` (legacy / no-thread) and `sandbox_id.startswith("local:")` (per-thread) **Sandbox Tools** (in `packages/harness/deerflow/sandbox/tools.py`): - `bash` - Execute commands with path translation and error handling diff --git a/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py b/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py index 0510a2473..d64a1c220 100644 --- a/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py +++ b/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py @@ -1,4 +1,6 @@ import logging +import threading +from collections import OrderedDict from pathlib import Path from deerflow.sandbox.local.local_sandbox import LocalSandbox, PathMapping @@ -7,25 +9,87 @@ from deerflow.sandbox.sandbox_provider import SandboxProvider logger = logging.getLogger(__name__) +# Module-level alias kept for backward compatibility with older callers/tests +# that reach into ``local_sandbox_provider._singleton`` directly. New code reads +# the provider instance attributes (``_generic_sandbox`` / ``_thread_sandboxes``) +# instead. _singleton: LocalSandbox | None = None +# Virtual prefixes that must be reserved by the per-thread mappings created in +# ``acquire`` — custom mounts from ``config.yaml`` may not overlap with these. +_USER_DATA_VIRTUAL_PREFIX = "/mnt/user-data" +_ACP_WORKSPACE_VIRTUAL_PREFIX = "/mnt/acp-workspace" + +# Default upper bound on per-thread LocalSandbox instances retained in memory. +# Each cached instance is cheap (a small Python object with a list of +# PathMapping and a set of agent-written paths used for reverse resolve), but +# in a long-running gateway the number of distinct thread_ids is unbounded. +# When the cap is exceeded the least-recently-used entry is dropped; the next +# ``acquire(thread_id)`` for that thread simply rebuilds the sandbox at the +# cost of losing its accumulated ``_agent_written_paths`` (read_file falls +# back to no reverse resolution, which is the same behaviour as a fresh run). +DEFAULT_MAX_CACHED_THREAD_SANDBOXES = 256 + class LocalSandboxProvider(SandboxProvider): + """Local-filesystem sandbox provider with per-thread path scoping. + + Earlier revisions of this provider returned a single process-wide + ``LocalSandbox`` keyed by the literal id ``"local"``. That singleton could + not honour the documented ``/mnt/user-data/...`` contract at the public + ``Sandbox`` API boundary because the corresponding host directory is + per-thread (``{base_dir}/users/{user_id}/threads/{thread_id}/user-data/``). + + The provider now produces a fresh ``LocalSandbox`` per ``thread_id`` whose + ``path_mappings`` include thread-scoped entries for + ``/mnt/user-data/{workspace,uploads,outputs}`` and ``/mnt/acp-workspace``, + mirroring how :class:`AioSandboxProvider` bind-mounts those paths into its + docker container. The legacy ``acquire()`` / ``acquire(None)`` call still + returns a generic singleton with id ``"local"`` for callers (and tests) + that do not have a thread context. + + Thread-safety: ``acquire``, ``get`` and ``reset`` may be invoked from + multiple threads (Gateway tool dispatch, subagent worker pools, the + background memory updater, …) so all cache state changes are serialised + through a provider-wide :class:`threading.Lock`. This matches the pattern + used by :class:`AioSandboxProvider`. + + Memory bound: ``_thread_sandboxes`` is an LRU cache capped at + ``max_cached_threads`` (default :data:`DEFAULT_MAX_CACHED_THREAD_SANDBOXES`). + When the cap is exceeded the least-recently-used entry is evicted on the + next ``acquire``; the evicted thread's next ``acquire`` rebuilds a fresh + sandbox (losing only its ``_agent_written_paths`` reverse-resolve hint, + which gracefully degrades read_file output). + """ + uses_thread_data_mounts = True - def __init__(self): - """Initialize the local sandbox provider with path mappings.""" + def __init__(self, max_cached_threads: int = DEFAULT_MAX_CACHED_THREAD_SANDBOXES): + """Initialize the local sandbox provider with static path mappings. + + Args: + max_cached_threads: Upper bound on per-thread sandboxes retained in + the LRU cache. When exceeded, the least-recently-used entry is + evicted on the next ``acquire``. + """ self._path_mappings = self._setup_path_mappings() + self._generic_sandbox: LocalSandbox | None = None + self._thread_sandboxes: OrderedDict[str, LocalSandbox] = OrderedDict() + self._max_cached_threads = max_cached_threads + self._lock = threading.Lock() def _setup_path_mappings(self) -> list[PathMapping]: """ - Setup path mappings for local sandbox. + Setup static path mappings shared by every sandbox this provider yields. - Maps container paths to actual local paths, including skills directory - and any custom mounts configured in config.yaml. + Static mappings cover the skills directory and any custom mounts from + ``config.yaml`` — both are process-wide and identical for every thread. + Per-thread ``/mnt/user-data/...`` and ``/mnt/acp-workspace`` mappings + are appended inside :meth:`acquire` because they depend on + ``thread_id`` and the effective ``user_id``. Returns: - List of path mappings + List of static path mappings """ mappings: list[PathMapping] = [] @@ -48,7 +112,11 @@ class LocalSandboxProvider(SandboxProvider): ) # Map custom mounts from sandbox config - _RESERVED_CONTAINER_PREFIXES = [container_path, "/mnt/acp-workspace", "/mnt/user-data"] + _RESERVED_CONTAINER_PREFIXES = [ + container_path, + _ACP_WORKSPACE_VIRTUAL_PREFIX, + _USER_DATA_VIRTUAL_PREFIX, + ] sandbox_config = config.sandbox if sandbox_config and sandbox_config.mounts: for mount in sandbox_config.mounts: @@ -99,33 +167,162 @@ class LocalSandboxProvider(SandboxProvider): return mappings + @staticmethod + def _build_thread_path_mappings(thread_id: str) -> list[PathMapping]: + """Build per-thread path mappings for /mnt/user-data and /mnt/acp-workspace. + + Resolves ``user_id`` via :func:`get_effective_user_id` (the same path + :class:`AioSandboxProvider` uses) and ensures the backing host + directories exist before they are mapped into the sandbox view. + """ + from deerflow.config.paths import get_paths + from deerflow.runtime.user_context import get_effective_user_id + + paths = get_paths() + user_id = get_effective_user_id() + paths.ensure_thread_dirs(thread_id, user_id=user_id) + + return [ + # Aggregate parent mapping so ``ls /mnt/user-data`` and other + # parent-level operations behave the same as inside AIO (where the + # parent directory is real and contains the three subdirs). Longer + # subpath mappings below still win for ``/mnt/user-data/workspace/...`` + # because ``_find_path_mapping`` sorts by container_path length. + PathMapping( + container_path=_USER_DATA_VIRTUAL_PREFIX, + local_path=str(paths.sandbox_user_data_dir(thread_id, user_id=user_id)), + read_only=False, + ), + PathMapping( + container_path=f"{_USER_DATA_VIRTUAL_PREFIX}/workspace", + local_path=str(paths.sandbox_work_dir(thread_id, user_id=user_id)), + read_only=False, + ), + PathMapping( + container_path=f"{_USER_DATA_VIRTUAL_PREFIX}/uploads", + local_path=str(paths.sandbox_uploads_dir(thread_id, user_id=user_id)), + read_only=False, + ), + PathMapping( + container_path=f"{_USER_DATA_VIRTUAL_PREFIX}/outputs", + local_path=str(paths.sandbox_outputs_dir(thread_id, user_id=user_id)), + read_only=False, + ), + PathMapping( + container_path=_ACP_WORKSPACE_VIRTUAL_PREFIX, + local_path=str(paths.acp_workspace_dir(thread_id, user_id=user_id)), + read_only=False, + ), + ] + def acquire(self, thread_id: str | None = None) -> str: + """Return a sandbox id scoped to *thread_id* (or the generic singleton). + + - ``thread_id=None`` keeps the legacy singleton with id ``"local"`` for + callers that have no thread context (e.g. legacy tests, scripts). + - ``thread_id="abc"`` yields a per-thread ``LocalSandbox`` with id + ``"local:abc"`` whose ``path_mappings`` resolve ``/mnt/user-data/...`` + to that thread's host directories. + + Thread-safe under concurrent invocation: the cache check + insert is + guarded by ``self._lock`` so two callers racing on the same + ``thread_id`` always observe the same LocalSandbox instance. + """ global _singleton - if _singleton is None: - _singleton = LocalSandbox("local", path_mappings=self._path_mappings) - return _singleton.id + + if thread_id is None: + with self._lock: + if self._generic_sandbox is None: + self._generic_sandbox = LocalSandbox("local", path_mappings=list(self._path_mappings)) + _singleton = self._generic_sandbox + return self._generic_sandbox.id + + # Fast path under lock. + with self._lock: + cached = self._thread_sandboxes.get(thread_id) + if cached is not None: + # Mark as most-recently used so frequently-touched threads + # survive eviction. + self._thread_sandboxes.move_to_end(thread_id) + return cached.id + + # ``_build_thread_path_mappings`` touches the filesystem + # (``ensure_thread_dirs``); release the lock during I/O. + new_mappings = list(self._path_mappings) + self._build_thread_path_mappings(thread_id) + + with self._lock: + # Re-check after the lock-free I/O: another caller may have + # populated the cache while we were computing mappings. + cached = self._thread_sandboxes.get(thread_id) + if cached is None: + cached = LocalSandbox(f"local:{thread_id}", path_mappings=new_mappings) + self._thread_sandboxes[thread_id] = cached + self._evict_until_within_cap_locked() + else: + self._thread_sandboxes.move_to_end(thread_id) + return cached.id + + def _evict_until_within_cap_locked(self) -> None: + """LRU-evict cached thread sandboxes once the cap is exceeded. + + Caller MUST hold ``self._lock``. + """ + while len(self._thread_sandboxes) > self._max_cached_threads: + evicted_thread_id, _ = self._thread_sandboxes.popitem(last=False) + logger.info( + "Evicting LocalSandbox cache entry for thread %s (cap=%d)", + evicted_thread_id, + self._max_cached_threads, + ) def get(self, sandbox_id: str) -> Sandbox | None: if sandbox_id == "local": - if _singleton is None: + with self._lock: + generic = self._generic_sandbox + if generic is None: self.acquire() - return _singleton + with self._lock: + return self._generic_sandbox + return generic + if isinstance(sandbox_id, str) and sandbox_id.startswith("local:"): + thread_id = sandbox_id[len("local:") :] + with self._lock: + cached = self._thread_sandboxes.get(thread_id) + if cached is not None: + # Touching a thread via ``get`` (used by tools.py to look + # up the sandbox once per tool call) promotes it in LRU + # order so an active thread isn't evicted under load. + self._thread_sandboxes.move_to_end(thread_id) + return cached return None def release(self, sandbox_id: str) -> None: - # LocalSandbox uses singleton pattern - no cleanup needed. + # LocalSandbox has no resources to release; keep the cached instance so + # that ``_agent_written_paths`` (used to reverse-resolve agent-authored + # file contents on read) survives between turns. LRU eviction in + # ``acquire`` and explicit ``reset()`` / ``shutdown()`` are the only + # paths that drop cached entries. + # # Note: This method is intentionally not called by SandboxMiddleware # to allow sandbox reuse across multiple turns in a thread. - # For Docker-based providers (e.g., AioSandboxProvider), cleanup - # happens at application shutdown via the shutdown() method. pass def reset(self) -> None: - # reset_sandbox_provider() must also clear the module singleton. + """Drop all cached LocalSandbox instances. + + ``reset_sandbox_provider()`` calls this to ensure config / mount + changes take effect on the next ``acquire()``. We also reset the + module-level ``_singleton`` alias so older callers/tests that reach + into it see a fresh state. + """ global _singleton - _singleton = None + with self._lock: + self._generic_sandbox = None + self._thread_sandboxes.clear() + _singleton = None def shutdown(self) -> None: - # LocalSandboxProvider has no extra resources beyond the shared - # singleton, so shutdown uses the same cleanup path as reset. + # LocalSandboxProvider has no extra resources beyond the cached + # ``LocalSandbox`` instances, so shutdown uses the same cleanup path + # as ``reset``. self.reset() diff --git a/backend/packages/harness/deerflow/sandbox/tools.py b/backend/packages/harness/deerflow/sandbox/tools.py index 7c746b1aa..2694e9406 100644 --- a/backend/packages/harness/deerflow/sandbox/tools.py +++ b/backend/packages/harness/deerflow/sandbox/tools.py @@ -1006,8 +1006,9 @@ def get_thread_data(runtime: Runtime | None) -> ThreadDataState | None: def is_local_sandbox(runtime: Runtime | None) -> bool: """Check if the current sandbox is a local sandbox. - Path replacement is only needed for local sandbox since aio sandbox - already has /mnt/user-data mounted in the container. + Accepts both the legacy generic id ``"local"`` (acquire with no thread + context) and the per-thread id format ``"local:{thread_id}"`` produced by + :meth:`LocalSandboxProvider.acquire` once a thread is known. """ if runtime is None: return False @@ -1016,7 +1017,10 @@ def is_local_sandbox(runtime: Runtime | None) -> bool: sandbox_state = runtime.state.get("sandbox") if sandbox_state is None: return False - return sandbox_state.get("sandbox_id") == "local" + sandbox_id = sandbox_state.get("sandbox_id") + if not isinstance(sandbox_id, str): + return False + return sandbox_id == "local" or sandbox_id.startswith("local:") def sandbox_from_runtime(runtime: Runtime | None = None) -> Sandbox: diff --git a/backend/tests/test_local_sandbox_virtual_path_contract.py b/backend/tests/test_local_sandbox_virtual_path_contract.py new file mode 100644 index 000000000..d9ec0cbdc --- /dev/null +++ b/backend/tests/test_local_sandbox_virtual_path_contract.py @@ -0,0 +1,366 @@ +"""Issue #2873 regression — the public Sandbox API must honor the documented +/mnt/user-data contract uniformly across implementations. + +Today AIO sandbox already accepts /mnt/user-data/... paths directly because the +container has those paths bind-mounted per-thread. LocalSandbox, however, +externalises that translation to ``deerflow.sandbox.tools`` via ``thread_data``, +so any caller that bypasses tools.py (e.g. ``uploads.py`` syncing files into a +remote sandbox via ``sandbox.update_file(virtual_path, ...)``) sees inconsistent +behaviour. + +These tests pin down the **public Sandbox API boundary**: when a caller obtains +a ``LocalSandbox`` from ``LocalSandboxProvider.acquire(thread_id)`` and invokes +its abstract methods with documented virtual paths, those paths must resolve to +the thread's user-data directory automatically — no tools.py / thread_data +shim required. +""" + +from __future__ import annotations + +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import patch + +import pytest + +from deerflow.config.sandbox_config import SandboxConfig +from deerflow.sandbox.local.local_sandbox_provider import LocalSandboxProvider + + +def _build_config(skills_dir: Path) -> SimpleNamespace: + """Minimal app config covering what ``LocalSandboxProvider`` reads at init.""" + return SimpleNamespace( + skills=SimpleNamespace( + container_path="/mnt/skills", + get_skills_path=lambda: skills_dir, + use="deerflow.skills.storage.local_skill_storage:LocalSkillStorage", + ), + sandbox=SandboxConfig(use="deerflow.sandbox.local:LocalSandboxProvider", mounts=[]), + ) + + +@pytest.fixture +def isolated_paths(monkeypatch, tmp_path): + """Redirect ``get_paths().base_dir`` to ``tmp_path`` and reset its singleton. + + Without this, per-thread directories would be created under the developer's + real ``.deer-flow/`` tree. + """ + monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path)) + from deerflow.config import paths as paths_module + + monkeypatch.setattr(paths_module, "_paths", None) + yield tmp_path + monkeypatch.setattr(paths_module, "_paths", None) + + +@pytest.fixture +def provider(isolated_paths, tmp_path): + """Provider with a real skills dir and no custom mounts.""" + skills_dir = tmp_path / "skills" + skills_dir.mkdir() + cfg = _build_config(skills_dir) + with patch("deerflow.config.get_app_config", return_value=cfg): + yield LocalSandboxProvider() + + +# ────────────────────────────────────────────────────────────────────────── +# 1. Direct Sandbox API accepts the virtual path contract for ``acquire(tid)`` +# ────────────────────────────────────────────────────────────────────────── + + +def test_acquire_with_thread_id_returns_per_thread_id(provider): + sandbox_id = provider.acquire("alpha") + assert sandbox_id == "local:alpha" + + +def test_acquire_without_thread_id_remains_legacy_local_id(provider): + """Backward-compat: ``acquire()`` with no thread keeps the singleton id.""" + assert provider.acquire() == "local" + assert provider.acquire(None) == "local" + + +def test_write_then_read_via_public_api_with_virtual_path(provider): + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + assert sbx is not None + + virtual = "/mnt/user-data/workspace/hello.txt" + sbx.write_file(virtual, "hi there") + assert sbx.read_file(virtual) == "hi there" + + +def test_list_dir_via_public_api_with_virtual_path(provider): + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + sbx.write_file("/mnt/user-data/workspace/foo.txt", "x") + entries = sbx.list_dir("/mnt/user-data/workspace") + # entries should be reverse-resolved back to the virtual prefix + assert any("/mnt/user-data/workspace/foo.txt" in e for e in entries) + + +def test_execute_command_with_virtual_path(provider): + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + sbx.write_file("/mnt/user-data/uploads/note.txt", "payload") + output = sbx.execute_command("ls /mnt/user-data/uploads") + assert "note.txt" in output + + +def test_glob_with_virtual_path(provider): + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + sbx.write_file("/mnt/user-data/outputs/report.md", "# r") + matches, _ = sbx.glob("/mnt/user-data/outputs", "*.md") + assert any(m.endswith("/mnt/user-data/outputs/report.md") for m in matches) + + +def test_grep_with_virtual_path(provider): + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + sbx.write_file("/mnt/user-data/workspace/findme.txt", "needle line\nother line") + matches, _ = sbx.grep("/mnt/user-data/workspace", "needle", literal=True) + assert matches + assert matches[0].path.endswith("/mnt/user-data/workspace/findme.txt") + + +def test_execute_command_lists_aggregate_user_data_root(provider): + """``ls /mnt/user-data`` (the parent prefix itself) must list the three + subdirs — matching the AIO container's natural filesystem view.""" + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + # Touch all three subdirs so they materialise on disk + sbx.write_file("/mnt/user-data/workspace/.keep", "") + sbx.write_file("/mnt/user-data/uploads/.keep", "") + sbx.write_file("/mnt/user-data/outputs/.keep", "") + output = sbx.execute_command("ls /mnt/user-data") + assert "workspace" in output + assert "uploads" in output + assert "outputs" in output + + +def test_update_file_with_virtual_path_for_remote_sync_scenario(provider): + """This is the exact code path used by ``uploads.py:282`` and ``feishu.py:389``. + + They build a ``virtual_path`` like ``/mnt/user-data/uploads/foo.pdf`` and hand + raw bytes to the sandbox. Before this fix LocalSandbox would try to write to + the literal host path ``/mnt/user-data/uploads/foo.pdf`` and fail. + """ + sandbox_id = provider.acquire("alpha") + sbx = provider.get(sandbox_id) + sbx.update_file("/mnt/user-data/uploads/blob.bin", b"\x00\x01\x02binary") + assert sbx.read_file("/mnt/user-data/uploads/blob.bin").startswith("\x00\x01\x02") + + +# ────────────────────────────────────────────────────────────────────────── +# 2. Per-thread isolation (no cross-thread state leaks) +# ────────────────────────────────────────────────────────────────────────── + + +def test_two_threads_get_distinct_sandboxes(provider): + sid_a = provider.acquire("alpha") + sid_b = provider.acquire("beta") + assert sid_a != sid_b + + sbx_a = provider.get(sid_a) + sbx_b = provider.get(sid_b) + assert sbx_a is not sbx_b + + +def test_per_thread_user_data_mapping_isolated(provider, isolated_paths): + """Files written via one thread's sandbox must not be visible through another.""" + sid_a = provider.acquire("alpha") + sid_b = provider.acquire("beta") + sbx_a = provider.get(sid_a) + sbx_b = provider.get(sid_b) + + sbx_a.write_file("/mnt/user-data/workspace/secret.txt", "alpha-only") + # The same virtual path resolves to a different host path in thread "beta" + with pytest.raises(FileNotFoundError): + sbx_b.read_file("/mnt/user-data/workspace/secret.txt") + + +def test_agent_written_paths_per_thread_isolation(provider): + """``_agent_written_paths`` tracks files this sandbox wrote so reverse-resolve + runs on read. The set must not leak across threads.""" + sid_a = provider.acquire("alpha") + sid_b = provider.acquire("beta") + sbx_a = provider.get(sid_a) + sbx_b = provider.get(sid_b) + sbx_a.write_file("/mnt/user-data/workspace/in-a.txt", "marker") + assert sbx_a._agent_written_paths + assert not sbx_b._agent_written_paths + + +# ────────────────────────────────────────────────────────────────────────── +# 3. Lifecycle: get / release / reset +# ────────────────────────────────────────────────────────────────────────── + + +def test_get_returns_cached_instance_for_known_id(provider): + sid = provider.acquire("alpha") + assert provider.get(sid) is provider.get(sid) + + +def test_get_unknown_id_returns_none(provider): + assert provider.get("local:nonexistent") is None + + +def test_release_is_noop_keeps_instance_available(provider): + """Local has no resources to release; the cached instance stays alive across + turns so ``_agent_written_paths`` persists for reverse-resolve on later reads.""" + sid = provider.acquire("alpha") + sbx_before = provider.get(sid) + provider.release(sid) + sbx_after = provider.get(sid) + assert sbx_before is sbx_after + + +def test_reset_clears_both_generic_and_per_thread_caches(provider): + provider.acquire() # populate generic + provider.acquire("alpha") # populate per-thread + assert provider._generic_sandbox is not None + assert provider._thread_sandboxes + + provider.reset() + assert provider._generic_sandbox is None + assert not provider._thread_sandboxes + + +# ────────────────────────────────────────────────────────────────────────── +# 4. is_local_sandbox detects both legacy and per-thread ids +# ────────────────────────────────────────────────────────────────────────── + + +def test_is_local_sandbox_accepts_both_id_formats(): + from deerflow.sandbox.tools import is_local_sandbox + + legacy = SimpleNamespace(state={"sandbox": {"sandbox_id": "local"}}, context={}) + per_thread = SimpleNamespace(state={"sandbox": {"sandbox_id": "local:alpha"}}, context={}) + foreign = SimpleNamespace(state={"sandbox": {"sandbox_id": "aio-12345"}}, context={}) + unset = SimpleNamespace(state={}, context={}) + + assert is_local_sandbox(legacy) is True + assert is_local_sandbox(per_thread) is True + assert is_local_sandbox(foreign) is False + assert is_local_sandbox(unset) is False + + +# ────────────────────────────────────────────────────────────────────────── +# 5. Concurrency safety (Copilot review feedback) +# ────────────────────────────────────────────────────────────────────────── + + +def test_concurrent_acquire_same_thread_yields_single_instance(provider): + """Two threads racing on ``acquire("alpha")`` must share one LocalSandbox. + + Without the provider lock the check-then-act in ``acquire`` is non-atomic: + both racers would see an empty cache, both would build their own + LocalSandbox, and one would overwrite the other — losing the loser's + ``_agent_written_paths`` and any in-flight state on it. + """ + import threading + import time + + from deerflow.sandbox.local import local_sandbox as local_sandbox_module + + # Force a wide race window by slowing the LocalSandbox constructor down. + original_init = local_sandbox_module.LocalSandbox.__init__ + + def slow_init(self, *args, **kwargs): + time.sleep(0.05) + original_init(self, *args, **kwargs) + + barrier = threading.Barrier(8) + results: list[str] = [] + results_lock = threading.Lock() + + def racer(): + barrier.wait() + sid = provider.acquire("alpha") + with results_lock: + results.append(sid) + + with patch.object(local_sandbox_module.LocalSandbox, "__init__", slow_init): + threads = [threading.Thread(target=racer) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Every racer must observe the same ``sandbox_id``… + assert len(set(results)) == 1, f"Racers saw different ids: {results}" + # …and the cache must hold exactly one instance for ``alpha``. + assert len(provider._thread_sandboxes) == 1 + assert "alpha" in provider._thread_sandboxes + + +def test_concurrent_acquire_distinct_threads_yields_distinct_instances(provider): + """Different thread_ids race-acquired in parallel each get their own sandbox.""" + import threading + + barrier = threading.Barrier(6) + sids: dict[str, str] = {} + lock = threading.Lock() + + def racer(name: str): + barrier.wait() + sid = provider.acquire(name) + with lock: + sids[name] = sid + + threads = [threading.Thread(target=racer, args=(f"t{i}",)) for i in range(6)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert set(sids.values()) == {f"local:t{i}" for i in range(6)} + assert set(provider._thread_sandboxes.keys()) == {f"t{i}" for i in range(6)} + + +# ────────────────────────────────────────────────────────────────────────── +# 6. Bounded memory growth (Copilot review feedback) +# ────────────────────────────────────────────────────────────────────────── + + +def test_thread_sandbox_cache_is_bounded(isolated_paths, tmp_path): + """The LRU cap must evict the least-recently-used thread sandboxes once + exceeded — otherwise long-running gateways would accumulate cache entries + for every distinct ``thread_id`` ever served.""" + skills_dir = tmp_path / "skills" + skills_dir.mkdir() + cfg = _build_config(skills_dir) + + with patch("deerflow.config.get_app_config", return_value=cfg): + provider = LocalSandboxProvider(max_cached_threads=3) + + for i in range(5): + provider.acquire(f"t{i}") + + # Only the 3 most-recent thread_ids should be retained. + assert set(provider._thread_sandboxes.keys()) == {"t2", "t3", "t4"} + assert provider.get("local:t0") is None + assert provider.get("local:t4") is not None + + +def test_lru_promotes_recently_used_thread(isolated_paths, tmp_path): + """``get`` on a cached thread should mark it as most-recently used so a + later acquire-storm doesn't evict an active thread that is being polled.""" + skills_dir = tmp_path / "skills" + skills_dir.mkdir() + cfg = _build_config(skills_dir) + + with patch("deerflow.config.get_app_config", return_value=cfg): + provider = LocalSandboxProvider(max_cached_threads=3) + + for name in ["a", "b", "c"]: + provider.acquire(name) + # Touch "a" via ``get`` so it becomes most-recently used. + provider.get("local:a") + # Adding a fourth thread should evict "b" (the new LRU), not "a". + provider.acquire("d") + + assert "a" in provider._thread_sandboxes + assert "b" not in provider._thread_sandboxes + assert {"a", "c", "d"} == set(provider._thread_sandboxes.keys())