mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-14 03:15:58 +00:00
fix(sandbox): merge idempotent sandbox state updates (#3518)
* fix(sandbox): merge idempotent sandbox state updates * fix(sandbox): merge idempotent sandbox state updates
This commit is contained in:
@@ -18,6 +18,27 @@ class ViewedImageData(TypedDict):
|
|||||||
mime_type: str
|
mime_type: str
|
||||||
|
|
||||||
|
|
||||||
|
def merge_sandbox(existing: SandboxState | None, new: SandboxState | None) -> SandboxState | None:
|
||||||
|
"""Reducer for sandbox state - accepts idempotent writes only.
|
||||||
|
|
||||||
|
Multiple sandbox tools can initialize lazily in the same graph step and
|
||||||
|
emit the same sandbox_id via Command(update=...). LangGraph needs an
|
||||||
|
explicit reducer for that shared state key. Different sandbox ids in the
|
||||||
|
same thread indicate a lifecycle/isolation bug, so fail closed instead of
|
||||||
|
choosing one silently.
|
||||||
|
"""
|
||||||
|
if new is None:
|
||||||
|
return existing
|
||||||
|
if existing is None:
|
||||||
|
return new
|
||||||
|
|
||||||
|
existing_id = existing.get("sandbox_id")
|
||||||
|
new_id = new.get("sandbox_id")
|
||||||
|
if existing_id == new_id:
|
||||||
|
return existing
|
||||||
|
raise ValueError(f"Conflicting sandbox state updates: {existing_id!r} != {new_id!r}")
|
||||||
|
|
||||||
|
|
||||||
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
|
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
|
||||||
"""Reducer for artifacts list - merges and deduplicates artifacts."""
|
"""Reducer for artifacts list - merges and deduplicates artifacts."""
|
||||||
if existing is None:
|
if existing is None:
|
||||||
@@ -85,7 +106,7 @@ def merge_promoted(existing: PromotedTools | None, new: PromotedTools | None) ->
|
|||||||
|
|
||||||
|
|
||||||
class ThreadState(AgentState):
|
class ThreadState(AgentState):
|
||||||
sandbox: NotRequired[SandboxState | None]
|
sandbox: Annotated[NotRequired[SandboxState | None], merge_sandbox]
|
||||||
thread_data: NotRequired[ThreadDataState | None]
|
thread_data: NotRequired[ThreadDataState | None]
|
||||||
title: NotRequired[str | None]
|
title: NotRequired[str | None]
|
||||||
artifacts: Annotated[list[str], merge_artifacts]
|
artifacts: Annotated[list[str], merge_artifacts]
|
||||||
|
|||||||
@@ -7,14 +7,51 @@ overwrites the previously accumulated value.
|
|||||||
|
|
||||||
from typing import get_type_hints
|
from typing import get_type_hints
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from deerflow.agents.thread_state import (
|
from deerflow.agents.thread_state import (
|
||||||
ThreadState,
|
ThreadState,
|
||||||
merge_artifacts,
|
merge_artifacts,
|
||||||
|
merge_sandbox,
|
||||||
merge_todos,
|
merge_todos,
|
||||||
merge_viewed_images,
|
merge_viewed_images,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMergeSandbox:
|
||||||
|
"""Reducer for ThreadState.sandbox - allows idempotent concurrent writes."""
|
||||||
|
|
||||||
|
def test_none_new_preserves_existing(self):
|
||||||
|
existing = {"sandbox_id": "sandbox-1"}
|
||||||
|
assert merge_sandbox(existing, None) == existing
|
||||||
|
|
||||||
|
def test_none_existing_accepts_new(self):
|
||||||
|
new = {"sandbox_id": "sandbox-1"}
|
||||||
|
assert merge_sandbox(None, new) == new
|
||||||
|
|
||||||
|
def test_same_sandbox_id_is_idempotent(self):
|
||||||
|
existing = {"sandbox_id": "sandbox-1"}
|
||||||
|
new = {"sandbox_id": "sandbox-1"}
|
||||||
|
assert merge_sandbox(existing, new) == existing
|
||||||
|
|
||||||
|
def test_both_none_sandbox_id_is_idempotent(self):
|
||||||
|
existing = {"sandbox_id": None}
|
||||||
|
new = {"sandbox_id": None}
|
||||||
|
assert merge_sandbox(existing, new) == existing
|
||||||
|
|
||||||
|
def test_omitted_sandbox_id_is_idempotent(self):
|
||||||
|
"""An omitted sandbox_id represents uninitialized sandbox state."""
|
||||||
|
existing = {}
|
||||||
|
new = {}
|
||||||
|
assert merge_sandbox(existing, new) == existing
|
||||||
|
|
||||||
|
def test_conflicting_sandbox_ids_raise(self):
|
||||||
|
existing = {"sandbox_id": "sandbox-1"}
|
||||||
|
new = {"sandbox_id": "sandbox-2"}
|
||||||
|
with pytest.raises(ValueError, match="Conflicting sandbox state updates"):
|
||||||
|
merge_sandbox(existing, new)
|
||||||
|
|
||||||
|
|
||||||
class TestMergeTodos:
|
class TestMergeTodos:
|
||||||
"""Reducer for ThreadState.todos - keeps last non-None value."""
|
"""Reducer for ThreadState.todos - keeps last non-None value."""
|
||||||
|
|
||||||
@@ -95,3 +132,13 @@ class TestThreadStateAnnotations:
|
|||||||
"""Sanity check that existing reducer wiring is preserved."""
|
"""Sanity check that existing reducer wiring is preserved."""
|
||||||
hints = get_type_hints(ThreadState, include_extras=True)
|
hints = get_type_hints(ThreadState, include_extras=True)
|
||||||
assert merge_artifacts in hints["artifacts"].__metadata__
|
assert merge_artifacts in hints["artifacts"].__metadata__
|
||||||
|
|
||||||
|
def test_sandbox_field_is_wired_to_merge_sandbox(self):
|
||||||
|
"""ThreadState.sandbox must merge idempotent lazy-init updates.
|
||||||
|
|
||||||
|
Without this Annotated binding, concurrent sandbox tools that all
|
||||||
|
persist the same lazily acquired sandbox_id can trigger LangGraph's
|
||||||
|
INVALID_CONCURRENT_GRAPH_UPDATE error.
|
||||||
|
"""
|
||||||
|
hints = get_type_hints(ThreadState, include_extras=True)
|
||||||
|
assert merge_sandbox in hints["sandbox"].__metadata__
|
||||||
|
|||||||
Reference in New Issue
Block a user