From 3c2b60aaaeec3fe7d6066f4703744ba480dbcdda Mon Sep 17 00:00:00 2001 From: Xun Date: Mon, 8 Jun 2026 23:12:25 +0800 Subject: [PATCH] fix(threads): assign new checkpoint ID in update_thread_state (#2391) * async * add test * test(threads): assert aput preserves endpoint-assigned checkpoint id Confirm the update_thread_state fix is real, not a no-op: all supported savers (InMemorySaver, AsyncSqliteSaver, AsyncPostgresSaver) persist and echo checkpoint["id"] verbatim rather than minting their own. Add assertions that each POST /state response's checkpoint_id round-tripped into persisted history and kept its uuid6 time-ordering through aput, and document the verified contract in the router. Co-Authored-By: Claude Opus 4.8 (1M context) --------- Co-authored-by: Claude Opus 4.8 (1M context) --- backend/app/gateway/routers/threads.py | 20 ++++++++--- backend/tests/test_threads_router.py | 49 ++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index e6f4fa2ae..fa8de61ff 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -17,7 +17,7 @@ import uuid from typing import Any from fastapi import APIRouter, HTTPException, Request -from langgraph.checkpoint.base import empty_checkpoint +from langgraph.checkpoint.base import empty_checkpoint, uuid6 from pydantic import BaseModel, Field, field_validator from app.gateway.authz import require_permission @@ -536,9 +536,21 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re metadata["step"] = metadata.get("step", 0) + 1 metadata["writes"] = {body.as_node: body.values} + # Assign a new checkpoint ID so aput performs an INSERT rather than an + # in-place REPLACE of the existing row. Use uuid6 (time-ordered) rather + # than uuid4 (random) so the new ID is always lexicographically greater + # than the previous one — LangGraph's checkpointers determine the "latest" + # checkpoint by max(checkpoint_ids) string order, matching the uuid6 epoch. + checkpoint["id"] = str(uuid6()) + # aput requires checkpoint_ns in the config — use the same config used for the - # read (which always includes checkpoint_ns=""). Do NOT include checkpoint_id - # so that aput generates a fresh checkpoint ID for the new snapshot. + # read (which always includes checkpoint_ns=""). The fresh checkpoint ID is + # assigned above via checkpoint["id"]; keep checkpoint_id out of the config so + # the write is keyed by the new checkpoint payload rather than the prior read. + # All supported savers (InMemorySaver, AsyncSqliteSaver, AsyncPostgresSaver) + # persist and echo back checkpoint["id"] verbatim — none mint their own — so + # the new_config below carries the uuid6 we assigned here. (Regression-locked + # by test_update_thread_state_inserts_new_checkpoint_each_call.) write_config: dict[str, Any] = { "configurable": { "thread_id": thread_id, @@ -557,7 +569,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re # Sync title changes through the ThreadMetaStore abstraction so /threads/search # reflects them immediately in both sqlite and memory backends. - if body.values and "title" in body.values: + if thread_store and body.values and "title" in body.values: new_title = body.values["title"] if new_title: # Skip empty strings and None try: diff --git a/backend/tests/test_threads_router.py b/backend/tests/test_threads_router.py index 9e37f3c86..f6f6adcef 100644 --- a/backend/tests/test_threads_router.py +++ b/backend/tests/test_threads_router.py @@ -485,3 +485,52 @@ def test_search_threads_succeeds_with_valid_metadata() -> None: response = client.post("/api/threads/search", json={"metadata": {"env": "prod"}}) assert response.status_code == 200 + + +# ── update_thread_state: each call inserts a new checkpoint (regression) ─────── + + +def test_update_thread_state_inserts_new_checkpoint_each_call() -> None: + """Each ``POST /state`` must INSERT a distinct, time-ordered checkpoint. + + Regression for the in-place REPLACE bug: before the fix the new + checkpoint reused the previous checkpoint["id"], so InMemorySaver/SQLite + overwrote the existing row and history never grew. The fix assigns a + fresh uuid6 to checkpoint["id"] before aput. + """ + app, _store, checkpointer = _build_thread_app() + + with TestClient(app) as client: + created = client.post("/api/threads", json={"metadata": {}}) + assert created.status_code == 200, created.text + thread_id = created.json()["thread_id"] + + r1 = client.post(f"/api/threads/{thread_id}/state", json={"values": {"title": "First"}}) + assert r1.status_code == 200, r1.text + r2 = client.post(f"/api/threads/{thread_id}/state", json={"values": {"title": "Second"}}) + assert r2.status_code == 200, r2.text + + import asyncio + + async def _collect(): + return [cp async for cp in checkpointer.alist({"configurable": {"thread_id": thread_id}})] + + history = asyncio.run(_collect()) + + # 1 empty checkpoint from create_thread + 1 per update call. + assert len(history) >= 3, f"expected >=3 checkpoints, got {len(history)}" + + ids = [cp.config["configurable"]["checkpoint_id"] for cp in history] + assert len(ids) == len(set(ids)), f"duplicate checkpoint ids: {ids}" + # alist() returns newest-first; uuid6 is time-ordered so newest > oldest. + assert ids[0] > ids[-1], f"checkpoint ids not time-ordered (uuid4 instead of uuid6?): {ids}" + + # aput must PRESERVE the endpoint-assigned checkpoint["id"], not mint its own + # and discard the payload's. If it generated a fresh id internally the fix + # would be a no-op (the bug would never have existed). Assert the id returned + # in each response round-tripped into the persisted history, and that the two + # update writes kept the endpoint's uuid6 time-ordering through aput. + resp_ids = [r1.json()["checkpoint_id"], r2.json()["checkpoint_id"]] + assert all(cid is not None for cid in resp_ids), f"response missing checkpoint_id: {resp_ids}" + assert set(resp_ids) <= set(ids), f"aput discarded endpoint-assigned id: returned {resp_ids}, stored {ids}" + assert resp_ids[1] > resp_ids[0], f"endpoint-assigned uuid6 not preserved/ordered through aput: {resp_ids}"