diff --git a/backend/packages/harness/deerflow/agents/thread_state.py b/backend/packages/harness/deerflow/agents/thread_state.py
index 0805cc0f3..8e5690779 100644
--- a/backend/packages/harness/deerflow/agents/thread_state.py
+++ b/backend/packages/harness/deerflow/agents/thread_state.py
@@ -18,6 +18,27 @@ class ViewedImageData(TypedDict):
     mime_type: str
 
 
+def merge_sandbox(existing: SandboxState | None, new: SandboxState | None) -> SandboxState | None:
+    """Reducer for sandbox state - accepts idempotent writes only.
+
+    Multiple sandbox tools can initialize lazily in the same graph step and
+    emit the same sandbox_id via Command(update=...). LangGraph needs an
+    explicit reducer for that shared state key. Different sandbox ids in the
+    same thread indicate a lifecycle/isolation bug, so fail closed instead of
+    choosing one silently.
+    """
+    if new is None:
+        return existing
+    if existing is None:
+        return new
+
+    existing_id = existing.get("sandbox_id")
+    new_id = new.get("sandbox_id")
+    if existing_id == new_id:
+        return existing
+    raise ValueError(f"Conflicting sandbox state updates: {existing_id!r} != {new_id!r}")
+
+
 def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
     """Reducer for artifacts list - merges and deduplicates artifacts."""
     if existing is None:
@@ -85,7 +106,7 @@ def merge_promoted(existing: PromotedTools | None, new: PromotedTools | None) ->
 
 
 class ThreadState(AgentState):
-    sandbox: NotRequired[SandboxState | None]
+    sandbox: Annotated[NotRequired[SandboxState | None], merge_sandbox]
     thread_data: NotRequired[ThreadDataState | None]
     title: NotRequired[str | None]
     artifacts: Annotated[list[str], merge_artifacts]
diff --git a/backend/tests/test_thread_state_reducers.py b/backend/tests/test_thread_state_reducers.py
index bc419c93a..6c79f85fb 100644
--- a/backend/tests/test_thread_state_reducers.py
+++ b/backend/tests/test_thread_state_reducers.py
@@ -7,14 +7,51 @@ overwrites the previously accumulated value.
 
 from typing import get_type_hints
 
+import pytest
+
 from deerflow.agents.thread_state import (
     ThreadState,
     merge_artifacts,
+    merge_sandbox,
     merge_todos,
     merge_viewed_images,
 )
 
 
+class TestMergeSandbox:
+    """Reducer for ThreadState.sandbox - allows idempotent concurrent writes."""
+
+    def test_none_new_preserves_existing(self):
+        existing = {"sandbox_id": "sandbox-1"}
+        assert merge_sandbox(existing, None) == existing
+
+    def test_none_existing_accepts_new(self):
+        new = {"sandbox_id": "sandbox-1"}
+        assert merge_sandbox(None, new) == new
+
+    def test_same_sandbox_id_is_idempotent(self):
+        existing = {"sandbox_id": "sandbox-1"}
+        new = {"sandbox_id": "sandbox-1"}
+        assert merge_sandbox(existing, new) == existing
+
+    def test_both_none_sandbox_id_is_idempotent(self):
+        existing = {"sandbox_id": None}
+        new = {"sandbox_id": None}
+        assert merge_sandbox(existing, new) == existing
+
+    def test_omitted_sandbox_id_is_idempotent(self):
+        """An omitted sandbox_id represents uninitialized sandbox state."""
+        existing = {}
+        new = {}
+        assert merge_sandbox(existing, new) == existing
+
+    def test_conflicting_sandbox_ids_raise(self):
+        existing = {"sandbox_id": "sandbox-1"}
+        new = {"sandbox_id": "sandbox-2"}
+        with pytest.raises(ValueError, match="Conflicting sandbox state updates"):
+            merge_sandbox(existing, new)
+
+
 class TestMergeTodos:
     """Reducer for ThreadState.todos - keeps last non-None value."""
 
@@ -95,3 +132,13 @@ class TestThreadStateAnnotations:
         """Sanity check that existing reducer wiring is preserved."""
         hints = get_type_hints(ThreadState, include_extras=True)
         assert merge_artifacts in hints["artifacts"].__metadata__
+
+    def test_sandbox_field_is_wired_to_merge_sandbox(self):
+        """ThreadState.sandbox must merge idempotent lazy-init updates.
+
+        Without this Annotated binding, concurrent sandbox tools that all
+        persist the same lazily acquired sandbox_id can trigger LangGraph's
+        INVALID_CONCURRENT_GRAPH_UPDATE error.
+        """
+        hints = get_type_hints(ThreadState, include_extras=True)
+        assert merge_sandbox in hints["sandbox"].__metadata__