fix(agents): preserve todos state across node updates (#3180)

* fix(agents): preserve todos state across node updates

ThreadState.todos had no reducer, so any downstream node returning a
partial state without todos was implicitly setting it to None, which
LangGraph then used to overwrite the previously streamed value. This
caused the to-do list to render correctly during streaming but vanish
once streaming completed.

Add a merge_todos reducer that keeps the last non-None value, mirroring
the merge_artifacts pattern already used in the same file. An explicit
empty list is still respected so that 'user cleared todos' works.

Tests: 10 new unit tests in tests/test_thread_state_reducers.py covering
merge_todos plus regression coverage for merge_artifacts and
merge_viewed_images. All 69 thread-related tests pass locally.

Closes #3123

* test(agents): add annotation binding regression guard

Address Copilot review feedback on #3123:

- Add TestThreadStateAnnotations asserting that ThreadState.todos is
  Annotated with merge_todos. Without this guard, reverting the
  Annotated[list | None, merge_todos] binding would silently regress
  #3123 while all existing reducer unit tests continue to pass.

- Align test imports to 'from deerflow.agents.thread_state import ...'
  matching the rest of the backend test suite.
This commit is contained in:
Huixin615
2026-05-23 23:25:38 +08:00
committed by GitHub
parent 0fb05825a2
commit 8785658a2e
2 changed files with 111 additions and 1 deletions
@@ -45,11 +45,24 @@ def merge_viewed_images(existing: dict[str, ViewedImageData] | None, new: dict[s
return {**existing, **new}
def merge_todos(existing: list | None, new: list | None) -> list | None:
"""Reducer for todos list - keeps the last non-None value.
Semantics:
- If `new` is None (node didn't touch todos), preserve `existing`.
- If `new` is provided (even empty list), it represents an explicit
update and wins over `existing`.
"""
if new is None:
return existing
return new
class ThreadState(AgentState):
sandbox: NotRequired[SandboxState | None]
thread_data: NotRequired[ThreadDataState | None]
title: NotRequired[str | None]
artifacts: Annotated[list[str], merge_artifacts]
todos: NotRequired[list | None]
todos: Annotated[list | None, merge_todos]
uploaded_files: NotRequired[list[dict] | None]
viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images] # image_path -> {base64, mime_type}
@@ -0,0 +1,97 @@
"""Unit tests for ThreadState reducers.
Regression coverage for issue #3123: todos list disappearing after streaming
completes because a downstream node's partial state update with `todos=None`
overwrites the previously accumulated value.
"""
from typing import get_type_hints
from deerflow.agents.thread_state import (
ThreadState,
merge_artifacts,
merge_todos,
merge_viewed_images,
)
class TestMergeTodos:
"""Reducer for ThreadState.todos - keeps last non-None value."""
def test_new_value_overrides_existing(self):
existing = [{"id": 1, "text": "old", "done": False}]
new = [{"id": 1, "text": "old", "done": True}]
assert merge_todos(existing, new) == new
def test_none_new_preserves_existing(self):
"""THE KEY FIX for #3123: a node that doesn't touch todos must NOT
wipe them out by returning an implicit None."""
existing = [{"id": 1, "text": "task", "done": False}]
assert merge_todos(existing, None) == existing
def test_none_existing_accepts_new(self):
new = [{"id": 1, "text": "first todo"}]
assert merge_todos(None, new) == new
def test_both_none_returns_none(self):
assert merge_todos(None, None) is None
def test_empty_list_is_explicit_clear(self):
"""An explicit empty list means 'user cleared all todos' and must
win over the previous list."""
existing = [{"id": 1, "text": "task"}]
assert merge_todos(existing, []) == []
class TestMergeArtifacts:
"""Sanity check for the existing artifacts reducer."""
def test_dedupes_and_preserves_order(self):
assert merge_artifacts(["a", "b"], ["b", "c"]) == ["a", "b", "c"]
def test_none_new_preserves_existing(self):
assert merge_artifacts(["a"], None) == ["a"]
def test_none_existing_accepts_new(self):
assert merge_artifacts(None, ["a"]) == ["a"]
class TestMergeViewedImages:
"""Sanity check for the existing viewed_images reducer."""
def test_merges_dicts(self):
existing = {"k1": {"base64": "x", "mime_type": "image/png"}}
new = {"k2": {"base64": "y", "mime_type": "image/jpeg"}}
merged = merge_viewed_images(existing, new)
assert set(merged.keys()) == {"k1", "k2"}
def test_empty_dict_clears(self):
existing = {"k1": {"base64": "x", "mime_type": "image/png"}}
assert merge_viewed_images(existing, {}) == {}
class TestThreadStateAnnotations:
"""Regression guards: ensure reducer wiring on ThreadState fields.
These tests protect against silent regressions where a field's
``Annotated[..., reducer]`` is reverted to a plain type, which would
re-introduce bugs even when the reducer functions themselves remain
correct.
"""
def test_todos_field_is_wired_to_merge_todos(self):
"""ThreadState.todos must use merge_todos.
Without this Annotated binding, LangGraph falls back to last-value-wins
behavior, and partial state updates that omit todos will silently clear
previously streamed values.
"""
hints = get_type_hints(ThreadState, include_extras=True)
todos_hint = hints["todos"]
assert hasattr(todos_hint, "__metadata__"), "ThreadState.todos must be Annotated with a reducer"
assert merge_todos in todos_hint.__metadata__, "ThreadState.todos must be wired to merge_todos reducer (see #3123)"
def test_artifacts_field_is_wired_to_merge_artifacts(self):
"""Sanity check that existing reducer wiring is preserved."""
hints = get_type_hints(ThreadState, include_extras=True)
assert merge_artifacts in hints["artifacts"].__metadata__