fix(summarization): tag summary LLM calls nostream to stop phantom stream messages (#2503) (#3378)

* fix(summarization): tag summary LLM calls nostream to stop phantom stream messages (#2503)

The SummarizationMiddleware runs its summary LLM call inside a before_model
hook. Without a nostream tag the summary tokens were captured by LangGraph's
messages-tuple stream callback and broadcast to the frontend as a phantom AI
message.

Generate a dedicated summary model copy tagged with "nostream" (merged on top
of any existing tags such as "middleware:summarize" so RunJournal attribution
is preserved) and override _create_summary / _acreate_summary to invoke it
directly. This avoids temporarily swapping the shared self.model, which would
otherwise leak the RunnableBinding across concurrent runs and break parent
logic that inspects the raw model (profile / _get_ls_params).

Add regression tests covering nostream tagging, concurrent-run isolation, raw
model preservation, and existing-tag merge.

* fix(summarization): address nostream review feedback
This commit is contained in:
Ryker_Feng
2026-06-07 17:55:04 +08:00
committed by GitHub
parent 88e36d9686
commit d133b1119a
2 changed files with 185 additions and 1 deletions
@@ -9,8 +9,9 @@ from typing import Any, Protocol, override, runtime_checkable
from langchain.agents import AgentState
from langchain.agents.middleware import SummarizationMiddleware
from langchain_core.messages import AIMessage, AnyMessage, HumanMessage, RemoveMessage, ToolMessage
from langchain_core.messages import AIMessage, AnyMessage, HumanMessage, RemoveMessage, ToolMessage, get_buffer_string
from langgraph.config import get_config
from langgraph.constants import TAG_NOSTREAM
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.runtime import Runtime
@@ -116,6 +117,74 @@ class DeerFlowSummarizationMiddleware(SummarizationMiddleware):
self._preserve_recent_skill_count = max(0, preserve_recent_skill_count)
self._preserve_recent_skill_tokens = max(0, preserve_recent_skill_tokens)
self._preserve_recent_skill_tokens_per_skill = max(0, preserve_recent_skill_tokens_per_skill)
# The summary LLM call runs inside a LangGraph middleware hook, so its token
# stream would otherwise be captured by the messages-tuple stream callback and
# broadcast to the frontend as a phantom AI message. Tag a dedicated model copy
# with TAG_NOSTREAM so the streaming handler skips it.
# Keep self.model untagged so the parent's profile / ls_params inspection still works.
#
# Preserve any tags already bound on the model (e.g. "middleware:summarize" set in
# lead_agent/agent.py for RunJournal attribution): RunnableBinding.with_config does a
# shallow merge that would otherwise overwrite the existing tags list entirely.
existing_tags = list((getattr(self.model, "config", None) or {}).get("tags") or [])
merged_tags = [*existing_tags, TAG_NOSTREAM] if TAG_NOSTREAM not in existing_tags else existing_tags
self._summary_model = self.model.with_config(tags=merged_tags)
@override
def _create_summary(self, messages_to_summarize: list[AnyMessage]) -> str:
return self._summarize_with(messages_to_summarize)
@override
async def _acreate_summary(self, messages_to_summarize: list[AnyMessage]) -> str:
return await self._asummarize_with(messages_to_summarize)
def _summarize_with(self, messages_to_summarize: list[AnyMessage]) -> str:
"""Mirror the parent ``_create_summary`` but invoke the nostream-tagged model.
We do not swap ``self.model`` at the instance level: the agent/middleware is
cached and reused across concurrent runs, so a temporary swap would leak the
``RunnableBinding`` to other coroutines during ``await`` and break parent logic
that inspects the raw model (``profile`` / ``_get_ls_params``).
"""
if not messages_to_summarize:
return "No previous conversation history."
prompt = self._build_summary_prompt(messages_to_summarize)
if prompt is None:
return "Previous conversation was too long to summarize."
try:
response = self._summary_model.invoke(
prompt,
config={"metadata": {"lc_source": "summarization"}},
)
return response.text.strip()
except Exception as e:
return f"Error generating summary: {e!s}"
async def _asummarize_with(self, messages_to_summarize: list[AnyMessage]) -> str:
"""Async counterpart of :meth:`_summarize_with` using the nostream model."""
if not messages_to_summarize:
return "No previous conversation history."
prompt = self._build_summary_prompt(messages_to_summarize)
if prompt is None:
return "Previous conversation was too long to summarize."
try:
response = await self._summary_model.ainvoke(
prompt,
config={"metadata": {"lc_source": "summarization"}},
)
return response.text.strip()
except Exception as e:
return f"Error generating summary: {e!s}"
def _build_summary_prompt(self, messages_to_summarize: list[AnyMessage]) -> str | None:
"""Build the summary prompt, returning ``None`` when trimming leaves nothing."""
trimmed_messages = self._trim_messages_for_summary(messages_to_summarize)
if not trimmed_messages:
return None
# Format messages to avoid token inflation from metadata when str() is called on
# message objects.
formatted_messages = get_buffer_string(trimmed_messages)
return self.summary_prompt.format(messages=formatted_messages).rstrip()
def before_model(self, state: AgentState, runtime: Runtime) -> dict | None:
return self._maybe_summarize(state, runtime)