mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 08:25:57 +00:00
fix(threads): load history messages from event store, immune to summarize
``get_thread_history`` and ``get_thread_state`` in Gateway mode read
messages from ``checkpoint.channel_values["messages"]``. After
SummarizationMiddleware runs mid-run, that list is rewritten in-place:
pre-summarize messages are dropped and a synthetic summary-as-human
message takes position 0. The frontend then renders a chat history that
starts with ``"Here is a summary of the conversation to date:..."``
instead of the user's original query, and all earlier turns are gone.
The event store (``RunEventStore``) is append-only and never rewritten,
so it retains the full transcript. This commit adds a helper
``_get_event_store_messages`` that loads the event store's message
stream and overrides ``values["messages"]`` in both endpoints; the
checkpoint fallback kicks in only when the event store is unavailable.
Behavior contract of the helper:
- **Full pagination.** ``list_messages`` returns the newest ``limit``
records when no cursor is given, so a fixed limit silently drops
older messages on long threads. The helper sizes the read from
``count_messages()`` and pages forward with ``after_seq`` cursors.
- **Copy-on-read.** Each content dict is copied before ``id`` is
patched so the live store object (``MemoryRunEventStore`` returns
references) is never mutated.
- **Stable ids.** Messages with ``id=None`` (human + tool_result,
which don't receive an id until checkpoint persistence) get a
deterministic ``uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")`` so
React keys stay stable across requests. AI messages keep their
LLM-assigned ``lc_run--*`` ids.
- **Legacy ``Command`` repr sanitization.** Rows captured before the
``journal.py`` ``on_tool_end`` fix (previous commit) stored
``str(Command(update={'messages': [ToolMessage(content='X', ...)]}))``
as the tool_result content. ``_sanitize_legacy_command_repr``
regex-extracts the inner text so old threads render cleanly.
- **Inline feedback.** When loading the stream, the helper also pulls
``feedback_repo.list_by_thread_grouped`` and attaches ``run_id`` to
every message plus ``feedback`` to the final ``ai_message`` of each
run. This removes the frontend's need to fetch a second endpoint
and positional-index-map its way back to the right run. When the
feedback subsystem is unavailable, the ``feedback`` field is left
absent entirely so the frontend hides the button rather than
rendering it over a broken write path.
- **User context.** ``DbRunEventStore`` is user-scoped by default via
``resolve_user_id(AUTO)``. The helper relies on the ``@require_permission``
decorator having populated the user contextvar on both callers; the
docstring documents this dependency explicitly so nobody wires it
into a CLI or migration script without passing ``user_id=None``.
Real data verification against thread
``6d30913e-dcd4-41c8-8941-f66c716cf359``: checkpoint showed 12 messages
(summarize-corrupted), event store had 16. The original human message
``"最新伊美局势"`` was preserved as seq=1 in the event store and
correctly restored to position 0 in the helper output. Helper output
for AI messages was byte-identical to checkpoint for every overlapping
message; only tool_result ids differed (patched to uuid5) and the
legacy Command repr at seq=48 was sanitized.
Tests:
- ``test_thread_state_event_store.py`` — 18 tests covering
``_sanitize_legacy_command_repr`` (passthrough, single/double-quote
extraction, unparseable fallback), helper happy path (all message
types, stable uuid5, store non-mutation), multi-page pagination,
summarize regression (recovers pre-summarize messages), feedback
attachment (per-run, multi-run threads, repo failure graceful),
and dependency failure fallback to ``None``.
Docs:
- ``docs/superpowers/plans/2026-04-10-event-store-history.md`` — the
implementation plan this commit realizes, with Task 1 revised after
the evaluation findings (pagination, copy-on-read, Command wrap
already landed in journal.py, frontend feedback pagination in the
follow-up commit, Standard-mode follow-up noted).
- ``docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md``
— the Claude + second-opinion evaluation document that drove the
plan revisions (pagination bug, dict-mutation bug, feedback hidden
bug, Command bug).
- ``docs/superpowers/specs/2026-04-11-summarize-marker-design.md`` —
design for a follow-up PR that visually marks summarize events in
history, based on a verified ``adispatch_custom_event`` experiment
(``trace=False`` middleware nodes can still forward the Pregel task
config via explicit signature injection).
Scope: Gateway mode only (``make dev-pro``). Standard mode
(``make dev``) hits LangGraph Server directly and bypasses these
endpoints; the summarize symptom is still present there and is
tracked as a separate follow-up in the plan.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,7 @@ matching the LangGraph Platform wire format expected by the
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any
|
||||
@@ -21,7 +22,7 @@ from fastapi import APIRouter, HTTPException, Request
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from app.gateway.authz import require_permission
|
||||
from app.gateway.deps import get_checkpointer
|
||||
from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store
|
||||
from app.gateway.utils import sanitize_log_param
|
||||
from deerflow.config.paths import Paths, get_paths
|
||||
from deerflow.runtime import serialize_channel_values
|
||||
@@ -402,6 +403,165 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Event-store-backed message loader
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_LEGACY_CMD_INNER_CONTENT_RE = re.compile(
|
||||
r"ToolMessage\(content=(?P<q>['\"])(?P<inner>.*?)(?P=q)",
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
|
||||
def _sanitize_legacy_command_repr(content_field: Any) -> Any:
|
||||
"""Recover the inner ToolMessage text from a legacy ``str(Command(...))`` repr.
|
||||
|
||||
Runs captured before the ``on_tool_end`` fix in ``journal.py`` stored
|
||||
``str(Command(update={'messages':[ToolMessage(content='X', ...)]}))`` as the
|
||||
tool_result content. New runs store ``'X'`` directly. For legacy rows, try
|
||||
to extract ``'X'`` defensively; return the original string if extraction
|
||||
fails (still no worse than the checkpoint fallback for summarized threads).
|
||||
"""
|
||||
if not isinstance(content_field, str) or not content_field.startswith("Command(update="):
|
||||
return content_field
|
||||
match = _LEGACY_CMD_INNER_CONTENT_RE.search(content_field)
|
||||
return match.group("inner") if match else content_field
|
||||
|
||||
|
||||
async def _get_event_store_messages(request: Request, thread_id: str) -> list[dict] | None:
|
||||
"""Load the full message stream for ``thread_id`` from the event store.
|
||||
|
||||
The event store is append-only and unaffected by summarization — the
|
||||
checkpoint's ``channel_values["messages"]`` is rewritten in-place when the
|
||||
SummarizationMiddleware runs, which drops all pre-summarize messages. The
|
||||
event store retains the full transcript, so callers in Gateway mode should
|
||||
prefer it for rendering the conversation history.
|
||||
|
||||
In addition to the core message content, this helper attaches two extra
|
||||
fields to every returned dict:
|
||||
|
||||
- ``run_id``: the ``run_id`` of the event that produced this message.
|
||||
Always present.
|
||||
- ``feedback``: thumbs-up/down data. Present only on the **final
|
||||
``ai_message`` of each run** (matching the per-run feedback semantics
|
||||
of ``POST /api/threads/{id}/runs/{run_id}/feedback``). The frontend uses
|
||||
the presence of this field to decide whether to render the feedback
|
||||
button, which sidesteps the positional-index mapping bug that an
|
||||
out-of-band ``/messages`` fetch exhibited.
|
||||
|
||||
Behaviour contract:
|
||||
|
||||
- **Full pagination.** ``RunEventStore.list_messages`` returns the newest
|
||||
``limit`` records when no cursor is given, so a fixed limit silently
|
||||
drops older messages on long threads. We size the read from
|
||||
``count_messages()`` and then page forward with ``after_seq`` cursors.
|
||||
- **Copy-on-read.** Each content dict is copied before ``id`` is patched
|
||||
so the live store object is never mutated; ``MemoryRunEventStore``
|
||||
returns live references.
|
||||
- **Stable ids.** Messages with ``id=None`` (human + tool_result) receive
|
||||
a deterministic ``uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")`` so React
|
||||
keys are stable across requests without altering stored data. AI messages
|
||||
retain their LLM-assigned ``lc_run--*`` ids.
|
||||
- **Legacy Command repr.** Rows captured before the ``journal.py``
|
||||
``on_tool_end`` fix stored ``str(Command(update={...}))`` as the tool
|
||||
result content. ``_sanitize_legacy_command_repr`` extracts the inner
|
||||
ToolMessage text.
|
||||
- **User context.** ``DbRunEventStore`` is user-scoped by default via
|
||||
``resolve_user_id(AUTO)`` in ``runtime/user_context.py``. This helper
|
||||
must run inside a request where ``@require_permission`` has populated
|
||||
the user contextvar. Both callers below are decorated appropriately.
|
||||
Do not call this helper from CLI or migration scripts without passing
|
||||
``user_id=None`` explicitly to the underlying store methods.
|
||||
|
||||
Returns ``None`` when the event store is not configured or has no message
|
||||
events for this thread, so callers fall back to checkpoint messages.
|
||||
"""
|
||||
try:
|
||||
event_store = get_run_event_store(request)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
try:
|
||||
total = await event_store.count_messages(thread_id)
|
||||
except Exception:
|
||||
logger.exception("count_messages failed for thread %s", sanitize_log_param(thread_id))
|
||||
return None
|
||||
if not total:
|
||||
return None
|
||||
|
||||
# Batch by page_size to keep memory bounded for very long threads.
|
||||
page_size = 500
|
||||
collected: list[dict] = []
|
||||
after_seq: int | None = None
|
||||
while True:
|
||||
try:
|
||||
page = await event_store.list_messages(thread_id, limit=page_size, after_seq=after_seq)
|
||||
except Exception:
|
||||
logger.exception("list_messages failed for thread %s", sanitize_log_param(thread_id))
|
||||
return None
|
||||
if not page:
|
||||
break
|
||||
collected.extend(page)
|
||||
if len(page) < page_size:
|
||||
break
|
||||
next_cursor = page[-1].get("seq")
|
||||
if next_cursor is None or (after_seq is not None and next_cursor <= after_seq):
|
||||
break
|
||||
after_seq = next_cursor
|
||||
|
||||
# Build the message list; track the final ``ai_message`` index per run so
|
||||
# feedback can be attached at the right position (matches thread_runs.py).
|
||||
messages: list[dict] = []
|
||||
last_ai_per_run: dict[str, int] = {}
|
||||
for evt in collected:
|
||||
raw = evt.get("content")
|
||||
if not isinstance(raw, dict) or "type" not in raw:
|
||||
continue
|
||||
content = dict(raw)
|
||||
if content.get("id") is None:
|
||||
content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{evt['seq']}"))
|
||||
if content.get("type") == "tool":
|
||||
content["content"] = _sanitize_legacy_command_repr(content.get("content"))
|
||||
run_id = evt.get("run_id")
|
||||
if run_id:
|
||||
content["run_id"] = run_id
|
||||
if evt.get("event_type") == "ai_message" and run_id:
|
||||
last_ai_per_run[run_id] = len(messages)
|
||||
messages.append(content)
|
||||
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
# Attach feedback to the final ai_message of each run. If the feedback
|
||||
# subsystem is unavailable, leave the ``feedback`` field absent entirely
|
||||
# so the frontend hides the button rather than showing it over a broken
|
||||
# write path.
|
||||
feedback_available = False
|
||||
feedback_map: dict[str, dict] = {}
|
||||
try:
|
||||
feedback_repo = get_feedback_repo(request)
|
||||
user_id = await get_current_user(request)
|
||||
feedback_map = await feedback_repo.list_by_thread_grouped(thread_id, user_id=user_id)
|
||||
feedback_available = True
|
||||
except Exception:
|
||||
logger.exception("feedback lookup failed for thread %s", sanitize_log_param(thread_id))
|
||||
|
||||
if feedback_available:
|
||||
for run_id, idx in last_ai_per_run.items():
|
||||
fb = feedback_map.get(run_id)
|
||||
messages[idx]["feedback"] = (
|
||||
{
|
||||
"feedback_id": fb["feedback_id"],
|
||||
"rating": fb["rating"],
|
||||
"comment": fb.get("comment"),
|
||||
}
|
||||
if fb
|
||||
else None
|
||||
)
|
||||
|
||||
return messages
|
||||
|
||||
|
||||
@router.get("/{thread_id}/state", response_model=ThreadStateResponse)
|
||||
@require_permission("threads", "read", owner_check=True)
|
||||
async def get_thread_state(thread_id: str, request: Request) -> ThreadStateResponse:
|
||||
@@ -440,8 +600,15 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
|
||||
next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")]
|
||||
tasks = [{"id": getattr(t, "id", ""), "name": getattr(t, "name", "")} for t in tasks_raw]
|
||||
|
||||
values = serialize_channel_values(channel_values)
|
||||
|
||||
# Prefer event-store messages: append-only, immune to summarization.
|
||||
es_messages = await _get_event_store_messages(request, thread_id)
|
||||
if es_messages is not None:
|
||||
values["messages"] = es_messages
|
||||
|
||||
return ThreadStateResponse(
|
||||
values=serialize_channel_values(channel_values),
|
||||
values=values,
|
||||
next=next_tasks,
|
||||
metadata=metadata,
|
||||
checkpoint={"id": checkpoint_id, "ts": str(metadata.get("created_at", ""))},
|
||||
@@ -559,6 +726,11 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
|
||||
if body.before:
|
||||
config["configurable"]["checkpoint_id"] = body.before
|
||||
|
||||
# Load the full event-store message stream once; attach to the latest
|
||||
# checkpoint entry only (matching the prior semantics). The event store
|
||||
# is append-only and immune to summarization.
|
||||
es_messages = await _get_event_store_messages(request, thread_id)
|
||||
|
||||
entries: list[HistoryEntry] = []
|
||||
is_latest_checkpoint = True
|
||||
try:
|
||||
@@ -582,11 +754,17 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
|
||||
if thread_data := channel_values.get("thread_data"):
|
||||
values["thread_data"] = thread_data
|
||||
|
||||
# Attach messages from checkpointer only for the latest checkpoint
|
||||
# Attach messages only to the latest checkpoint. Prefer the
|
||||
# event-store stream (complete and unaffected by summarization);
|
||||
# fall back to checkpoint channel_values when the event store is
|
||||
# unavailable or empty.
|
||||
if is_latest_checkpoint:
|
||||
messages = channel_values.get("messages")
|
||||
if messages:
|
||||
values["messages"] = serialize_channel_values({"messages": messages}).get("messages", [])
|
||||
if es_messages is not None:
|
||||
values["messages"] = es_messages
|
||||
else:
|
||||
messages = channel_values.get("messages")
|
||||
if messages:
|
||||
values["messages"] = serialize_channel_values({"messages": messages}).get("messages", [])
|
||||
is_latest_checkpoint = False
|
||||
|
||||
# Derive next tasks
|
||||
|
||||
Reference in New Issue
Block a user