from typing import Annotated, NotRequired, TypedDict from langchain.agents import AgentState class SandboxState(TypedDict): sandbox_id: NotRequired[str | None] class ThreadDataState(TypedDict): workspace_path: NotRequired[str | None] uploads_path: NotRequired[str | None] outputs_path: NotRequired[str | None] class ViewedImageData(TypedDict): base64: str mime_type: str def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]: """Reducer for artifacts list - merges and deduplicates artifacts.""" if existing is None: return new or [] if new is None: return existing # Use dict.fromkeys to deduplicate while preserving order return list(dict.fromkeys(existing + new)) def merge_viewed_images(existing: dict[str, ViewedImageData] | None, new: dict[str, ViewedImageData] | None) -> dict[str, ViewedImageData]: """Reducer for viewed_images dict - merges image dictionaries. Special case: If new is an empty dict {}, it clears the existing images. This allows middlewares to clear the viewed_images state after processing. """ if existing is None: return new or {} if new is None: return existing # Special case: empty dict means clear all viewed images if len(new) == 0: return {} # Merge dictionaries, new values override existing ones for same keys return {**existing, **new} def merge_todos(existing: list | None, new: list | None) -> list | None: """Reducer for todos list - keeps the last non-None value. Semantics: - If `new` is None (node didn't touch todos), preserve `existing`. - If `new` is provided (even empty list), it represents an explicit update and wins over `existing`. """ if new is None: return existing return new class PromotedTools(TypedDict): catalog_hash: str names: list[str] def merge_promoted(existing: PromotedTools | None, new: PromotedTools | None) -> PromotedTools | None: """Reducer for deferred-tool promotions, scoped by catalog hash. - new None/empty -> preserve existing (node didn't touch promotions). - catalog_hash changed -> replace wholesale, dropping stale names (prevents a persisted bare name from exposing a different tool after catalog drift). - same catalog_hash -> union names, dedupe, preserve order. """ if not new: return existing if existing is None or existing.get("catalog_hash") != new["catalog_hash"]: return { "catalog_hash": new["catalog_hash"], "names": list(dict.fromkeys(new["names"])), } return { "catalog_hash": existing["catalog_hash"], "names": list(dict.fromkeys(existing["names"] + new["names"])), } class ThreadState(AgentState): sandbox: NotRequired[SandboxState | None] thread_data: NotRequired[ThreadDataState | None] title: NotRequired[str | None] artifacts: Annotated[list[str], merge_artifacts] todos: Annotated[list | None, merge_todos] uploaded_files: NotRequired[list[dict] | None] viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images] # image_path -> {base64, mime_type} promoted: Annotated[PromotedTools | None, merge_promoted]