diff --git a/backend/app/gateway/routers/runs.py b/backend/app/gateway/routers/runs.py index a0aae03cd..91ac155f4 100644 --- a/backend/app/gateway/routers/runs.py +++ b/backend/app/gateway/routers/runs.py @@ -18,7 +18,7 @@ from app.gateway.deps import get_checkpointer, get_feedback_repo, get_run_event_ from app.gateway.pagination import trim_run_message_page from app.gateway.routers.thread_runs import RunCreateRequest from app.gateway.services import sse_consumer, start_run, wait_for_run_completion -from deerflow.runtime import serialize_channel_values +from deerflow.runtime import serialize_channel_values_for_api logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/runs", tags=["runs"]) @@ -82,7 +82,7 @@ async def stateless_wait(body: RunCreateRequest, request: Request) -> dict: if checkpoint_tuple is not None: checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {} channel_values = checkpoint.get("channel_values", {}) - return serialize_channel_values(channel_values) + return serialize_channel_values_for_api(channel_values) except Exception: logger.exception("Failed to fetch final state for run %s", record.run_id) diff --git a/backend/app/gateway/routers/thread_runs.py b/backend/app/gateway/routers/thread_runs.py index 65bf0698f..c3e2ee7b9 100644 --- a/backend/app/gateway/routers/thread_runs.py +++ b/backend/app/gateway/routers/thread_runs.py @@ -23,7 +23,7 @@ from app.gateway.authz import require_permission from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge from app.gateway.pagination import trim_run_message_page from app.gateway.services import sse_consumer, start_run, wait_for_run_completion -from deerflow.runtime import RunRecord, RunStatus, serialize_channel_values +from deerflow.runtime import RunRecord, RunStatus, serialize_channel_values_for_api logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/threads", tags=["runs"]) @@ -192,7 +192,7 @@ async def wait_run(thread_id: str, body: RunCreateRequest, request: Request) -> if checkpoint_tuple is not None: checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {} channel_values = checkpoint.get("channel_values", {}) - return serialize_channel_values(channel_values) + return serialize_channel_values_for_api(channel_values) except Exception: logger.exception("Failed to fetch final state for run %s", record.run_id) diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index fd6c05289..0570f0519 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -25,7 +25,7 @@ from app.gateway.deps import get_checkpointer from app.gateway.internal_auth import get_trusted_internal_owner_user_id from app.gateway.utils import sanitize_log_param from deerflow.config.paths import Paths, get_paths -from deerflow.runtime import serialize_channel_values +from deerflow.runtime import serialize_channel_values_for_api from deerflow.runtime.user_context import get_effective_user_id from deerflow.utils.time import coerce_iso, now_iso @@ -437,7 +437,7 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse: created_at=coerce_iso(record.get("created_at", "")), updated_at=coerce_iso(record.get("updated_at", "")), metadata=record.get("metadata", {}), - values=serialize_channel_values(channel_values), + values=serialize_channel_values_for_api(channel_values), ) @@ -480,7 +480,7 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")] tasks = [{"id": getattr(t, "id", ""), "name": getattr(t, "name", "")} for t in tasks_raw] - values = serialize_channel_values(channel_values) + values = serialize_channel_values_for_api(channel_values) return ThreadStateResponse( values=values, @@ -588,7 +588,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re logger.debug("Failed to sync title to thread_meta for %s (non-fatal)", sanitize_log_param(thread_id)) return ThreadStateResponse( - values=serialize_channel_values(channel_values), + values=serialize_channel_values_for_api(channel_values), next=[], metadata=metadata, checkpoint_id=new_checkpoint_id, @@ -640,7 +640,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request if is_latest_checkpoint: messages = channel_values.get("messages") if messages: - values["messages"] = serialize_channel_values({"messages": messages}).get("messages", []) + values["messages"] = serialize_channel_values_for_api({"messages": messages}).get("messages", []) is_latest_checkpoint = False # Derive next tasks diff --git a/backend/packages/harness/deerflow/runtime/__init__.py b/backend/packages/harness/deerflow/runtime/__init__.py index 5a3df2eb6..4f69ba1ce 100644 --- a/backend/packages/harness/deerflow/runtime/__init__.py +++ b/backend/packages/harness/deerflow/runtime/__init__.py @@ -7,7 +7,7 @@ directly from ``deerflow.runtime``. from .checkpointer import checkpointer_context, get_checkpointer, make_checkpointer, reset_checkpointer from .runs import ConflictError, DisconnectMode, RunContext, RunManager, RunRecord, RunStatus, UnsupportedStrategyError, run_agent -from .serialization import serialize, serialize_channel_values, serialize_lc_object, serialize_messages_tuple +from .serialization import serialize, serialize_channel_values, serialize_channel_values_for_api, serialize_lc_object, serialize_messages_tuple, strip_data_url_image_blocks from .store import get_store, make_store, reset_store, store_context from .stream_bridge import END_SENTINEL, HEARTBEAT_SENTINEL, MemoryStreamBridge, StreamBridge, StreamEvent, make_stream_bridge @@ -29,8 +29,10 @@ __all__ = [ # serialization "serialize", "serialize_channel_values", + "serialize_channel_values_for_api", "serialize_lc_object", "serialize_messages_tuple", + "strip_data_url_image_blocks", # store "get_store", "make_store", diff --git a/backend/packages/harness/deerflow/runtime/serialization.py b/backend/packages/harness/deerflow/runtime/serialization.py index 48853dfb3..ef791b7ec 100644 --- a/backend/packages/harness/deerflow/runtime/serialization.py +++ b/backend/packages/harness/deerflow/runtime/serialization.py @@ -56,6 +56,56 @@ def serialize_channel_values(channel_values: dict[str, Any]) -> dict[str, Any]: return result +def strip_data_url_image_blocks(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Remove ``data:``-scheme ``image_url`` blocks from *hide_from_ui* messages. + + The history and run-wait endpoints return checkpoint-persisted messages to + the frontend. ``ViewImageMiddleware`` stores full base64 image payloads in + ``hide_from_ui`` human messages — these are internal model context and must + not be sent over the wire (huge response bodies, no UI value). + + Only content blocks of type ``image_url`` whose URL starts with ``data:`` + are stripped. Text blocks, ``https://`` image URLs, and non-hidden + messages are left untouched so that message ordering and count are + preserved. + """ + result: list[dict[str, Any]] = [] + for msg in messages: + if not isinstance(msg, dict): + result.append(msg) + continue + + # Only touch messages explicitly flagged as hidden from the UI. + additional_kwargs = msg.get("additional_kwargs") + if not (isinstance(additional_kwargs, dict) and additional_kwargs.get("hide_from_ui") is True): + result.append(msg) + continue + + content = msg.get("content") + if not isinstance(content, list): + result.append(msg) + continue + + # Filter out image_url blocks with data: scheme. + filtered = [block for block in content if not (isinstance(block, dict) and block.get("type") == "image_url" and isinstance(block.get("image_url"), dict) and str(block["image_url"].get("url", "")).startswith("data:"))] + result.append({**msg, "content": filtered}) + return result + + +def serialize_channel_values_for_api(channel_values: dict[str, Any]) -> dict[str, Any]: + """Serialize channel values and strip base64 image data from messages. + + Convenience wrapper combining :func:`serialize_channel_values` with + :func:`strip_data_url_image_blocks`. Use this in all REST endpoints + that return channel values to the frontend so that ``data:``-scheme + base64 image payloads are never sent over the wire. + """ + result = serialize_channel_values(channel_values) + if isinstance(result.get("messages"), list): + result["messages"] = strip_data_url_image_blocks(result["messages"]) + return result + + def serialize_messages_tuple(obj: Any) -> Any: """Serialize a messages-mode tuple ``(chunk, metadata)``.""" if isinstance(obj, tuple) and len(obj) == 2: diff --git a/backend/tests/test_serialization.py b/backend/tests/test_serialization.py index b707d7143..5d6f074de 100644 --- a/backend/tests/test_serialization.py +++ b/backend/tests/test_serialization.py @@ -157,3 +157,171 @@ def test_serialize_dispatcher_default_mode(): result = serialize(_FakePydanticV1()) assert result == {"key": "v1"} + + +# ── strip_data_url_image_blocks ────────────────────────────────────────────── + + +def _make_msg( + content, + *, + hide_from_ui=False, + msg_type="human", +): + """Build a serialised-style message dict.""" + msg = {"type": msg_type, "content": content} + if hide_from_ui: + msg["additional_kwargs"] = {"hide_from_ui": True} + return msg + + +def test_strip_data_url_removes_base64_from_hidden_messages(): + from deerflow.runtime.serialization import strip_data_url_image_blocks + + messages = [ + _make_msg( + [ + {"type": "text", "text": "Here are the images:"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,iVBOR..."}, + }, + {"type": "text", "text": "- file.jpg (image/jpeg)"}, + { + "type": "image_url", + "image_url": {"url": "data:image/jpeg;base64,/9j/..."}, + }, + ], + hide_from_ui=True, + ), + ] + result = strip_data_url_image_blocks(messages) + assert len(result) == 1 + content = result[0]["content"] + # Only text blocks remain + assert content == [ + {"type": "text", "text": "Here are the images:"}, + {"type": "text", "text": "- file.jpg (image/jpeg)"}, + ] + + +def test_strip_data_url_preserves_non_hidden_messages(): + from deerflow.runtime.serialization import strip_data_url_image_blocks + + messages = [ + _make_msg( + [ + {"type": "text", "text": "Check this out"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,iVBOR..."}, + }, + ], + hide_from_ui=False, + ), + ] + result = strip_data_url_image_blocks(messages) + assert result == messages + + +def test_strip_data_url_preserves_https_image_urls(): + from deerflow.runtime.serialization import strip_data_url_image_blocks + + messages = [ + _make_msg( + [ + {"type": "text", "text": "See image"}, + { + "type": "image_url", + "image_url": {"url": "https://example.com/img.png"}, + }, + ], + hide_from_ui=True, + ), + ] + result = strip_data_url_image_blocks(messages) + assert result == messages + + +def test_strip_data_url_handles_string_content(): + from deerflow.runtime.serialization import strip_data_url_image_blocks + + messages = [ + _make_msg("plain text content", hide_from_ui=True), + ] + result = strip_data_url_image_blocks(messages) + assert result == messages + + +def test_strip_data_url_handles_non_dict_messages(): + from deerflow.runtime.serialization import strip_data_url_image_blocks + + result = strip_data_url_image_blocks(["a_string", None, 42]) + assert result == ["a_string", None, 42] + + +def test_strip_data_url_mixed_messages(): + """A realistic mix: normal user message + hidden image injection + AI reply.""" + from deerflow.runtime.serialization import strip_data_url_image_blocks + + messages = [ + _make_msg("Please analyze this image", hide_from_ui=False), + _make_msg( + [ + {"type": "text", "text": "Here are the images:"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,AABBCCDD"}, + }, + ], + hide_from_ui=True, + ), + _make_msg("I can see a landscape", msg_type="ai"), + ] + result = strip_data_url_image_blocks(messages) + assert len(result) == 3 + # First message untouched + assert result[0]["content"] == "Please analyze this image" + # Hidden message: image_url stripped, text kept + assert result[1]["content"] == [{"type": "text", "text": "Here are the images:"}] + # AI message untouched + assert result[2]["content"] == "I can see a landscape" + + +def test_serialize_channel_values_for_api_strips_base64(): + from deerflow.runtime.serialization import serialize_channel_values_for_api + + channel_values = { + "messages": [ + { + "type": "human", + "content": "hello", + }, + { + "type": "human", + "content": [ + {"type": "text", "text": "images:"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,BIGDATA"}, + }, + ], + "additional_kwargs": {"hide_from_ui": True}, + }, + ], + "title": "My thread", + } + result = serialize_channel_values_for_api(channel_values) + assert result["title"] == "My thread" + assert len(result["messages"]) == 2 + assert result["messages"][0]["content"] == "hello" + # base64 block stripped, text block kept + assert result["messages"][1]["content"] == [{"type": "text", "text": "images:"}] + + +def test_serialize_channel_values_for_api_no_messages(): + """When channel_values has no messages key, returns without error.""" + from deerflow.runtime.serialization import serialize_channel_values_for_api + + result = serialize_channel_values_for_api({"title": "empty"}) + assert result == {"title": "empty"}