mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-14 03:15:58 +00:00
8955b3222a
* fix(sandbox): merge idempotent sandbox state updates * fix(sandbox): merge idempotent sandbox state updates
117 lines
4.0 KiB
Python
117 lines
4.0 KiB
Python
from typing import Annotated, NotRequired, TypedDict
|
|
|
|
from langchain.agents import AgentState
|
|
|
|
|
|
class SandboxState(TypedDict):
|
|
sandbox_id: NotRequired[str | None]
|
|
|
|
|
|
class ThreadDataState(TypedDict):
|
|
workspace_path: NotRequired[str | None]
|
|
uploads_path: NotRequired[str | None]
|
|
outputs_path: NotRequired[str | None]
|
|
|
|
|
|
class ViewedImageData(TypedDict):
|
|
base64: str
|
|
mime_type: str
|
|
|
|
|
|
def merge_sandbox(existing: SandboxState | None, new: SandboxState | None) -> SandboxState | None:
|
|
"""Reducer for sandbox state - accepts idempotent writes only.
|
|
|
|
Multiple sandbox tools can initialize lazily in the same graph step and
|
|
emit the same sandbox_id via Command(update=...). LangGraph needs an
|
|
explicit reducer for that shared state key. Different sandbox ids in the
|
|
same thread indicate a lifecycle/isolation bug, so fail closed instead of
|
|
choosing one silently.
|
|
"""
|
|
if new is None:
|
|
return existing
|
|
if existing is None:
|
|
return new
|
|
|
|
existing_id = existing.get("sandbox_id")
|
|
new_id = new.get("sandbox_id")
|
|
if existing_id == new_id:
|
|
return existing
|
|
raise ValueError(f"Conflicting sandbox state updates: {existing_id!r} != {new_id!r}")
|
|
|
|
|
|
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
|
|
"""Reducer for artifacts list - merges and deduplicates artifacts."""
|
|
if existing is None:
|
|
return new or []
|
|
if new is None:
|
|
return existing
|
|
# Use dict.fromkeys to deduplicate while preserving order
|
|
return list(dict.fromkeys(existing + new))
|
|
|
|
|
|
def merge_viewed_images(existing: dict[str, ViewedImageData] | None, new: dict[str, ViewedImageData] | None) -> dict[str, ViewedImageData]:
|
|
"""Reducer for viewed_images dict - merges image dictionaries.
|
|
|
|
Special case: If new is an empty dict {}, it clears the existing images.
|
|
This allows middlewares to clear the viewed_images state after processing.
|
|
"""
|
|
if existing is None:
|
|
return new or {}
|
|
if new is None:
|
|
return existing
|
|
# Special case: empty dict means clear all viewed images
|
|
if len(new) == 0:
|
|
return {}
|
|
# Merge dictionaries, new values override existing ones for same keys
|
|
return {**existing, **new}
|
|
|
|
|
|
def merge_todos(existing: list | None, new: list | None) -> list | None:
|
|
"""Reducer for todos list - keeps the last non-None value.
|
|
|
|
Semantics:
|
|
- If `new` is None (node didn't touch todos), preserve `existing`.
|
|
- If `new` is provided (even empty list), it represents an explicit
|
|
update and wins over `existing`.
|
|
"""
|
|
if new is None:
|
|
return existing
|
|
return new
|
|
|
|
|
|
class PromotedTools(TypedDict):
|
|
catalog_hash: str
|
|
names: list[str]
|
|
|
|
|
|
def merge_promoted(existing: PromotedTools | None, new: PromotedTools | None) -> PromotedTools | None:
|
|
"""Reducer for deferred-tool promotions, scoped by catalog hash.
|
|
|
|
- new None/empty -> preserve existing (node didn't touch promotions).
|
|
- catalog_hash changed -> replace wholesale, dropping stale names (prevents a
|
|
persisted bare name from exposing a different tool after catalog drift).
|
|
- same catalog_hash -> union names, dedupe, preserve order.
|
|
"""
|
|
if not new:
|
|
return existing
|
|
if existing is None or existing.get("catalog_hash") != new["catalog_hash"]:
|
|
return {
|
|
"catalog_hash": new["catalog_hash"],
|
|
"names": list(dict.fromkeys(new["names"])),
|
|
}
|
|
return {
|
|
"catalog_hash": existing["catalog_hash"],
|
|
"names": list(dict.fromkeys(existing["names"] + new["names"])),
|
|
}
|
|
|
|
|
|
class ThreadState(AgentState):
|
|
sandbox: Annotated[NotRequired[SandboxState | None], merge_sandbox]
|
|
thread_data: NotRequired[ThreadDataState | None]
|
|
title: NotRequired[str | None]
|
|
artifacts: Annotated[list[str], merge_artifacts]
|
|
todos: Annotated[list | None, merge_todos]
|
|
uploaded_files: NotRequired[list[dict] | None]
|
|
viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images] # image_path -> {base64, mime_type}
|
|
promoted: Annotated[PromotedTools | None, merge_promoted]
|