mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-21 23:46:50 +00:00
fix(harness): constrain view_image to thread data paths (#2557)
* fix(harness): constrain view_image to thread data paths Fixes #2530 * fix(harness): address view_image review findings * style(harness): format view_image changes * fix(harness): address view_image review comments
This commit is contained in:
@@ -0,0 +1,164 @@
|
||||
import base64
|
||||
import importlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from deerflow.tools.builtins.view_image_tool import view_image_tool
|
||||
|
||||
view_image_module = importlib.import_module("deerflow.tools.builtins.view_image_tool")
|
||||
|
||||
PNG_BYTES = base64.b64decode("iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==")
|
||||
|
||||
|
||||
def _make_thread_data(tmp_path: Path) -> dict[str, str]:
|
||||
user_data = tmp_path / "threads" / "thread-1" / "user-data"
|
||||
workspace = user_data / "workspace"
|
||||
uploads = user_data / "uploads"
|
||||
outputs = user_data / "outputs"
|
||||
for directory in (workspace, uploads, outputs):
|
||||
directory.mkdir(parents=True)
|
||||
|
||||
return {
|
||||
"workspace_path": str(workspace),
|
||||
"uploads_path": str(uploads),
|
||||
"outputs_path": str(outputs),
|
||||
}
|
||||
|
||||
|
||||
def _make_runtime(thread_data: dict[str, str]) -> SimpleNamespace:
|
||||
return SimpleNamespace(
|
||||
state={"thread_data": thread_data},
|
||||
context={"thread_id": "thread-1"},
|
||||
config={},
|
||||
)
|
||||
|
||||
|
||||
def _message_content(result) -> str:
|
||||
return result.update["messages"][0].content
|
||||
|
||||
|
||||
def test_view_image_rejects_external_absolute_path(tmp_path: Path) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
outside_image = tmp_path / "outside.png"
|
||||
outside_image.write_bytes(PNG_BYTES)
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path=str(outside_image),
|
||||
tool_call_id="tc-external",
|
||||
)
|
||||
|
||||
assert "Only image paths under /mnt/user-data" in _message_content(result)
|
||||
assert "viewed_images" not in result.update
|
||||
|
||||
|
||||
def test_view_image_reads_virtual_uploads_path(tmp_path: Path) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
image_path = Path(thread_data["uploads_path"]) / "sample.png"
|
||||
image_path.write_bytes(PNG_BYTES)
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path="/mnt/user-data/uploads/sample.png",
|
||||
tool_call_id="tc-uploads",
|
||||
)
|
||||
|
||||
assert _message_content(result) == "Successfully read image"
|
||||
viewed_image = result.update["viewed_images"]["/mnt/user-data/uploads/sample.png"]
|
||||
assert viewed_image["base64"] == base64.b64encode(PNG_BYTES).decode("utf-8")
|
||||
assert viewed_image["mime_type"] == "image/png"
|
||||
|
||||
|
||||
def test_view_image_rejects_spoofed_extension(tmp_path: Path) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
image_path = Path(thread_data["uploads_path"]) / "not-really.png"
|
||||
image_path.write_bytes(b"not an image")
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path="/mnt/user-data/uploads/not-really.png",
|
||||
tool_call_id="tc-spoofed",
|
||||
)
|
||||
|
||||
assert "contents do not match" in _message_content(result)
|
||||
assert "viewed_images" not in result.update
|
||||
|
||||
|
||||
def test_view_image_rejects_mismatched_magic_bytes(tmp_path: Path) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
image_path = Path(thread_data["uploads_path"]) / "jpeg-named-png.png"
|
||||
image_path.write_bytes(b"\xff\xd8\xff\xe0fake-jpeg")
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path="/mnt/user-data/uploads/jpeg-named-png.png",
|
||||
tool_call_id="tc-mismatch",
|
||||
)
|
||||
|
||||
assert "file extension indicates image/png" in _message_content(result)
|
||||
assert "viewed_images" not in result.update
|
||||
|
||||
|
||||
def test_view_image_rejects_oversized_image(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
image_path = Path(thread_data["uploads_path"]) / "sample.png"
|
||||
image_path.write_bytes(PNG_BYTES)
|
||||
monkeypatch.setattr(view_image_module, "_MAX_IMAGE_BYTES", len(PNG_BYTES) - 1)
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path="/mnt/user-data/uploads/sample.png",
|
||||
tool_call_id="tc-oversized",
|
||||
)
|
||||
|
||||
assert "Image file is too large" in _message_content(result)
|
||||
assert "viewed_images" not in result.update
|
||||
|
||||
|
||||
def test_view_image_sanitizes_read_errors(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
image_path = Path(thread_data["uploads_path"]) / "sample.png"
|
||||
image_path.write_bytes(PNG_BYTES)
|
||||
|
||||
def _open(*args, **kwargs):
|
||||
raise PermissionError(f"permission denied: {image_path}")
|
||||
|
||||
monkeypatch.setattr("builtins.open", _open)
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path="/mnt/user-data/uploads/sample.png",
|
||||
tool_call_id="tc-read-error",
|
||||
)
|
||||
|
||||
message = _message_content(result)
|
||||
assert "Error reading image file" in message
|
||||
assert str(image_path) not in message
|
||||
assert str(Path(thread_data["uploads_path"])) not in message
|
||||
assert "/mnt/user-data/uploads/sample.png" in message
|
||||
assert "viewed_images" not in result.update
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="symlink semantics differ on Windows")
|
||||
def test_view_image_rejects_uploads_symlink_escape(tmp_path: Path) -> None:
|
||||
thread_data = _make_thread_data(tmp_path)
|
||||
outside_image = tmp_path / "outside-target.png"
|
||||
outside_image.write_bytes(PNG_BYTES)
|
||||
|
||||
link_path = Path(thread_data["uploads_path"]) / "escape.png"
|
||||
try:
|
||||
link_path.symlink_to(outside_image)
|
||||
except OSError as exc:
|
||||
pytest.skip(f"symlink creation failed: {exc}")
|
||||
|
||||
result = view_image_tool.func(
|
||||
runtime=_make_runtime(thread_data),
|
||||
image_path="/mnt/user-data/uploads/escape.png",
|
||||
tool_call_id="tc-symlink",
|
||||
)
|
||||
|
||||
assert "path traversal" in _message_content(result)
|
||||
assert "viewed_images" not in result.update
|
||||
Reference in New Issue
Block a user