380255f722
* fix(sandbox): uphold /mnt/user-data contract at Sandbox API boundary (#2873)

  LocalSandboxProvider used a process-wide singleton with no /mnt/user-data mapping, forcing every caller to translate virtual paths via tools.py before invoking the public Sandbox API. AIO already exposes /mnt/user-data natively (per-thread bind mounts), so the same code path behaved differently across implementations — and direct callers like uploads.py:282 / feishu.py:389 only worked thanks to the `uses_thread_data_mounts` workaround flag.

  Switch the provider to a dual-track cache: keep the `"local"` singleton for legacy acquire(None) callers (backward-compat for existing tests and scripts), and create a per-thread LocalSandbox with id `"local:{tid}"` for acquire(thread_id). Each per-thread instance carries PathMapping entries for /mnt/user-data, its three subdirs, and /mnt/acp-workspace, mirroring how AioSandboxProvider mounts those paths into its container.

  is_local_sandbox() now recognises both id formats. `_agent_written_paths` becomes per-thread (it was a process-wide set that leaked across threads — a latent isolation bug also fixed by this change).

  Verified via TDD: a new contract test suite hits the public Sandbox API directly (write/read/list/exec/glob/grep/update + per-thread isolation + lifecycle). 3212 backend tests still pass, ruff is clean.

* fix(sandbox): address Copilot review on #2881

  Three follow-ups from Copilot's review of the LocalSandboxProvider refactor:

  1. Synchronisation: ``acquire`` / ``get`` / ``reset`` mutated the cache without any lock, so concurrent acquire of the same ``thread_id`` could create two ``LocalSandbox`` instances and lose one's ``_agent_written_paths`` state. Add a provider-wide ``threading.Lock`` (matching ``AioSandboxProvider``) and build per-thread mappings outside the lock to avoid holding it during the ``ensure_thread_dirs`` filesystem touch.
  2. Memory bound: ``_thread_sandboxes`` grew monotonically. Replace the plain dict with an ``OrderedDict`` LRU capped at ``DEFAULT_MAX_CACHED_THREAD_SANDBOXES`` (256, configurable per provider instance). ``get`` promotes touched threads to the MRU end so an active thread isn't evicted under load. Eviction is graceful: the next ``acquire`` rebuilds a fresh sandbox; only ``_agent_written_paths`` (reverse-resolve hint) is lost.
  3. Docs: update ``CLAUDE.md`` to reflect the new per-thread architecture, the LRU cap, and that ``is_local_sandbox`` recognises both id formats.

  New regression tests:

  - Concurrent ``acquire("alpha")`` from 8 threads yields a single instance (slow-init injection forces the race window wide open).
  - Concurrent ``acquire`` of distinct thread_ids yields distinct instances.
  - The cache evicts the least-recently-used thread once the cap is exceeded.
  - ``get`` promotes recency so a polled thread survives a later acquire-storm.
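The locking and LRU behaviour described in the two commit messages above can be sketched roughly as follows. This is a minimal illustration, not the provider's actual code: the class name ``ThreadSandboxCache``, the ``factory`` callable, and the ``thread_ids`` helper are hypothetical names introduced here, and only the ``"local:{tid}"`` id format, the ``OrderedDict`` LRU, the provider-wide ``threading.Lock``, and the default cap of 256 come from the commit text.

```python
import threading
from collections import OrderedDict
from typing import Any, Callable, Optional


class ThreadSandboxCache:
    """Hypothetical stand-in for the provider's per-thread sandbox cache."""

    def __init__(self, factory: Callable[[str], Any], max_cached_threads: int = 256) -> None:
        self._factory = factory  # assumed: builds one sandbox object per thread_id
        self._max = max_cached_threads
        self._lock = threading.Lock()
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def acquire(self, thread_id: str) -> str:
        # Check-then-act happens under one lock, so concurrent callers
        # for the same thread_id end up sharing a single cached instance.
        with self._lock:
            if thread_id in self._entries:
                self._entries.move_to_end(thread_id)  # promote to MRU
            else:
                self._entries[thread_id] = self._factory(thread_id)
                # Evict from the LRU end until the cap is respected.
                while len(self._entries) > self._max:
                    self._entries.popitem(last=False)
        return f"local:{thread_id}"

    def get(self, sandbox_id: str) -> Optional[Any]:
        prefix = "local:"
        if not sandbox_id.startswith(prefix):
            return None
        thread_id = sandbox_id[len(prefix):]
        with self._lock:
            sandbox = self._entries.get(thread_id)
            if sandbox is not None:
                self._entries.move_to_end(thread_id)  # a read also counts as use
            return sandbox

    def thread_ids(self) -> list:
        with self._lock:
            return list(self._entries)


cache = ThreadSandboxCache(factory=lambda tid: {"thread": tid}, max_cached_threads=3)
for name in ["a", "b", "c"]:
    cache.acquire(name)
cache.get("local:a")   # "a" becomes most-recently used
cache.acquire("d")     # cap exceeded, so the LRU entry "b" is evicted
print(sorted(cache.thread_ids()))  # ['a', 'c', 'd']
```

The sketch builds the sandbox while holding the lock for simplicity; the commit instead prepares per-thread mappings outside the lock so that filesystem work does not block other callers.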
367 lines
16 KiB
Python
"""Issue #2873 regression — the public Sandbox API must honor the documented
/mnt/user-data contract uniformly across implementations.

Today AIO sandbox already accepts /mnt/user-data/... paths directly because the
container has those paths bind-mounted per-thread. LocalSandbox, however,
externalises that translation to ``deerflow.sandbox.tools`` via ``thread_data``,
so any caller that bypasses tools.py (e.g. ``uploads.py`` syncing files into a
remote sandbox via ``sandbox.update_file(virtual_path, ...)``) sees inconsistent
behaviour.

These tests pin down the **public Sandbox API boundary**: when a caller obtains
a ``LocalSandbox`` from ``LocalSandboxProvider.acquire(thread_id)`` and invokes
its abstract methods with documented virtual paths, those paths must resolve to
the thread's user-data directory automatically — no tools.py / thread_data
shim required.
"""

from __future__ import annotations

from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch

import pytest

from deerflow.config.sandbox_config import SandboxConfig
from deerflow.sandbox.local.local_sandbox_provider import LocalSandboxProvider


def _build_config(skills_dir: Path) -> SimpleNamespace:
    """Minimal app config covering what ``LocalSandboxProvider`` reads at init."""
    return SimpleNamespace(
        skills=SimpleNamespace(
            container_path="/mnt/skills",
            get_skills_path=lambda: skills_dir,
            use="deerflow.skills.storage.local_skill_storage:LocalSkillStorage",
        ),
        sandbox=SandboxConfig(use="deerflow.sandbox.local:LocalSandboxProvider", mounts=[]),
    )


@pytest.fixture
def isolated_paths(monkeypatch, tmp_path):
    """Redirect ``get_paths().base_dir`` to ``tmp_path`` and reset its singleton.

    Without this, per-thread directories would be created under the developer's
    real ``.deer-flow/`` tree.
    """
    monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path))
    from deerflow.config import paths as paths_module

    monkeypatch.setattr(paths_module, "_paths", None)
    yield tmp_path
    monkeypatch.setattr(paths_module, "_paths", None)


@pytest.fixture
def provider(isolated_paths, tmp_path):
    """Provider with a real skills dir and no custom mounts."""
    skills_dir = tmp_path / "skills"
    skills_dir.mkdir()
    cfg = _build_config(skills_dir)
    with patch("deerflow.config.get_app_config", return_value=cfg):
        yield LocalSandboxProvider()


# ──────────────────────────────────────────────────────────────────────────
# 1. Direct Sandbox API accepts the virtual path contract for ``acquire(tid)``
# ──────────────────────────────────────────────────────────────────────────


def test_acquire_with_thread_id_returns_per_thread_id(provider):
    sandbox_id = provider.acquire("alpha")
    assert sandbox_id == "local:alpha"


def test_acquire_without_thread_id_remains_legacy_local_id(provider):
    """Backward-compat: ``acquire()`` with no thread keeps the singleton id."""
    assert provider.acquire() == "local"
    assert provider.acquire(None) == "local"


def test_write_then_read_via_public_api_with_virtual_path(provider):
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    assert sbx is not None

    virtual = "/mnt/user-data/workspace/hello.txt"
    sbx.write_file(virtual, "hi there")
    assert sbx.read_file(virtual) == "hi there"


def test_list_dir_via_public_api_with_virtual_path(provider):
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    sbx.write_file("/mnt/user-data/workspace/foo.txt", "x")
    entries = sbx.list_dir("/mnt/user-data/workspace")
    # entries should be reverse-resolved back to the virtual prefix
    assert any("/mnt/user-data/workspace/foo.txt" in e for e in entries)


def test_execute_command_with_virtual_path(provider):
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    sbx.write_file("/mnt/user-data/uploads/note.txt", "payload")
    output = sbx.execute_command("ls /mnt/user-data/uploads")
    assert "note.txt" in output


def test_glob_with_virtual_path(provider):
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    sbx.write_file("/mnt/user-data/outputs/report.md", "# r")
    matches, _ = sbx.glob("/mnt/user-data/outputs", "*.md")
    assert any(m.endswith("/mnt/user-data/outputs/report.md") for m in matches)


def test_grep_with_virtual_path(provider):
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    sbx.write_file("/mnt/user-data/workspace/findme.txt", "needle line\nother line")
    matches, _ = sbx.grep("/mnt/user-data/workspace", "needle", literal=True)
    assert matches
    assert matches[0].path.endswith("/mnt/user-data/workspace/findme.txt")


def test_execute_command_lists_aggregate_user_data_root(provider):
    """``ls /mnt/user-data`` (the parent prefix itself) must list the three
    subdirs — matching the AIO container's natural filesystem view."""
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    # Touch all three subdirs so they materialise on disk
    sbx.write_file("/mnt/user-data/workspace/.keep", "")
    sbx.write_file("/mnt/user-data/uploads/.keep", "")
    sbx.write_file("/mnt/user-data/outputs/.keep", "")
    output = sbx.execute_command("ls /mnt/user-data")
    assert "workspace" in output
    assert "uploads" in output
    assert "outputs" in output


def test_update_file_with_virtual_path_for_remote_sync_scenario(provider):
    """This is the exact code path used by ``uploads.py:282`` and ``feishu.py:389``.

    They build a ``virtual_path`` like ``/mnt/user-data/uploads/foo.pdf`` and hand
    raw bytes to the sandbox. Before this fix LocalSandbox would try to write to
    the literal host path ``/mnt/user-data/uploads/foo.pdf`` and fail.
    """
    sandbox_id = provider.acquire("alpha")
    sbx = provider.get(sandbox_id)
    sbx.update_file("/mnt/user-data/uploads/blob.bin", b"\x00\x01\x02binary")
    assert sbx.read_file("/mnt/user-data/uploads/blob.bin").startswith("\x00\x01\x02")


# ──────────────────────────────────────────────────────────────────────────
# 2. Per-thread isolation (no cross-thread state leaks)
# ──────────────────────────────────────────────────────────────────────────


def test_two_threads_get_distinct_sandboxes(provider):
    sid_a = provider.acquire("alpha")
    sid_b = provider.acquire("beta")
    assert sid_a != sid_b

    sbx_a = provider.get(sid_a)
    sbx_b = provider.get(sid_b)
    assert sbx_a is not sbx_b


def test_per_thread_user_data_mapping_isolated(provider, isolated_paths):
    """Files written via one thread's sandbox must not be visible through another."""
    sid_a = provider.acquire("alpha")
    sid_b = provider.acquire("beta")
    sbx_a = provider.get(sid_a)
    sbx_b = provider.get(sid_b)

    sbx_a.write_file("/mnt/user-data/workspace/secret.txt", "alpha-only")
    # The same virtual path resolves to a different host path in thread "beta"
    with pytest.raises(FileNotFoundError):
        sbx_b.read_file("/mnt/user-data/workspace/secret.txt")


def test_agent_written_paths_per_thread_isolation(provider):
    """``_agent_written_paths`` tracks files this sandbox wrote so reverse-resolve
    runs on read. The set must not leak across threads."""
    sid_a = provider.acquire("alpha")
    sid_b = provider.acquire("beta")
    sbx_a = provider.get(sid_a)
    sbx_b = provider.get(sid_b)
    sbx_a.write_file("/mnt/user-data/workspace/in-a.txt", "marker")
    assert sbx_a._agent_written_paths
    assert not sbx_b._agent_written_paths


# ──────────────────────────────────────────────────────────────────────────
# 3. Lifecycle: get / release / reset
# ──────────────────────────────────────────────────────────────────────────


def test_get_returns_cached_instance_for_known_id(provider):
    sid = provider.acquire("alpha")
    assert provider.get(sid) is provider.get(sid)


def test_get_unknown_id_returns_none(provider):
    assert provider.get("local:nonexistent") is None


def test_release_is_noop_keeps_instance_available(provider):
    """Local has no resources to release; the cached instance stays alive across
    turns so ``_agent_written_paths`` persists for reverse-resolve on later reads."""
    sid = provider.acquire("alpha")
    sbx_before = provider.get(sid)
    provider.release(sid)
    sbx_after = provider.get(sid)
    assert sbx_before is sbx_after


def test_reset_clears_both_generic_and_per_thread_caches(provider):
    provider.acquire()  # populate generic
    provider.acquire("alpha")  # populate per-thread
    assert provider._generic_sandbox is not None
    assert provider._thread_sandboxes

    provider.reset()
    assert provider._generic_sandbox is None
    assert not provider._thread_sandboxes


# ──────────────────────────────────────────────────────────────────────────
# 4. is_local_sandbox detects both legacy and per-thread ids
# ──────────────────────────────────────────────────────────────────────────


def test_is_local_sandbox_accepts_both_id_formats():
    from deerflow.sandbox.tools import is_local_sandbox

    legacy = SimpleNamespace(state={"sandbox": {"sandbox_id": "local"}}, context={})
    per_thread = SimpleNamespace(state={"sandbox": {"sandbox_id": "local:alpha"}}, context={})
    foreign = SimpleNamespace(state={"sandbox": {"sandbox_id": "aio-12345"}}, context={})
    unset = SimpleNamespace(state={}, context={})

    assert is_local_sandbox(legacy) is True
    assert is_local_sandbox(per_thread) is True
    assert is_local_sandbox(foreign) is False
    assert is_local_sandbox(unset) is False


# ──────────────────────────────────────────────────────────────────────────
# 5. Concurrency safety (Copilot review feedback)
# ──────────────────────────────────────────────────────────────────────────


def test_concurrent_acquire_same_thread_yields_single_instance(provider):
    """Two threads racing on ``acquire("alpha")`` must share one LocalSandbox.

    Without the provider lock the check-then-act in ``acquire`` is non-atomic:
    both racers would see an empty cache, both would build their own
    LocalSandbox, and one would overwrite the other — losing the loser's
    ``_agent_written_paths`` and any in-flight state on it.
    """
    import threading
    import time

    from deerflow.sandbox.local import local_sandbox as local_sandbox_module

    # Force a wide race window by slowing the LocalSandbox constructor down.
    original_init = local_sandbox_module.LocalSandbox.__init__

    def slow_init(self, *args, **kwargs):
        time.sleep(0.05)
        original_init(self, *args, **kwargs)

    barrier = threading.Barrier(8)
    results: list[str] = []
    results_lock = threading.Lock()

    def racer():
        barrier.wait()
        sid = provider.acquire("alpha")
        with results_lock:
            results.append(sid)

    with patch.object(local_sandbox_module.LocalSandbox, "__init__", slow_init):
        threads = [threading.Thread(target=racer) for _ in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    # Every racer must observe the same ``sandbox_id``…
    assert len(set(results)) == 1, f"Racers saw different ids: {results}"
    # …and the cache must hold exactly one instance for ``alpha``.
    assert len(provider._thread_sandboxes) == 1
    assert "alpha" in provider._thread_sandboxes


def test_concurrent_acquire_distinct_threads_yields_distinct_instances(provider):
    """Different thread_ids race-acquired in parallel each get their own sandbox."""
    import threading

    barrier = threading.Barrier(6)
    sids: dict[str, str] = {}
    lock = threading.Lock()

    def racer(name: str):
        barrier.wait()
        sid = provider.acquire(name)
        with lock:
            sids[name] = sid

    threads = [threading.Thread(target=racer, args=(f"t{i}",)) for i in range(6)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert set(sids.values()) == {f"local:t{i}" for i in range(6)}
    assert set(provider._thread_sandboxes.keys()) == {f"t{i}" for i in range(6)}


# ──────────────────────────────────────────────────────────────────────────
# 6. Bounded memory growth (Copilot review feedback)
# ──────────────────────────────────────────────────────────────────────────


def test_thread_sandbox_cache_is_bounded(isolated_paths, tmp_path):
    """The LRU cap must evict the least-recently-used thread sandboxes once
    exceeded — otherwise long-running gateways would accumulate cache entries
    for every distinct ``thread_id`` ever served."""
    skills_dir = tmp_path / "skills"
    skills_dir.mkdir()
    cfg = _build_config(skills_dir)

    with patch("deerflow.config.get_app_config", return_value=cfg):
        provider = LocalSandboxProvider(max_cached_threads=3)

    for i in range(5):
        provider.acquire(f"t{i}")

    # Only the 3 most-recent thread_ids should be retained.
    assert set(provider._thread_sandboxes.keys()) == {"t2", "t3", "t4"}
    assert provider.get("local:t0") is None
    assert provider.get("local:t4") is not None


def test_lru_promotes_recently_used_thread(isolated_paths, tmp_path):
    """``get`` on a cached thread should mark it as most-recently used so a
    later acquire-storm doesn't evict an active thread that is being polled."""
    skills_dir = tmp_path / "skills"
    skills_dir.mkdir()
    cfg = _build_config(skills_dir)

    with patch("deerflow.config.get_app_config", return_value=cfg):
        provider = LocalSandboxProvider(max_cached_threads=3)

    for name in ["a", "b", "c"]:
        provider.acquire(name)
    # Touch "a" via ``get`` so it becomes most-recently used.
    provider.get("local:a")
    # Adding a fourth thread should evict "b" (the new LRU), not "a".
    provider.acquire("d")

    assert "a" in provider._thread_sandboxes
    assert "b" not in provider._thread_sandboxes
    assert {"a", "c", "d"} == set(provider._thread_sandboxes.keys())