feat(sandbox): add read-only support for local sandbox path mappings (#1808)

This commit is contained in:
finallylly
2026-04-03 19:46:22 +08:00
committed by GitHub
parent c6cdf200ce
commit 1694c616ef
6 changed files with 768 additions and 33 deletions
@@ -0,0 +1,388 @@
import errno
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from deerflow.sandbox.local.local_sandbox import LocalSandbox, PathMapping
from deerflow.sandbox.local.local_sandbox_provider import LocalSandboxProvider
class TestPathMapping:
def test_path_mapping_dataclass(self):
mapping = PathMapping(container_path="/mnt/skills", local_path="/home/user/skills", read_only=True)
assert mapping.container_path == "/mnt/skills"
assert mapping.local_path == "/home/user/skills"
assert mapping.read_only is True
def test_path_mapping_defaults_to_false(self):
mapping = PathMapping(container_path="/mnt/data", local_path="/home/user/data")
assert mapping.read_only is False
class TestLocalSandboxPathResolution:
def test_resolve_path_exact_match(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills"),
],
)
resolved = sandbox._resolve_path("/mnt/skills")
assert resolved == "/home/user/skills"
def test_resolve_path_nested_path(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills"),
],
)
resolved = sandbox._resolve_path("/mnt/skills/agent/prompt.py")
assert resolved == "/home/user/skills/agent/prompt.py"
def test_resolve_path_no_mapping(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills"),
],
)
resolved = sandbox._resolve_path("/mnt/other/file.txt")
assert resolved == "/mnt/other/file.txt"
def test_resolve_path_longest_prefix_first(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills"),
PathMapping(container_path="/mnt", local_path="/var/mnt"),
],
)
resolved = sandbox._resolve_path("/mnt/skills/file.py")
# Should match /mnt/skills first (longer prefix)
assert resolved == "/home/user/skills/file.py"
def test_reverse_resolve_path_exact_match(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path=str(skills_dir)),
],
)
resolved = sandbox._reverse_resolve_path(str(skills_dir))
assert resolved == "/mnt/skills"
def test_reverse_resolve_path_nested(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
file_path = skills_dir / "agent" / "prompt.py"
file_path.parent.mkdir()
file_path.write_text("test")
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path=str(skills_dir)),
],
)
resolved = sandbox._reverse_resolve_path(str(file_path))
assert resolved == "/mnt/skills/agent/prompt.py"
class TestReadOnlyPath:
def test_is_read_only_true(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills", read_only=True),
],
)
assert sandbox._is_read_only_path("/home/user/skills/file.py") is True
def test_is_read_only_false_for_writable(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path="/home/user/data", read_only=False),
],
)
assert sandbox._is_read_only_path("/home/user/data/file.txt") is False
def test_is_read_only_false_for_unmapped_path(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills", read_only=True),
],
)
# Path not under any mapping
assert sandbox._is_read_only_path("/tmp/other/file.txt") is False
def test_is_read_only_true_for_exact_match(self):
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path="/home/user/skills", read_only=True),
],
)
assert sandbox._is_read_only_path("/home/user/skills") is True
def test_write_file_blocked_on_read_only(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path=str(skills_dir), read_only=True),
],
)
# Skills dir is read-only, write should be blocked
with pytest.raises(OSError) as exc_info:
sandbox.write_file("/mnt/skills/new_file.py", "content")
assert exc_info.value.errno == errno.EROFS
def test_write_file_allowed_on_writable_mount(self, tmp_path):
data_dir = tmp_path / "data"
data_dir.mkdir()
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(data_dir), read_only=False),
],
)
sandbox.write_file("/mnt/data/file.txt", "content")
assert (data_dir / "file.txt").read_text() == "content"
def test_update_file_blocked_on_read_only(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
existing_file = skills_dir / "existing.py"
existing_file.write_bytes(b"original")
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path=str(skills_dir), read_only=True),
],
)
with pytest.raises(OSError) as exc_info:
sandbox.update_file("/mnt/skills/existing.py", b"updated")
assert exc_info.value.errno == errno.EROFS
class TestMultipleMounts:
def test_multiple_read_write_mounts(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
data_dir = tmp_path / "data"
data_dir.mkdir()
external_dir = tmp_path / "external"
external_dir.mkdir()
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/skills", local_path=str(skills_dir), read_only=True),
PathMapping(container_path="/mnt/data", local_path=str(data_dir), read_only=False),
PathMapping(container_path="/mnt/external", local_path=str(external_dir), read_only=True),
],
)
# Skills is read-only
with pytest.raises(OSError):
sandbox.write_file("/mnt/skills/file.py", "content")
# Data is writable
sandbox.write_file("/mnt/data/file.txt", "data content")
assert (data_dir / "file.txt").read_text() == "data content"
# External is read-only
with pytest.raises(OSError):
sandbox.write_file("/mnt/external/file.txt", "content")
def test_nested_mounts_writable_under_readonly(self, tmp_path):
"""A writable mount nested under a read-only mount should allow writes."""
ro_dir = tmp_path / "ro"
ro_dir.mkdir()
rw_dir = ro_dir / "writable"
rw_dir.mkdir()
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/repo", local_path=str(ro_dir), read_only=True),
PathMapping(container_path="/mnt/repo/writable", local_path=str(rw_dir), read_only=False),
],
)
# Parent mount is read-only
with pytest.raises(OSError):
sandbox.write_file("/mnt/repo/file.txt", "content")
# Nested writable mount should allow writes
sandbox.write_file("/mnt/repo/writable/file.txt", "content")
assert (rw_dir / "file.txt").read_text() == "content"
def test_execute_command_path_replacement(self, tmp_path, monkeypatch):
data_dir = tmp_path / "data"
data_dir.mkdir()
test_file = data_dir / "test.txt"
test_file.write_text("hello")
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(data_dir)),
],
)
# Mock subprocess to capture the resolved command
captured = {}
original_run = __import__("subprocess").run
def mock_run(*args, **kwargs):
if len(args) > 0:
captured["command"] = args[0]
return original_run(*args, **kwargs)
monkeypatch.setattr("deerflow.sandbox.local.local_sandbox.subprocess.run", mock_run)
monkeypatch.setattr("deerflow.sandbox.local.local_sandbox.LocalSandbox._get_shell", lambda self: "/bin/sh")
sandbox.execute_command("cat /mnt/data/test.txt")
# Verify the command received the resolved local path
assert str(data_dir) in captured.get("command", "")
def test_reverse_resolve_path_does_not_match_partial_prefix(self, tmp_path):
foo_dir = tmp_path / "foo"
foo_dir.mkdir()
foobar_dir = tmp_path / "foobar"
foobar_dir.mkdir()
target = foobar_dir / "file.txt"
target.write_text("test")
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/foo", local_path=str(foo_dir)),
],
)
resolved = sandbox._reverse_resolve_path(str(target))
assert resolved == str(target.resolve())
def test_reverse_resolve_paths_in_output_supports_backslash_separator(self, tmp_path):
mount_dir = tmp_path / "mount"
mount_dir.mkdir()
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(mount_dir)),
],
)
output = f"Copied: {mount_dir}\\file.txt"
masked = sandbox._reverse_resolve_paths_in_output(output)
assert "/mnt/data/file.txt" in masked
assert str(mount_dir) not in masked
class TestLocalSandboxProviderMounts:
def test_setup_path_mappings_uses_configured_skills_container_path_as_reserved_prefix(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
custom_dir = tmp_path / "custom"
custom_dir.mkdir()
from deerflow.config.sandbox_config import SandboxConfig, VolumeMountConfig
sandbox_config = SandboxConfig(
use="deerflow.sandbox.local:LocalSandboxProvider",
mounts=[
VolumeMountConfig(host_path=str(custom_dir), container_path="/custom-skills/nested", read_only=False),
],
)
config = SimpleNamespace(
skills=SimpleNamespace(container_path="/custom-skills", get_skills_path=lambda: skills_dir),
sandbox=sandbox_config,
)
with patch("deerflow.config.get_app_config", return_value=config):
provider = LocalSandboxProvider()
assert [m.container_path for m in provider._path_mappings] == ["/custom-skills"]
def test_setup_path_mappings_skips_relative_host_path(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
from deerflow.config.sandbox_config import SandboxConfig, VolumeMountConfig
sandbox_config = SandboxConfig(
use="deerflow.sandbox.local:LocalSandboxProvider",
mounts=[
VolumeMountConfig(host_path="relative/path", container_path="/mnt/data", read_only=False),
],
)
config = SimpleNamespace(
skills=SimpleNamespace(container_path="/mnt/skills", get_skills_path=lambda: skills_dir),
sandbox=sandbox_config,
)
with patch("deerflow.config.get_app_config", return_value=config):
provider = LocalSandboxProvider()
assert [m.container_path for m in provider._path_mappings] == ["/mnt/skills"]
def test_setup_path_mappings_skips_non_absolute_container_path(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
custom_dir = tmp_path / "custom"
custom_dir.mkdir()
from deerflow.config.sandbox_config import SandboxConfig, VolumeMountConfig
sandbox_config = SandboxConfig(
use="deerflow.sandbox.local:LocalSandboxProvider",
mounts=[
VolumeMountConfig(host_path=str(custom_dir), container_path="mnt/data", read_only=False),
],
)
config = SimpleNamespace(
skills=SimpleNamespace(container_path="/mnt/skills", get_skills_path=lambda: skills_dir),
sandbox=sandbox_config,
)
with patch("deerflow.config.get_app_config", return_value=config):
provider = LocalSandboxProvider()
assert [m.container_path for m in provider._path_mappings] == ["/mnt/skills"]
def test_setup_path_mappings_normalizes_container_path_trailing_slash(self, tmp_path):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
custom_dir = tmp_path / "custom"
custom_dir.mkdir()
from deerflow.config.sandbox_config import SandboxConfig, VolumeMountConfig
sandbox_config = SandboxConfig(
use="deerflow.sandbox.local:LocalSandboxProvider",
mounts=[
VolumeMountConfig(host_path=str(custom_dir), container_path="/mnt/data/", read_only=False),
],
)
config = SimpleNamespace(
skills=SimpleNamespace(container_path="/mnt/skills", get_skills_path=lambda: skills_dir),
sandbox=sandbox_config,
)
with patch("deerflow.config.get_app_config", return_value=config):
provider = LocalSandboxProvider()
assert [m.container_path for m in provider._path_mappings] == ["/mnt/skills", "/mnt/data"]
@@ -8,7 +8,10 @@ import pytest
from deerflow.sandbox.tools import (
VIRTUAL_PATH_PREFIX,
_apply_cwd_prefix,
_get_custom_mount_for_path,
_get_custom_mounts,
_is_acp_workspace_path,
_is_custom_mount_path,
_is_skills_path,
_reject_path_traversal,
_resolve_acp_workspace_path,
@@ -96,6 +99,25 @@ def test_validate_local_tool_path_rejects_non_virtual_path() -> None:
validate_local_tool_path("/Users/someone/config.yaml", _THREAD_DATA)
def test_validate_local_tool_path_rejects_non_virtual_path_mentions_configured_mounts() -> None:
with pytest.raises(PermissionError, match="configured mount paths"):
validate_local_tool_path("/Users/someone/config.yaml", _THREAD_DATA)
def test_validate_local_tool_path_prioritizes_user_data_before_custom_mounts() -> None:
from deerflow.config.sandbox_config import VolumeMountConfig
mounts = [
VolumeMountConfig(host_path="/tmp/host-user-data", container_path=VIRTUAL_PATH_PREFIX, read_only=False),
]
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=mounts):
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/file.txt", _THREAD_DATA, read_only=True)
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=mounts):
with pytest.raises(PermissionError, match="path traversal"):
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/../../etc/passwd", _THREAD_DATA, read_only=True)
def test_validate_local_tool_path_rejects_bare_virtual_root() -> None:
"""The bare /mnt/user-data root without trailing slash is not a valid sub-path."""
with pytest.raises(PermissionError, match="Only paths under"):
@@ -567,6 +589,156 @@ def test_validate_local_bash_command_paths_allows_mcp_filesystem_paths() -> None
validate_local_bash_command_paths("ls /mnt/d/workspace", _THREAD_DATA)
# ---------- Custom mount path tests ----------
def _mock_custom_mounts():
"""Create mock VolumeMountConfig objects for testing."""
from deerflow.config.sandbox_config import VolumeMountConfig
return [
VolumeMountConfig(host_path="/home/user/code-read", container_path="/mnt/code-read", read_only=True),
VolumeMountConfig(host_path="/home/user/data", container_path="/mnt/data", read_only=False),
]
def test_is_custom_mount_path_recognises_configured_mounts() -> None:
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
assert _is_custom_mount_path("/mnt/code-read") is True
assert _is_custom_mount_path("/mnt/code-read/src/main.py") is True
assert _is_custom_mount_path("/mnt/data") is True
assert _is_custom_mount_path("/mnt/data/file.txt") is True
assert _is_custom_mount_path("/mnt/code-read-extra/foo") is False
assert _is_custom_mount_path("/mnt/other") is False
def test_get_custom_mount_for_path_returns_longest_prefix() -> None:
from deerflow.config.sandbox_config import VolumeMountConfig
mounts = [
VolumeMountConfig(host_path="/var/mnt", container_path="/mnt", read_only=False),
VolumeMountConfig(host_path="/home/user/code", container_path="/mnt/code", read_only=True),
]
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=mounts):
mount = _get_custom_mount_for_path("/mnt/code/file.py")
assert mount is not None
assert mount.container_path == "/mnt/code"
def test_validate_local_tool_path_allows_custom_mount_read() -> None:
"""read_file / ls should be able to access custom mount paths."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
validate_local_tool_path("/mnt/code-read/src/main.py", _THREAD_DATA, read_only=True)
validate_local_tool_path("/mnt/data/file.txt", _THREAD_DATA, read_only=True)
def test_validate_local_tool_path_blocks_read_only_mount_write() -> None:
"""write_file / str_replace must NOT write to read-only custom mounts."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
with pytest.raises(PermissionError, match="Write access to read-only mount is not allowed"):
validate_local_tool_path("/mnt/code-read/src/main.py", _THREAD_DATA, read_only=False)
def test_validate_local_tool_path_allows_writable_mount_write() -> None:
"""write_file / str_replace should succeed on writable custom mounts."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
validate_local_tool_path("/mnt/data/file.txt", _THREAD_DATA, read_only=False)
def test_validate_local_tool_path_blocks_traversal_in_custom_mount() -> None:
"""Path traversal via .. in custom mount paths must be rejected."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
with pytest.raises(PermissionError, match="path traversal"):
validate_local_tool_path("/mnt/code-read/../../etc/passwd", _THREAD_DATA, read_only=True)
def test_validate_local_bash_command_paths_allows_custom_mount() -> None:
"""bash commands referencing custom mount paths should be allowed."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
validate_local_bash_command_paths("cat /mnt/code-read/src/main.py", _THREAD_DATA)
validate_local_bash_command_paths("ls /mnt/data", _THREAD_DATA)
def test_validate_local_bash_command_paths_blocks_traversal_in_custom_mount() -> None:
"""Bash commands with traversal in custom mount paths should be blocked."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
with pytest.raises(PermissionError, match="path traversal"):
validate_local_bash_command_paths("cat /mnt/code-read/../../etc/passwd", _THREAD_DATA)
def test_validate_local_bash_command_paths_still_blocks_non_mount_paths() -> None:
"""Paths not matching any custom mount should still be blocked."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
with pytest.raises(PermissionError, match="Unsafe absolute paths"):
validate_local_bash_command_paths("cat /etc/shadow", _THREAD_DATA)
def test_get_custom_mounts_caching(monkeypatch, tmp_path) -> None:
"""_get_custom_mounts should cache after first successful load."""
# Clear any existing cache
if hasattr(_get_custom_mounts, "_cached"):
monkeypatch.delattr(_get_custom_mounts, "_cached")
# Use real directories so host_path.exists() filtering passes
dir_a = tmp_path / "code-read"
dir_a.mkdir()
dir_b = tmp_path / "data"
dir_b.mkdir()
from deerflow.config.sandbox_config import SandboxConfig, VolumeMountConfig
mounts = [
VolumeMountConfig(host_path=str(dir_a), container_path="/mnt/code-read", read_only=True),
VolumeMountConfig(host_path=str(dir_b), container_path="/mnt/data", read_only=False),
]
mock_sandbox = SandboxConfig(use="deerflow.sandbox.local:LocalSandboxProvider", mounts=mounts)
mock_config = SimpleNamespace(sandbox=mock_sandbox)
with patch("deerflow.config.get_app_config", return_value=mock_config):
result = _get_custom_mounts()
assert len(result) == 2
# After caching, should return cached value even without mock
assert hasattr(_get_custom_mounts, "_cached")
assert len(_get_custom_mounts()) == 2
# Cleanup
monkeypatch.delattr(_get_custom_mounts, "_cached")
def test_get_custom_mounts_filters_nonexistent_host_path(monkeypatch, tmp_path) -> None:
"""_get_custom_mounts should only return mounts whose host_path exists."""
if hasattr(_get_custom_mounts, "_cached"):
monkeypatch.delattr(_get_custom_mounts, "_cached")
from deerflow.config.sandbox_config import SandboxConfig, VolumeMountConfig
existing_dir = tmp_path / "existing"
existing_dir.mkdir()
mounts = [
VolumeMountConfig(host_path=str(existing_dir), container_path="/mnt/existing", read_only=True),
VolumeMountConfig(host_path="/nonexistent/path/12345", container_path="/mnt/ghost", read_only=False),
]
mock_sandbox = SandboxConfig(use="deerflow.sandbox.local:LocalSandboxProvider", mounts=mounts)
mock_config = SimpleNamespace(sandbox=mock_sandbox)
with patch("deerflow.config.get_app_config", return_value=mock_config):
result = _get_custom_mounts()
assert len(result) == 1
assert result[0].container_path == "/mnt/existing"
# Cleanup
monkeypatch.delattr(_get_custom_mounts, "_cached")
def test_get_custom_mount_for_path_boundary_no_false_prefix_match() -> None:
"""_get_custom_mount_for_path must not match /mnt/code-read-extra for /mnt/code-read."""
with patch("deerflow.sandbox.tools._get_custom_mounts", return_value=_mock_custom_mounts()):
mount = _get_custom_mount_for_path("/mnt/code-read-extra/foo")
assert mount is None
def test_str_replace_parallel_updates_should_preserve_both_edits(monkeypatch) -> None:
class SharedSandbox:
def __init__(self) -> None: