mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 08:25:57 +00:00
* fix(sandbox): uphold /mnt/user-data contract at Sandbox API boundary (#2873) LocalSandboxProvider used a process-wide singleton with no /mnt/user-data mapping, forcing every caller to translate virtual paths via tools.py before invoking the public Sandbox API. AIO already exposes /mnt/user-data natively (per-thread bind mounts), so the same code path behaved differently across implementations — and direct callers like uploads.py:282 / feishu.py:389 only worked thanks to the `uses_thread_data_mounts` workaround flag. Switch the provider to a dual-track cache: keep the `"local"` singleton for legacy acquire(None) callers (backward-compat for existing tests and scripts), and create a per-thread LocalSandbox with id `"local:{tid}"` for acquire(thread_id). Each per-thread instance carries PathMapping entries for /mnt/user-data, its three subdirs, and /mnt/acp-workspace, mirroring how AioSandboxProvider mounts those paths into its container. is_local_sandbox() now recognises both id formats. `_agent_written_paths` becomes per-thread (it was a process-wide set that leaked across threads — a latent isolation bug also fixed by this change). Verified via TDD: a new contract test suite hits the public Sandbox API directly (write/read/list/exec/glob/grep/update + per-thread isolation + lifecycle). 3212 backend tests still pass, ruff is clean. * fix(sandbox): address Copilot review on #2881 Three follow-ups from Copilot's review of the LocalSandboxProvider refactor: 1. Synchronisation: ``acquire`` / ``get`` / ``reset`` mutated the cache without any lock, so concurrent acquire of the same ``thread_id`` could create two ``LocalSandbox`` instances and lose one's ``_agent_written_paths`` state. Add a provider-wide ``threading.Lock`` (matching ``AioSandboxProvider``) and build per-thread mappings outside the lock to avoid holding it during the ``ensure_thread_dirs`` filesystem touch. 2. Memory bound: ``_thread_sandboxes`` grew monotonically. Replace the plain dict with an ``OrderedDict`` LRU capped at ``DEFAULT_MAX_CACHED_THREAD_SANDBOXES`` (256, configurable per provider instance). ``get`` promotes touched threads to the MRU end so an active thread isn't evicted under load. Eviction is graceful: the next ``acquire`` rebuilds a fresh sandbox; only ``_agent_written_paths`` (reverse-resolve hint) is lost. 3. Docs: update ``CLAUDE.md`` to reflect the new per-thread architecture, the LRU cap, and that ``is_local_sandbox`` recognises both id formats. New regression tests: - Concurrent ``acquire("alpha")`` from 8 threads yields a single instance (slow-init injection forces the race window wide open). - Concurrent ``acquire`` of distinct thread_ids yields distinct instances. - The cache evicts the least-recently-used thread once the cap is exceeded. - ``get`` promotes recency so a polled thread survives a later acquire-storm.
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
import logging
|
||||
import threading
|
||||
from collections import OrderedDict
|
||||
from pathlib import Path
|
||||
|
||||
from deerflow.sandbox.local.local_sandbox import LocalSandbox, PathMapping
|
||||
@@ -7,25 +9,87 @@ from deerflow.sandbox.sandbox_provider import SandboxProvider
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Module-level alias kept for backward compatibility with older callers/tests
|
||||
# that reach into ``local_sandbox_provider._singleton`` directly. New code reads
|
||||
# the provider instance attributes (``_generic_sandbox`` / ``_thread_sandboxes``)
|
||||
# instead.
|
||||
_singleton: LocalSandbox | None = None
|
||||
|
||||
# Virtual prefixes that must be reserved by the per-thread mappings created in
|
||||
# ``acquire`` — custom mounts from ``config.yaml`` may not overlap with these.
|
||||
_USER_DATA_VIRTUAL_PREFIX = "/mnt/user-data"
|
||||
_ACP_WORKSPACE_VIRTUAL_PREFIX = "/mnt/acp-workspace"
|
||||
|
||||
# Default upper bound on per-thread LocalSandbox instances retained in memory.
|
||||
# Each cached instance is cheap (a small Python object with a list of
|
||||
# PathMapping and a set of agent-written paths used for reverse resolve), but
|
||||
# in a long-running gateway the number of distinct thread_ids is unbounded.
|
||||
# When the cap is exceeded the least-recently-used entry is dropped; the next
|
||||
# ``acquire(thread_id)`` for that thread simply rebuilds the sandbox at the
|
||||
# cost of losing its accumulated ``_agent_written_paths`` (read_file falls
|
||||
# back to no reverse resolution, which is the same behaviour as a fresh run).
|
||||
DEFAULT_MAX_CACHED_THREAD_SANDBOXES = 256
|
||||
|
||||
|
||||
class LocalSandboxProvider(SandboxProvider):
|
||||
"""Local-filesystem sandbox provider with per-thread path scoping.
|
||||
|
||||
Earlier revisions of this provider returned a single process-wide
|
||||
``LocalSandbox`` keyed by the literal id ``"local"``. That singleton could
|
||||
not honour the documented ``/mnt/user-data/...`` contract at the public
|
||||
``Sandbox`` API boundary because the corresponding host directory is
|
||||
per-thread (``{base_dir}/users/{user_id}/threads/{thread_id}/user-data/``).
|
||||
|
||||
The provider now produces a fresh ``LocalSandbox`` per ``thread_id`` whose
|
||||
``path_mappings`` include thread-scoped entries for
|
||||
``/mnt/user-data/{workspace,uploads,outputs}`` and ``/mnt/acp-workspace``,
|
||||
mirroring how :class:`AioSandboxProvider` bind-mounts those paths into its
|
||||
docker container. The legacy ``acquire()`` / ``acquire(None)`` call still
|
||||
returns a generic singleton with id ``"local"`` for callers (and tests)
|
||||
that do not have a thread context.
|
||||
|
||||
Thread-safety: ``acquire``, ``get`` and ``reset`` may be invoked from
|
||||
multiple threads (Gateway tool dispatch, subagent worker pools, the
|
||||
background memory updater, …) so all cache state changes are serialised
|
||||
through a provider-wide :class:`threading.Lock`. This matches the pattern
|
||||
used by :class:`AioSandboxProvider`.
|
||||
|
||||
Memory bound: ``_thread_sandboxes`` is an LRU cache capped at
|
||||
``max_cached_threads`` (default :data:`DEFAULT_MAX_CACHED_THREAD_SANDBOXES`).
|
||||
When the cap is exceeded the least-recently-used entry is evicted on the
|
||||
next ``acquire``; the evicted thread's next ``acquire`` rebuilds a fresh
|
||||
sandbox (losing only its ``_agent_written_paths`` reverse-resolve hint,
|
||||
which gracefully degrades read_file output).
|
||||
"""
|
||||
|
||||
uses_thread_data_mounts = True
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the local sandbox provider with path mappings."""
|
||||
def __init__(self, max_cached_threads: int = DEFAULT_MAX_CACHED_THREAD_SANDBOXES):
|
||||
"""Initialize the local sandbox provider with static path mappings.
|
||||
|
||||
Args:
|
||||
max_cached_threads: Upper bound on per-thread sandboxes retained in
|
||||
the LRU cache. When exceeded, the least-recently-used entry is
|
||||
evicted on the next ``acquire``.
|
||||
"""
|
||||
self._path_mappings = self._setup_path_mappings()
|
||||
self._generic_sandbox: LocalSandbox | None = None
|
||||
self._thread_sandboxes: OrderedDict[str, LocalSandbox] = OrderedDict()
|
||||
self._max_cached_threads = max_cached_threads
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _setup_path_mappings(self) -> list[PathMapping]:
|
||||
"""
|
||||
Setup path mappings for local sandbox.
|
||||
Setup static path mappings shared by every sandbox this provider yields.
|
||||
|
||||
Maps container paths to actual local paths, including skills directory
|
||||
and any custom mounts configured in config.yaml.
|
||||
Static mappings cover the skills directory and any custom mounts from
|
||||
``config.yaml`` — both are process-wide and identical for every thread.
|
||||
Per-thread ``/mnt/user-data/...`` and ``/mnt/acp-workspace`` mappings
|
||||
are appended inside :meth:`acquire` because they depend on
|
||||
``thread_id`` and the effective ``user_id``.
|
||||
|
||||
Returns:
|
||||
List of path mappings
|
||||
List of static path mappings
|
||||
"""
|
||||
mappings: list[PathMapping] = []
|
||||
|
||||
@@ -48,7 +112,11 @@ class LocalSandboxProvider(SandboxProvider):
|
||||
)
|
||||
|
||||
# Map custom mounts from sandbox config
|
||||
_RESERVED_CONTAINER_PREFIXES = [container_path, "/mnt/acp-workspace", "/mnt/user-data"]
|
||||
_RESERVED_CONTAINER_PREFIXES = [
|
||||
container_path,
|
||||
_ACP_WORKSPACE_VIRTUAL_PREFIX,
|
||||
_USER_DATA_VIRTUAL_PREFIX,
|
||||
]
|
||||
sandbox_config = config.sandbox
|
||||
if sandbox_config and sandbox_config.mounts:
|
||||
for mount in sandbox_config.mounts:
|
||||
@@ -99,33 +167,162 @@ class LocalSandboxProvider(SandboxProvider):
|
||||
|
||||
return mappings
|
||||
|
||||
@staticmethod
|
||||
def _build_thread_path_mappings(thread_id: str) -> list[PathMapping]:
|
||||
"""Build per-thread path mappings for /mnt/user-data and /mnt/acp-workspace.
|
||||
|
||||
Resolves ``user_id`` via :func:`get_effective_user_id` (the same path
|
||||
:class:`AioSandboxProvider` uses) and ensures the backing host
|
||||
directories exist before they are mapped into the sandbox view.
|
||||
"""
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
paths = get_paths()
|
||||
user_id = get_effective_user_id()
|
||||
paths.ensure_thread_dirs(thread_id, user_id=user_id)
|
||||
|
||||
return [
|
||||
# Aggregate parent mapping so ``ls /mnt/user-data`` and other
|
||||
# parent-level operations behave the same as inside AIO (where the
|
||||
# parent directory is real and contains the three subdirs). Longer
|
||||
# subpath mappings below still win for ``/mnt/user-data/workspace/...``
|
||||
# because ``_find_path_mapping`` sorts by container_path length.
|
||||
PathMapping(
|
||||
container_path=_USER_DATA_VIRTUAL_PREFIX,
|
||||
local_path=str(paths.sandbox_user_data_dir(thread_id, user_id=user_id)),
|
||||
read_only=False,
|
||||
),
|
||||
PathMapping(
|
||||
container_path=f"{_USER_DATA_VIRTUAL_PREFIX}/workspace",
|
||||
local_path=str(paths.sandbox_work_dir(thread_id, user_id=user_id)),
|
||||
read_only=False,
|
||||
),
|
||||
PathMapping(
|
||||
container_path=f"{_USER_DATA_VIRTUAL_PREFIX}/uploads",
|
||||
local_path=str(paths.sandbox_uploads_dir(thread_id, user_id=user_id)),
|
||||
read_only=False,
|
||||
),
|
||||
PathMapping(
|
||||
container_path=f"{_USER_DATA_VIRTUAL_PREFIX}/outputs",
|
||||
local_path=str(paths.sandbox_outputs_dir(thread_id, user_id=user_id)),
|
||||
read_only=False,
|
||||
),
|
||||
PathMapping(
|
||||
container_path=_ACP_WORKSPACE_VIRTUAL_PREFIX,
|
||||
local_path=str(paths.acp_workspace_dir(thread_id, user_id=user_id)),
|
||||
read_only=False,
|
||||
),
|
||||
]
|
||||
|
||||
def acquire(self, thread_id: str | None = None) -> str:
|
||||
"""Return a sandbox id scoped to *thread_id* (or the generic singleton).
|
||||
|
||||
- ``thread_id=None`` keeps the legacy singleton with id ``"local"`` for
|
||||
callers that have no thread context (e.g. legacy tests, scripts).
|
||||
- ``thread_id="abc"`` yields a per-thread ``LocalSandbox`` with id
|
||||
``"local:abc"`` whose ``path_mappings`` resolve ``/mnt/user-data/...``
|
||||
to that thread's host directories.
|
||||
|
||||
Thread-safe under concurrent invocation: the cache check + insert is
|
||||
guarded by ``self._lock`` so two callers racing on the same
|
||||
``thread_id`` always observe the same LocalSandbox instance.
|
||||
"""
|
||||
global _singleton
|
||||
if _singleton is None:
|
||||
_singleton = LocalSandbox("local", path_mappings=self._path_mappings)
|
||||
return _singleton.id
|
||||
|
||||
if thread_id is None:
|
||||
with self._lock:
|
||||
if self._generic_sandbox is None:
|
||||
self._generic_sandbox = LocalSandbox("local", path_mappings=list(self._path_mappings))
|
||||
_singleton = self._generic_sandbox
|
||||
return self._generic_sandbox.id
|
||||
|
||||
# Fast path under lock.
|
||||
with self._lock:
|
||||
cached = self._thread_sandboxes.get(thread_id)
|
||||
if cached is not None:
|
||||
# Mark as most-recently used so frequently-touched threads
|
||||
# survive eviction.
|
||||
self._thread_sandboxes.move_to_end(thread_id)
|
||||
return cached.id
|
||||
|
||||
# ``_build_thread_path_mappings`` touches the filesystem
|
||||
# (``ensure_thread_dirs``); release the lock during I/O.
|
||||
new_mappings = list(self._path_mappings) + self._build_thread_path_mappings(thread_id)
|
||||
|
||||
with self._lock:
|
||||
# Re-check after the lock-free I/O: another caller may have
|
||||
# populated the cache while we were computing mappings.
|
||||
cached = self._thread_sandboxes.get(thread_id)
|
||||
if cached is None:
|
||||
cached = LocalSandbox(f"local:{thread_id}", path_mappings=new_mappings)
|
||||
self._thread_sandboxes[thread_id] = cached
|
||||
self._evict_until_within_cap_locked()
|
||||
else:
|
||||
self._thread_sandboxes.move_to_end(thread_id)
|
||||
return cached.id
|
||||
|
||||
def _evict_until_within_cap_locked(self) -> None:
|
||||
"""LRU-evict cached thread sandboxes once the cap is exceeded.
|
||||
|
||||
Caller MUST hold ``self._lock``.
|
||||
"""
|
||||
while len(self._thread_sandboxes) > self._max_cached_threads:
|
||||
evicted_thread_id, _ = self._thread_sandboxes.popitem(last=False)
|
||||
logger.info(
|
||||
"Evicting LocalSandbox cache entry for thread %s (cap=%d)",
|
||||
evicted_thread_id,
|
||||
self._max_cached_threads,
|
||||
)
|
||||
|
||||
def get(self, sandbox_id: str) -> Sandbox | None:
|
||||
if sandbox_id == "local":
|
||||
if _singleton is None:
|
||||
with self._lock:
|
||||
generic = self._generic_sandbox
|
||||
if generic is None:
|
||||
self.acquire()
|
||||
return _singleton
|
||||
with self._lock:
|
||||
return self._generic_sandbox
|
||||
return generic
|
||||
if isinstance(sandbox_id, str) and sandbox_id.startswith("local:"):
|
||||
thread_id = sandbox_id[len("local:") :]
|
||||
with self._lock:
|
||||
cached = self._thread_sandboxes.get(thread_id)
|
||||
if cached is not None:
|
||||
# Touching a thread via ``get`` (used by tools.py to look
|
||||
# up the sandbox once per tool call) promotes it in LRU
|
||||
# order so an active thread isn't evicted under load.
|
||||
self._thread_sandboxes.move_to_end(thread_id)
|
||||
return cached
|
||||
return None
|
||||
|
||||
def release(self, sandbox_id: str) -> None:
|
||||
# LocalSandbox uses singleton pattern - no cleanup needed.
|
||||
# LocalSandbox has no resources to release; keep the cached instance so
|
||||
# that ``_agent_written_paths`` (used to reverse-resolve agent-authored
|
||||
# file contents on read) survives between turns. LRU eviction in
|
||||
# ``acquire`` and explicit ``reset()`` / ``shutdown()`` are the only
|
||||
# paths that drop cached entries.
|
||||
#
|
||||
# Note: This method is intentionally not called by SandboxMiddleware
|
||||
# to allow sandbox reuse across multiple turns in a thread.
|
||||
# For Docker-based providers (e.g., AioSandboxProvider), cleanup
|
||||
# happens at application shutdown via the shutdown() method.
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
# reset_sandbox_provider() must also clear the module singleton.
|
||||
"""Drop all cached LocalSandbox instances.
|
||||
|
||||
``reset_sandbox_provider()`` calls this to ensure config / mount
|
||||
changes take effect on the next ``acquire()``. We also reset the
|
||||
module-level ``_singleton`` alias so older callers/tests that reach
|
||||
into it see a fresh state.
|
||||
"""
|
||||
global _singleton
|
||||
_singleton = None
|
||||
with self._lock:
|
||||
self._generic_sandbox = None
|
||||
self._thread_sandboxes.clear()
|
||||
_singleton = None
|
||||
|
||||
def shutdown(self) -> None:
|
||||
# LocalSandboxProvider has no extra resources beyond the shared
|
||||
# singleton, so shutdown uses the same cleanup path as reset.
|
||||
# LocalSandboxProvider has no extra resources beyond the cached
|
||||
# ``LocalSandbox`` instances, so shutdown uses the same cleanup path
|
||||
# as ``reset``.
|
||||
self.reset()
|
||||
|
||||
@@ -1006,8 +1006,9 @@ def get_thread_data(runtime: Runtime | None) -> ThreadDataState | None:
|
||||
def is_local_sandbox(runtime: Runtime | None) -> bool:
|
||||
"""Check if the current sandbox is a local sandbox.
|
||||
|
||||
Path replacement is only needed for local sandbox since aio sandbox
|
||||
already has /mnt/user-data mounted in the container.
|
||||
Accepts both the legacy generic id ``"local"`` (acquire with no thread
|
||||
context) and the per-thread id format ``"local:{thread_id}"`` produced by
|
||||
:meth:`LocalSandboxProvider.acquire` once a thread is known.
|
||||
"""
|
||||
if runtime is None:
|
||||
return False
|
||||
@@ -1016,7 +1017,10 @@ def is_local_sandbox(runtime: Runtime | None) -> bool:
|
||||
sandbox_state = runtime.state.get("sandbox")
|
||||
if sandbox_state is None:
|
||||
return False
|
||||
return sandbox_state.get("sandbox_id") == "local"
|
||||
sandbox_id = sandbox_state.get("sandbox_id")
|
||||
if not isinstance(sandbox_id, str):
|
||||
return False
|
||||
return sandbox_id == "local" or sandbox_id.startswith("local:")
|
||||
|
||||
|
||||
def sandbox_from_runtime(runtime: Runtime | None = None) -> Sandbox:
|
||||
|
||||
Reference in New Issue
Block a user