fix(#3189): prevent write_file streaming timeout on long reports (#3195)

* fix(#3189): prevent write_file streaming timeout on long reports

Adds a layered defense against StreamChunkTimeoutError caused by oversized
single-shot write_file tool calls:

- factory: default stream_chunk_timeout to 240s for OpenAI-compatible
  clients (overridable via ModelConfig.stream_chunk_timeout in config.yaml)
- sandbox/tools: server-side 80 KB length guard on non-append write_file
  calls (configurable via DEERFLOW_WRITE_FILE_MAX_BYTES env var, 0 disables);
  rejects oversized payloads with a structured error pointing the model at
  str_replace or append=True
- middleware: classify StreamChunkTimeoutError as transient but cap retries
  at 1 via per-exception _RETRY_BUDGET_OVERRIDES (same-payload retry on a
  chunk-gap timeout buffers the same way upstream; full 3-attempt loop
  would stack 6-12 min of dead air)
- middleware: surface an actionable user-facing message for stream-drop
  exceptions instead of leaking the raw langchain stack
- prompts: add a routing-style File Editing Workflow hint to both lead_agent
  and general_purpose subagent prompts, pointing the model at str_replace
  for incremental edits (mirrors Claude Code's Edit / Codex's apply_patch)
- tests: behavioural coverage for size guard, retry budget override,
  stream-drop user message, factory default injection
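
As a hedged illustration of the append=True route that the size guard points the model at, the sketch below splits a long string into pieces whose UTF-8 encoding fits a byte cap, then pairs each piece with the `append` flag a caller would pass. `split_utf8` and `plan_write_calls` are hypothetical helper names, not part of this change; only the 80 KB default and the append semantics come from the commit.

```python
DEFAULT_MAX_BYTES = 80 * 1024  # mirrors the 80 KB default described above


def split_utf8(text: str, max_bytes: int = DEFAULT_MAX_BYTES) -> list:
    """Split text into pieces whose UTF-8 encoding is at most max_bytes each.

    Hypothetical helper: it never cuts inside a code point, because it
    accumulates whole characters and measures their encoded size.
    """
    if max_bytes < 4:
        # A single code point can need up to 4 bytes in UTF-8.
        raise ValueError("max_bytes must be at least 4")
    chunks = []
    current = []
    size = 0
    for ch in text:
        n = len(ch.encode("utf-8"))
        if size + n > max_bytes:
            chunks.append("".join(current))
            current, size = [], 0
        current.append(ch)
        size += n
    if current:
        chunks.append("".join(current))
    return chunks


def plan_write_calls(text: str, max_bytes: int = DEFAULT_MAX_BYTES) -> list:
    """Return (content, append) pairs: the first call creates, the rest append."""
    return [(chunk, index > 0) for index, chunk in enumerate(split_utf8(text, max_bytes))]
```

The cap itself only applies to the first (non-append) call, but keeping every piece under the same size keeps each tool-call payload small.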

Refs #3189
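
The retry-budget reasoning above can be checked with a little arithmetic. This is a minimal sketch, not the middleware itself: the function names and the local exception stand-in are assumptions for illustration, and backoff sleeps between attempts are ignored.

```python
# Name-keyed override table, as described in the commit message.
RETRY_BUDGET_OVERRIDES = {"StreamChunkTimeoutError": 2}


class StreamChunkTimeoutError(Exception):
    """Local stand-in matched by class name (no langchain-openai import)."""


def max_attempts_for(exc: BaseException, default_max_attempts: int) -> int:
    """Overrides only tighten the budget; the configured cap always wins."""
    override = RETRY_BUDGET_OVERRIDES.get(type(exc).__name__)
    if override is None:
        return default_max_attempts
    return min(override, default_max_attempts)


def worst_case_stall_seconds(attempts: int, chunk_timeout_s: float) -> float:
    """Each failed attempt waits out one full chunk-gap timeout."""
    return attempts * chunk_timeout_s


if __name__ == "__main__":
    exc = StreamChunkTimeoutError("upstream stalled")
    for timeout_s in (120.0, 240.0):
        full = worst_case_stall_seconds(3, timeout_s)
        capped = worst_case_stall_seconds(max_attempts_for(exc, 3), timeout_s)
        print(f"timeout={timeout_s:.0f}s full={full / 60:.0f} min capped={capped / 60:.0f} min")
```

With three attempts the stall is 3 x 120 s = 6 min up to 3 x 240 s = 12 min; capping at two attempts brings that down to 4 to 8 min.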

* fix(#3189): drop stream_chunk_timeout for non-OpenAI providers

Address CR feedback on PR #3195:

- factory: pop `stream_chunk_timeout` from kwargs for any model_use_path other than `langchain_openai:ChatOpenAI` instead of returning early. `ModelConfig.stream_chunk_timeout` is part of the shared schema, so a user-supplied value on a non-OpenAI provider would otherwise be forwarded to its constructor and raise `TypeError: unexpected keyword argument`.

- factory: rewrite docstring to describe the actual `exclude_none=True` behaviour (explicit null is excluded and falls back to the default) instead of the misleading "None falling out via exclude_none=True keeps its value".

- tests: add regression coverage asserting the kwarg is stripped before reaching a non-OpenAI provider's constructor.

Refs: bytedance#3189
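
A small sketch of the three cases this follow-up distinguishes, under stated assumptions: `drop_none` stands in for `model_dump(exclude_none=True)`, `strict_provider` is a hypothetical non-OpenAI constructor, and `"some_vendor:ChatModel"` is a made-up use path. Only the `langchain_openai:ChatOpenAI` path and the 240 s default come from the commit.

```python
OPENAI_USE_PATH = "langchain_openai:ChatOpenAI"
DEFAULT_TIMEOUT_S = 240.0


def drop_none(raw: dict) -> dict:
    """Stand-in for model_dump(exclude_none=True): explicit nulls vanish."""
    return {key: value for key, value in raw.items() if value is not None}


def apply_stream_chunk_timeout_default(use_path: str, settings: dict) -> None:
    """Inject the default for the OpenAI path; strip the key everywhere else."""
    if use_path != OPENAI_USE_PATH:
        settings.pop("stream_chunk_timeout", None)
        return
    settings.setdefault("stream_chunk_timeout", DEFAULT_TIMEOUT_S)


def strict_provider(*, model: str) -> str:
    """Hypothetical constructor that rejects unknown keyword arguments."""
    return model


if __name__ == "__main__":
    # Explicit null behaves like "unset", so the default is injected.
    settings = drop_none({"model": "m", "stream_chunk_timeout": None})
    apply_stream_chunk_timeout_default(OPENAI_USE_PATH, settings)
    print(settings)

    # A user-supplied value on a non-OpenAI provider is stripped first.
    settings = {"model": "m", "stream_chunk_timeout": 300.0}
    apply_stream_chunk_timeout_default("some_vendor:ChatModel", settings)
    print(strict_provider(**settings))
```

Without the `pop`, the second call would forward `stream_chunk_timeout` to `strict_provider` and raise `TypeError`.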

* fix(#3189): restrict stream-drop user copy to StreamChunkTimeoutError only

Per CR on #3195: narrow _STREAM_DROP_EXCEPTIONS to StreamChunkTimeoutError. Generic httpx RemoteProtocolError / ReadError fall back to the standard 'temporarily unavailable' copy, since they routinely fire on transient network blips where the 'split the output' guidance is misleading. Retry/backoff classification is unchanged — both remain transient/retriable. Tests updated to reflect new copy, plus a symmetric regression test for ReadError.
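
To make the narrowed behaviour concrete, here is a hedged sketch of name-based copy selection. The message strings are shortened paraphrases, and `transient_message` plus the two exception stand-ins are hypothetical names used only for this illustration.

```python
# Only the chunk-gap watchdog gets the "split the work" guidance.
STREAM_DROP_EXCEPTIONS = frozenset({"StreamChunkTimeoutError"})

SPLIT_COPY = "The model's streaming response was interrupted; split or shorten the request."
GENERIC_COPY = "The configured LLM provider is temporarily unavailable."

# Stand-ins matched purely by class name, so no optional dependency is imported.
StreamChunkTimeoutError = type("StreamChunkTimeoutError", (Exception,), {})
ReadError = type("ReadError", (Exception,), {})


def transient_message(exc: BaseException) -> str:
    """Pick the user-facing copy for an already-classified transient error."""
    if type(exc).__name__ in STREAM_DROP_EXCEPTIONS:
        return SPLIT_COPY
    return GENERIC_COPY


if __name__ == "__main__":
    print(transient_message(StreamChunkTimeoutError("no chunk for 120.0s")))
    print(transient_message(ReadError("connection dropped mid-stream")))
```

Both exceptions stay retriable; only the final message differs.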

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Huixin615
2026-06-07 17:47:11 +08:00
committed by GitHub
parent 268fdd6968
commit 88e36d9686
10 changed files with 677 additions and 4 deletions
@@ -542,6 +542,14 @@ combined with a FastAPI gateway for REST API access [citation:FastAPI](https://f
{subagent_reminder}- Skill First: Always load the relevant skill before starting **complex** tasks.
- Progressive Loading: Load resources incrementally as referenced in skills
- Output Files: Final deliverables must be in `/mnt/user-data/outputs`
- File Editing Workflow: When revising an existing file, prefer
`str_replace` over `write_file` — it sends only the diff and avoids
re-emitting the whole file (mirrors Claude Code's Edit and Codex's
apply_patch). When writing long new content from scratch, split it
into sections: the first `write_file` call creates the file, then use
`write_file` with append=True to extend it section by section. This
keeps each tool call small and avoids mid-stream chunk-gap timeouts
on oversized single-shot writes. (See issue #3189.)
- Clarity: Be direct and helpful, avoid unnecessary meta-commentary
- Including Images and Mermaid: Images and Mermaid diagrams are always welcomed in the Markdown format, and you're encouraged to use `![Image Description](image_path)\n\n` or "```mermaid" to display images in response or Markdown files
- Multi-task: Better utilize parallel tool calling to call multiple tools at one time for better performance
@@ -62,6 +62,41 @@ _AUTH_PATTERNS = (
"未授权",
)
# Per-exception retry budget overrides.
#
# Some transient errors are retriable in principle but expensive to retry at
# the default budget. StreamChunkTimeoutError in particular fires after the
# upstream provider has already stalled for `stream_chunk_timeout` seconds
# (typically 120-240s); a full 3-attempt loop can therefore stack 6-12 minutes
# of dead air before surfacing the failure to the user. We keep exactly one
# retry (cheap reconnect that catches genuine transient TCP blips) and then
# fail fast — the same buffered payload is overwhelmingly likely to fail
# again at the upstream provider for the same reason.
#
# Keys are exception class *names* (not classes) so we don't introduce
# import-time coupling on optional dependencies like langchain-openai. The
# value is the absolute max attempt count, NOT additional retries — so a
# value of 2 means "1 first attempt + 1 retry" (the CR-requested
# "keep one retry" behavior).
_RETRY_BUDGET_OVERRIDES: dict[str, int] = {
"StreamChunkTimeoutError": 2,
}
# Exception class names that indicate the upstream stream-chunk watchdog
# fired because the model stalled mid-flight. These deserve a more specific
# user-facing message than the generic "temporarily unavailable" copy,
# because the typical root cause is a long tool-call serialization stalling
# the upstream stream — and the most actionable advice we can give the user
# is "ask for a shorter / split output" rather than "wait and retry".
# Generic connection drops (httpx RemoteProtocolError / ReadError) are
# intentionally excluded: they routinely fire on transient network blips
# with normal payloads, where the "split the work" guidance is misleading.
_STREAM_DROP_EXCEPTIONS: frozenset[str] = frozenset(
{
"StreamChunkTimeoutError",
}
)
class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
"""Retry transient LLM errors and surface graceful assistant messages."""
@@ -83,6 +118,18 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
self._circuit_state = "closed"
self._circuit_probe_in_flight = False
def _max_attempts_for(self, exc: BaseException) -> int:
"""Return the effective max attempt count for this exception.
Falls back to `self.retry_max_attempts` unless the exception class name
appears in the per-exception override table.
"""
override = _RETRY_BUDGET_OVERRIDES.get(type(exc).__name__)
if override is None:
return self.retry_max_attempts
return min(override, self.retry_max_attempts)
def _check_circuit(self) -> bool:
"""Returns True if circuit is OPEN (fast fail), False otherwise."""
with self._circuit_lock:
@@ -153,6 +200,7 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
"InternalServerError",
"ReadError", # httpx.ReadError: connection dropped mid-stream
"RemoteProtocolError", # httpx: server closed connection unexpectedly
"StreamChunkTimeoutError", # langchain-openai: chunk gap exceeded stream_chunk_timeout
}:
return True, "transient"
if status_code in _RETRIABLE_STATUS_CODES:
@@ -202,6 +250,20 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
if reason == "auth":
return "The configured LLM provider rejected the request because authentication or access is invalid. Please check the provider credentials and try again."
if reason in {"busy", "transient"}:
# Chunk-gap timeouts (StreamChunkTimeoutError) almost always point at
# a single oversized tool-call payload: the model spent so long
# serializing JSON arguments that the upstream provider buffered and
# the stream gap exceeded `stream_chunk_timeout`. Surfacing this
# distinct cause lets the user split or shorten their next request
# instead of helplessly retrying the same prompt. Generic connection
# drops (peer-closed connection, raw read error) keep the standard copy.
if type(exc).__name__ in _STREAM_DROP_EXCEPTIONS:
return (
"The model's streaming response was interrupted before it could "
"finish. This usually happens when a single response or tool call "
"is very large — please ask the assistant to split the work into "
"smaller steps, or shorten the requested output, and try again."
)
return "The configured LLM provider is temporarily unavailable after multiple retries. Please wait a moment and continue the conversation."
return f"LLM request failed: {detail}"
@@ -259,7 +321,8 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
raise
except Exception as exc:
retriable, reason = self._classify_error(exc)
-if retriable and attempt < self.retry_max_attempts:
+max_attempts = self._max_attempts_for(exc)
+if retriable and attempt < max_attempts:
wait_ms = self._build_retry_delay_ms(attempt, exc)
logger.warning(
"Transient LLM error on attempt %d/%d; retrying in %dms: %s",
@@ -310,7 +373,8 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
raise
except Exception as exc:
retriable, reason = self._classify_error(exc)
-if retriable and attempt < self.retry_max_attempts:
+max_attempts = self._max_attempts_for(exc)
+if retriable and attempt < max_attempts:
wait_ms = self._build_retry_delay_ms(attempt, exc)
logger.warning(
"Transient LLM error on attempt %d/%d; retrying in %dms: %s",
@@ -32,6 +32,16 @@ class ModelConfig(BaseModel):
description="Extra settings to be passed to the model when thinking is disabled",
)
supports_vision: bool = Field(default_factory=lambda: False, description="Whether the model supports vision/image inputs")
stream_chunk_timeout: float | None = Field(
default=None,
description=(
"Maximum seconds to wait between successive streaming chunks before "
"langchain-openai raises StreamChunkTimeoutError. None means use the "
"factory default (240s for OpenAI-compatible clients). Tune higher for "
"reasoning models with long thinking pauses; lower for latency-sensitive "
"interactive endpoints. Has no effect on non-OpenAI-compatible providers."
),
)
thinking: dict | None = Field(
default_factory=lambda: None,
description=(
@@ -47,6 +47,38 @@ def _enable_stream_usage_by_default(model_use_path: str, model_settings_from_con
model_settings_from_config["stream_usage"] = True
# Default chunk-gap budget for OpenAI-compatible streaming responses.
#
# langchain-openai raises ``StreamChunkTimeoutError`` after this many seconds
# without receiving a chunk. Its own default is 120s, which is too aggressive for
# reasoning models (DeepSeek-R1, Doubao-thinking, GPT-5) whose first chunk can
# legitimately take 90~150s. We default to 240s so the streaming layer rarely
# trips on long thinking pauses; the LLMErrorHandlingMiddleware still retries
# (budget=2) if a real stall happens. Users can override per-model in config.yaml.
_DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS: float = 240.0
def _apply_stream_chunk_timeout_default(model_use_path: str, model_settings_from_config: dict) -> None:
"""Inject a generous ``stream_chunk_timeout`` for OpenAI-compatible clients.
The ``stream_chunk_timeout`` kwarg is specific to ``langchain_openai:ChatOpenAI``
and is rejected by other providers' constructors as an unexpected keyword
argument. Behaviour:
* OpenAI-compatible path: an explicit value in ``config.yaml`` is preserved.
An explicit ``null`` is dropped upstream by ``model_dump(exclude_none=True)``
and therefore treated as "unset", so the default is injected.
* Non-OpenAI path: drop the key so it is never forwarded to an incompatible
constructor (which would raise ``TypeError: unexpected keyword argument``).
"""
if model_use_path != "langchain_openai:ChatOpenAI":
model_settings_from_config.pop("stream_chunk_timeout", None)
return
if "stream_chunk_timeout" in model_settings_from_config:
return
model_settings_from_config["stream_chunk_timeout"] = _DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS
def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, attach_tracing: bool = True, **kwargs) -> BaseChatModel:
"""Create a chat model instance from the config.
@@ -128,6 +160,7 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *
model_settings_from_config.pop("reasoning_effort", None)
_enable_stream_usage_by_default(model_config.use, model_settings_from_config)
_apply_stream_chunk_timeout_default(model_config.use, model_settings_from_config)
# For Codex Responses API models: map thinking mode to reasoning_effort
from deerflow.models.openai_codex_provider import CodexChatModel
@@ -1,4 +1,5 @@
import asyncio
import os
import posixpath
import re
import shlex
@@ -43,6 +44,16 @@ _MAX_GLOB_MAX_RESULTS = 1000
_DEFAULT_GREP_MAX_RESULTS = 100
_MAX_GREP_MAX_RESULTS = 500
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS = 2000
# Maximum bytes accepted in a single non-append write_file call (issue #3189).
# Oversized single-shot writes correlate with LLM streaming chunk-gap timeouts
# because the tool-call JSON payload (which the model must emit as one
# continuous stream) grows past the safe window. 80 KB ≈ 20K tokens, a
# comfortable headroom under the factory-default 240s stream_chunk_timeout.
# Deployments can override via env var DEERFLOW_WRITE_FILE_MAX_BYTES; set to
# 0 (or negative) to disable the guard entirely.
_WRITE_FILE_CONTENT_MAX_BYTES = 80 * 1024
_WRITE_FILE_MAX_BYTES_ENV = "DEERFLOW_WRITE_FILE_MAX_BYTES"
_LOCAL_BASH_CWD_COMMANDS = {"cd", "pushd"}
_LOCAL_BASH_COMMAND_WRAPPERS = {"command", "builtin"}
_LOCAL_BASH_COMMAND_PREFIX_KEYWORDS = {"!", "{", "case", "do", "elif", "else", "for", "if", "select", "then", "time", "until", "while"}
@@ -1671,6 +1682,23 @@ async def _read_file_tool_async(
read_file_tool.coroutine = _read_file_tool_async
def _effective_write_file_max_bytes() -> int:
"""Return the active size cap for non-append write_file calls.
Reads ``DEERFLOW_WRITE_FILE_MAX_BYTES`` at call time (not import time)
so tests and runtime tweaks take effect without restart. Falls back to
the default on missing/malformed values. A non-positive value disables
the guard.
"""
raw = os.environ.get(_WRITE_FILE_MAX_BYTES_ENV)
if raw is None:
return _WRITE_FILE_CONTENT_MAX_BYTES
try:
return int(raw)
except ValueError:
return _WRITE_FILE_CONTENT_MAX_BYTES
@tool("write_file", parse_docstring=True)
def write_file_tool(
runtime: Runtime,
@@ -1679,14 +1707,47 @@ def write_file_tool(
content: str,
append: bool = False,
) -> str:
-"""Write text content to a file. By default this overwrites the target file; set append to true to add content to the end without replacing existing content.
+"""Write text content to a file. By default this overwrites the target file; set append=True to add content to the end without replacing existing content.
SIZE POLICY (issue #3189):
A single non-append write_file call must not exceed 80 KB of UTF-8 content.
Oversized single-shot writes correlate with LLM streaming chunk-gap
timeouts because the tool-call JSON payload — which the model must emit as
one continuous stream — grows past the safe window. For larger documents,
use ONE of these strategies (write_file rejects oversized payloads with an
actionable error):
1. INCREMENTAL EDIT (preferred for revisions): after the initial write,
use `str_replace` to surgically update sections. This is the same
pattern Claude Code's Write+Edit and OpenAI Codex's apply_patch use,
and keeps each tool call's payload small.
2. APPEND-IN-CHUNKS (for new long-form content): split the document into
sections, each well under 80 KB. First call uses append=False to
create the file; subsequent calls use append=True. The 80 KB cap does
NOT apply to append=True calls.
Operators can override the cap via env var `DEERFLOW_WRITE_FILE_MAX_BYTES`
(0 disables the guard entirely). Raising it risks streaming timeouts.
Args:
description: Explain why you are writing to this file in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
path: The **absolute** path to the file to write to. ALWAYS PROVIDE THIS PARAMETER SECOND.
content: The content to write to the file. ALWAYS PROVIDE THIS PARAMETER THIRD.
-append: Whether to append content to the end of the file instead of overwriting it. Defaults to false.
+append: Whether to append content to the end of the file instead of overwriting it. Defaults to False.
"""
if not append:
max_bytes = _effective_write_file_max_bytes()
if max_bytes > 0:
content_bytes = len(content.encode("utf-8"))
if content_bytes > max_bytes:
return (
f"Error: write_file content ({content_bytes} bytes) exceeds the "
f"{max_bytes}-byte single-call limit. Split the content into smaller "
"pieces: either (a) write the first section now, then use `str_replace` "
"for further edits, or (b) call write_file again with append=True "
"carrying the next section. See SIZE POLICY in the tool docstring "
"or issue #3189 for the rationale."
)
try:
requested_path = path
sandbox = ensure_sandbox_initialized(runtime)
@@ -24,6 +24,17 @@ Do NOT use for simple, single-step operations.""",
- Do NOT ask for clarification - work with the information provided
</guidelines>
<file_editing_workflow>
When revising an existing file, prefer `str_replace` over `write_file` —
it sends only the diff and avoids re-emitting the whole file (mirrors
Claude Code's Edit and Codex's apply_patch). When writing long new
content from scratch, split it into sections: the first `write_file`
call creates the file, then use `write_file` with append=True to extend
it section by section. This keeps each tool call small and avoids
mid-stream chunk-gap timeouts on oversized single-shot writes.
(See issue #3189.)
</file_editing_workflow>
<output_format>
When you complete the task, provide:
1. A brief summary of what was accomplished
@@ -373,3 +373,44 @@ def test_warm_enabled_skills_cache_logs_on_timeout(monkeypatch, caplog):
assert warmed is False
assert "Timed out waiting" in caplog.text
def test_system_prompt_template_contains_file_editing_workflow_rule():
"""The File Editing Workflow rule must remain in the system prompt
template so the planner picks the right tool (str_replace for edits,
write_file + append=True for long new content) and avoids mid-stream
chunk-gap timeouts on oversized single-shot writes. See issue #3189
/ PR #3195.
We deliberately do NOT assert on any specific byte / word threshold
here — that would re-introduce the docstring-lock-in pattern the
reviewers flagged. The numeric cap lives in the server-side guard
(see test_write_file_tool_size_guard.py), which is where it belongs.
"""
template = prompt_module.SYSTEM_PROMPT_TEMPLATE
# Section anchor — keeps the rule discoverable in the assembled prompt.
assert "File Editing Workflow" in template
# Behavioural anchors — if either of these disappears, the model will
# silently regress to single-shot write_file calls for long content.
assert "str_replace" in template
assert "append=True" in template
def test_system_prompt_template_preserves_placeholders():
"""Ensure the chunking-rule edit didn't drop any f-string placeholder
consumed by apply_prompt_template(). A missing placeholder would
crash prompt rendering at runtime.
"""
template = prompt_module.SYSTEM_PROMPT_TEMPLATE
for ph in (
"{agent_name}",
"{soul}",
"{self_update_section}",
"{subagent_thinking}",
"{skills_section}",
"{deferred_tools_section}",
"{subagent_section}",
"{acp_section}",
"{subagent_reminder}",
):
assert ph in template, f"placeholder {ph} accidentally removed"
@@ -373,7 +373,11 @@ def test_sync_read_error_triggers_retry_loop(monkeypatch: pytest.MonkeyPatch) ->
result = middleware.wrap_model_call(SimpleNamespace(), handler)
assert isinstance(result, AIMessage)
# ReadError is a generic connection drop, not a chunk-gap timeout, so
# it must fall back to the legacy transient copy rather than the
# specialized "split the work into smaller steps" guidance (#3195 CR).
assert "temporarily unavailable" in result.content
assert "streaming response was interrupted" not in result.content
assert attempts == 3 # exhausted all retries
assert len(waits) == 2 # slept between attempts 1→2 and 2→3
@@ -397,7 +401,11 @@ async def test_async_read_error_triggers_retry_loop(monkeypatch: pytest.MonkeyPa
result = await middleware.awrap_model_call(SimpleNamespace(), handler)
assert isinstance(result, AIMessage)
# ReadError is a generic connection drop, not a chunk-gap timeout, so
# it must fall back to the legacy transient copy rather than the
# specialized "split the work into smaller steps" guidance (#3195 CR).
assert "temporarily unavailable" in result.content
assert "streaming response was interrupted" not in result.content
assert attempts == 3 # exhausted all retries
assert len(waits) == 2 # slept between attempts 1→2 and 2→3
@@ -462,3 +470,211 @@ async def test_async_circuit_breaker_trips_and_recovers(monkeypatch: pytest.Monk
assert result.content == "Success"
assert middleware._circuit_failure_count == 0 # RESET
assert middleware._check_circuit() is False
class _StreamChunkTimeoutError(Exception):
"""Local stand-in for langchain_openai's StreamChunkTimeoutError —
matched by class name, no langchain-openai import needed (mirrors
how this file already stubs httpx.ReadError / RemoteProtocolError).
"""
_StreamChunkTimeoutError.__name__ = "StreamChunkTimeoutError"
def test_classify_error_stream_chunk_timeout_is_retriable() -> None:
"""StreamChunkTimeoutError must be classified as transient/retriable."""
middleware = _build_middleware()
exc = _StreamChunkTimeoutError("No streaming chunk received for 120.0s (model=mimo-v2.5, chunks_received=58).")
exc.__class__.__name__ = "StreamChunkTimeoutError"
retriable, reason = middleware._classify_error(exc)
assert retriable is True
assert reason == "transient"
def test_sync_stream_chunk_timeout_retries_once(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Sync handler raising StreamChunkTimeoutError is retried exactly once —
the per-exception override caps it at 2 total attempts (1 first call + 1
retry) even when retry_max_attempts=3.
Same-payload retry on a chunk-gap timeout buffers the same way upstream;
a full 3-attempt loop would stack 6-12 minutes of dead air before
surfacing failure. We keep one cheap reconnect for genuine transient TCP
blips, then surface the failure so the model can re-plan on its next turn.
"""
middleware = _build_middleware(
retry_max_attempts=3,
retry_base_delay_ms=10,
retry_cap_delay_ms=10,
)
attempts = 0
waits: list[float] = []
monkeypatch.setattr("time.sleep", lambda d: waits.append(d))
def handler(_request) -> AIMessage:
nonlocal attempts
attempts += 1
raise _StreamChunkTimeoutError("No streaming chunk received for 120.0s")
result = middleware.wrap_model_call(SimpleNamespace(), handler)
assert isinstance(result, AIMessage)
assert "streaming response was interrupted" in result.content
# Override caps StreamChunkTimeoutError at 2 attempts (1 first call + 1 retry).
assert attempts == 2
# Exactly one sleep between the first attempt and the single retry.
assert len(waits) == 1
@pytest.mark.anyio
async def test_async_stream_chunk_timeout_retries_once(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Async mirror of the sync test: StreamChunkTimeoutError is capped at
2 attempts (1 first call + 1 retry) so we don't stack 6-12 minutes of
dead air on a same-payload buffering failure.
"""
middleware = _build_middleware(
retry_max_attempts=3,
retry_base_delay_ms=10,
retry_cap_delay_ms=10,
)
attempts = 0
waits: list[float] = []
async def fake_sleep(d: float) -> None:
waits.append(d)
monkeypatch.setattr(asyncio, "sleep", fake_sleep)
async def handler(_request) -> AIMessage:
nonlocal attempts
attempts += 1
raise _StreamChunkTimeoutError("No streaming chunk received for 120.0s")
result = await middleware.awrap_model_call(SimpleNamespace(), handler)
assert isinstance(result, AIMessage)
assert "streaming response was interrupted" in result.content
assert attempts == 2
# Exactly one sleep between the first attempt and the single retry.
assert len(waits) == 1
def test_max_attempts_for_returns_override_for_stream_chunk_timeout() -> None:
"""StreamChunkTimeoutError must use the tightened budget (2 = "keep one retry"),
not the default of 3."""
middleware = _build_middleware(retry_max_attempts=3)
exc = _StreamChunkTimeoutError("upstream stalled")
exc.__class__.__name__ = "StreamChunkTimeoutError"
assert middleware._max_attempts_for(exc) == 2
def test_max_attempts_for_falls_back_to_default_for_unlisted_exception() -> None:
"""ReadError / RemoteProtocolError keep the full retry budget — only
StreamChunkTimeoutError pays for stalling upstream for `stream_chunk_timeout`
seconds per attempt, so only it gets the tighter cap.
"""
middleware = _build_middleware(retry_max_attempts=3)
read_err = _ReadError("conn reset")
read_err.__class__.__name__ = "ReadError"
proto_err = _RemoteProtocolError("peer closed")
proto_err.__class__.__name__ = "RemoteProtocolError"
assert middleware._max_attempts_for(read_err) == 3
assert middleware._max_attempts_for(proto_err) == 3
assert middleware._max_attempts_for(FakeError("boom")) == 3
def test_max_attempts_for_override_never_exceeds_user_cap() -> None:
"""If the operator lowered retry_max_attempts below the override default,
the user-configured cap wins — overrides only ever *tighten*, never loosen.
"""
middleware = _build_middleware(retry_max_attempts=1)
exc = _StreamChunkTimeoutError("upstream stalled")
exc.__class__.__name__ = "StreamChunkTimeoutError"
assert middleware._max_attempts_for(exc) == 1
def test_user_message_for_stream_chunk_timeout_mentions_split_or_shorten() -> None:
"""When the retry budget for StreamChunkTimeoutError is exhausted, the user
message must guide the user toward splitting / shortening the request
instead of suggesting a generic retry. This is the actionable advice
Reviewer B asked for in the follow-up CR (issue #3189).
"""
middleware = _build_middleware()
exc = _StreamChunkTimeoutError("No streaming chunk received for 120.0s")
exc.__class__.__name__ = "StreamChunkTimeoutError"
message = middleware._build_user_message(exc, reason="transient")
assert "streaming response was interrupted" in message
assert "split" in message or "shorten" in message
# The generic "temporarily unavailable" wording must NOT appear here,
# otherwise the actionable guidance is buried.
assert "temporarily unavailable" not in message
def test_user_message_for_remote_protocol_error_uses_generic_transient_copy() -> None:
"""RemoteProtocolError is a generic connection drop that can fire on
transient network blips with perfectly normal payloads. The
"split the work into smaller steps" guidance only applies when the
upstream chunk-gap watchdog fires (StreamChunkTimeoutError), so
RemoteProtocolError must fall back to the legacy transient copy.
Regression guard for the #3195 CR feedback.
"""
middleware = _build_middleware()
exc = _RemoteProtocolError("Server closed connection unexpectedly")
exc.__class__.__name__ = "RemoteProtocolError"
message = middleware._build_user_message(exc, reason="transient")
assert "temporarily unavailable" in message
assert "streaming response was interrupted" not in message
def test_user_message_for_read_error_uses_generic_transient_copy() -> None:
    """httpx.ReadError is symmetric to RemoteProtocolError: a generic
    connection drop that must NOT receive the "split the work" guidance.
    Regression guard for the #3195 CR feedback.
    """
    middleware = _build_middleware()
    exc = FakeError("connection dropped mid-stream")
    exc.__class__.__name__ = "ReadError"

    message = middleware._build_user_message(exc, reason="transient")

    assert "temporarily unavailable" in message
    assert "streaming response was interrupted" not in message

def test_user_message_for_generic_transient_keeps_legacy_copy() -> None:
    """Generic transient errors (HTTP 503, 'cluster busy', etc.) must keep
    the original 'temporarily unavailable' message; only the chunk-gap
    timeout (StreamChunkTimeoutError) gets the new specialized copy. This
    prevents regression on callers who already rely on the legacy wording.
    """
    middleware = _build_middleware()
    exc = FakeError("server busy", status_code=503)

    message = middleware._build_user_message(exc, reason="transient")

    assert "temporarily unavailable" in message
    assert "streaming response was interrupted" not in message

def test_user_message_for_quota_unchanged() -> None:
    """Sanity check: the quota / auth branches must remain untouched by the
    stream-drop refactor.
    """
    middleware = _build_middleware()
    exc = FakeError("insufficient_quota", status_code=429, code="insufficient_quota")

    message = middleware._build_user_message(exc, reason="quota")

    assert "out of quota" in message
    assert "streaming response was interrupted" not in message
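
Reviewer note: the contract these middleware tests pin down can be summarised in a few lines. The sketch below is an illustration only, not the middleware's actual code. `_RETRY_BUDGET_OVERRIDES` is named in the commit message; the helper names, the default of 3 attempts, the class-name lookup, and the exact message strings are assumptions chosen to satisfy the assertions above.

```python
# Hypothetical sketch of the behaviour under test; not the real middleware.
_DEFAULT_MAX_ATTEMPTS = 3  # assumed from the "full 3-attempt loop" in the commit message
_RETRY_BUDGET_OVERRIDES = {"StreamChunkTimeoutError": 1}


class StreamChunkTimeoutError(Exception):
    """Stand-in for the upstream chunk-gap timeout exception."""


class ReadError(Exception):
    """Stand-in for a generic connection drop such as httpx.ReadError."""


def max_attempts_for(exc: BaseException) -> int:
    # Look up the per-exception retry budget by class name (assumed mechanism).
    return _RETRY_BUDGET_OVERRIDES.get(type(exc).__name__, _DEFAULT_MAX_ATTEMPTS)


def build_user_message(exc: BaseException, reason: str) -> str:
    # Quota branch is untouched by the stream-drop handling.
    if reason == "quota":
        return "The model provider reports that the account is out of quota."
    # Only the chunk-gap timeout gets the split/shorten guidance.
    if type(exc).__name__ == "StreamChunkTimeoutError":
        return (
            "The streaming response was interrupted. Try to split the work "
            "into smaller steps or shorten the request."
        )
    # Every other transient error keeps the legacy copy.
    return "The model provider is temporarily unavailable. Please try again."
```

Routing on the class name rather than on `isinstance` mirrors how the tests rename `exc.__class__.__name__`; whether the real middleware does the same is not confirmed by this diff.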
@@ -1069,3 +1069,116 @@ def test_no_duplicate_kwarg_when_reasoning_effort_in_config_and_thinking_disable
    # kwargs (runtime) takes precedence: thinking-disabled path sets reasoning_effort=minimal
    assert captured.get("reasoning_effort") == "minimal"

# ---------------------------------------------------------------------------
# stream_chunk_timeout default injection (issue #3189)
# ---------------------------------------------------------------------------
def test_stream_chunk_timeout_defaults_to_240_for_openai_compatible_model(monkeypatch):
    """OpenAI-compatible clients must receive a generous 240s chunk-gap budget by
    default, so reasoning models with long thinking pauses don't trip
    langchain-openai's aggressive 60s built-in default.
    """
    model = _make_model(use="langchain_openai:ChatOpenAI")
    cfg = _make_app_config([model])

    captured: dict = {}

    class CapturingModel(FakeChatModel):
        def __init__(self, **kwargs):
            captured.update(kwargs)
            BaseChatModel.__init__(self, **kwargs)

    _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
    factory_module.create_chat_model(name="test-model")

    assert captured.get("stream_chunk_timeout") == 240.0

def test_stream_chunk_timeout_user_value_not_overridden(monkeypatch):
    """If the user explicitly sets stream_chunk_timeout in config.yaml, the
    factory must not overwrite it with the default, even if the value is
    smaller (60s) or larger (600s) than the default.
    """
    model = ModelConfig(
        name="custom-timeout-model",
        display_name="Custom Timeout",
        description=None,
        use="langchain_openai:ChatOpenAI",
        model="gpt-4o-mini",
        stream_chunk_timeout=60.0,  # user-set explicit value
    )
    cfg = _make_app_config([model])

    captured: dict = {}

    class CapturingModel(FakeChatModel):
        def __init__(self, **kwargs):
            captured.update(kwargs)
            BaseChatModel.__init__(self, **kwargs)

    _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
    factory_module.create_chat_model(name="custom-timeout-model")

    assert captured.get("stream_chunk_timeout") == 60.0

def test_stream_chunk_timeout_not_injected_for_non_openai_provider(monkeypatch):
    """Only langchain_openai:ChatOpenAI receives the default. Anthropic / Vertex /
    other clients that don't understand this kwarg must not be polluted with it.
    """
    model = _make_model(use="langchain_anthropic:ChatAnthropic")
    cfg = _make_app_config([model])

    captured: dict = {}

    class CapturingModel(FakeChatModel):
        def __init__(self, **kwargs):
            captured.update(kwargs)
            BaseChatModel.__init__(self, **kwargs)

    _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
    factory_module.create_chat_model(name="test-model")

    assert "stream_chunk_timeout" not in captured

def test_stream_chunk_timeout_default_constant_is_documented():
    """Lock the default value at 240s. If we ever want to change this, the
    deliberate update here (and the docstring on _apply_stream_chunk_timeout_default)
    forces a paired review of the rationale comment block above the constant.
    """
    assert factory_module._DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS == 240.0

def test_stream_chunk_timeout_popped_for_non_openai_provider_when_user_set_it(monkeypatch):
    """Regression for CR feedback on issue #3189: if a user accidentally sets
    ``stream_chunk_timeout`` on a non-OpenAI provider, the factory must drop
    the kwarg before forwarding it to the model constructor. Otherwise the
    third-party client raises ``TypeError: unexpected keyword argument
    'stream_chunk_timeout'`` because the parameter is specific to
    ``langchain_openai:ChatOpenAI``.
    """
    model = ModelConfig(
        name="anthropic-with-stray-timeout",
        display_name="Anthropic With Stray Timeout",
        description=None,
        use="langchain_anthropic:ChatAnthropic",
        model="claude-sonnet-4",
        stream_chunk_timeout=60.0,  # user-set on a non-OpenAI provider; must be dropped
    )
    cfg = _make_app_config([model])

    captured: dict = {}

    class CapturingModel(FakeChatModel):
        def __init__(self, **kwargs):
            captured.update(kwargs)
            BaseChatModel.__init__(self, **kwargs)

    _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
    factory_module.create_chat_model(name="anthropic-with-stray-timeout")

    assert "stream_chunk_timeout" not in captured
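
Reviewer note: taken together, the five factory tests describe a small decision table. The sketch below is one way to satisfy it, assuming the factory works on a plain kwargs dict and compares the `use` path as a string. `_DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS` appears in the tests; the function name, its signature, and the `_OPENAI_USE_PATH` constant are hypothetical.

```python
# Hypothetical sketch of the default-injection rule; not the factory's real code.
_DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS = 240.0
_OPENAI_USE_PATH = "langchain_openai:ChatOpenAI"  # assumed comparison key


def apply_stream_chunk_timeout_default(kwargs: dict, use_path: str) -> dict:
    """Return a copy of kwargs with stream_chunk_timeout normalised."""
    result = dict(kwargs)
    if use_path != _OPENAI_USE_PATH:
        # Other providers do not accept this kwarg, so drop it even when
        # the user set it explicitly.
        result.pop("stream_chunk_timeout", None)
        return result
    # A missing or null value falls back to the default; an explicit
    # user value (smaller or larger) is preserved.
    if result.get("stream_chunk_timeout") is None:
        result["stream_chunk_timeout"] = _DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS
    return result
```

Returning a copy instead of mutating the input keeps the helper easy to test in isolation; the real factory may well mutate in place.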
@@ -0,0 +1,116 @@
"""Size-guard tests for write_file_tool (issue #3189, PR #3195).
These tests verify that write_file_tool rejects oversized single-shot payloads
with an actionable message, while leaving append-mode and env-override paths
untouched. They run purely against the tool's internal guard — no real sandbox
or filesystem is exercised, so they're fast and hermetic.
"""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from deerflow.sandbox import tools as tools_module
from deerflow.sandbox.tools import write_file_tool


def _call_write_file(*, content: str, append: bool = False) -> str:
    """Invoke write_file_tool via its underlying callable.

    We patch the sandbox initialisation chain to a no-op MagicMock so the test
    focuses purely on the size guard. The guard runs BEFORE any sandbox call,
    so when the guard rejects we never enter the patched path; when the guard
    passes, the patched sandbox.write_file returns silently and the tool
    returns "OK".
    """
    fn = getattr(write_file_tool, "func", write_file_tool)
    runtime = MagicMock()
    with (
        patch.object(tools_module, "ensure_sandbox_initialized") as mock_ensure,
        patch.object(tools_module, "ensure_thread_directories_exist"),
        patch.object(tools_module, "is_local_sandbox", return_value=False),
        patch.object(tools_module, "get_file_operation_lock") as mock_lock,
    ):
        sandbox = MagicMock()
        sandbox.write_file = MagicMock()
        mock_ensure.return_value = sandbox
        mock_lock.return_value.__enter__ = MagicMock(return_value=None)
        mock_lock.return_value.__exit__ = MagicMock(return_value=False)
        return fn(
            runtime=runtime,
            description="test write",
            path="/tmp/test.txt",
            content=content,
            append=append,
        )

def test_below_cap_succeeds():
    """A 79 KB payload sits comfortably under the 80 KB default and must pass
    straight through to the sandbox layer.
    """
    payload = "a" * (79 * 1024)
    result = _call_write_file(content=payload)
    assert result == "OK"

def test_above_cap_returns_actionable_error():
    """An 81 KB payload trips the guard. The error message must name the
    cap, the actual size, and steer the LLM toward str_replace / append=True.
    These are the exact handles Reviewer A/B asked for in PR #3195.
    """
    payload = "a" * (81 * 1024)
    result = _call_write_file(content=payload)
    assert result.startswith("Error: write_file content")
    # 82944 = 81 * 1024 is the payload size; 81920 = 80 * 1024 is the cap, so
    # the first alternative is already satisfied by a message that names only the cap.
    assert "81920 bytes" in result or "82944 bytes" in result, (
        "Error must report the actual content size so the LLM/operator can judge how much to trim or chunk."
    )
    assert "str_replace" in result, "Error must point to str_replace as the preferred incremental-edit path."
    assert "append=True" in result, "Error must also surface the append-in-chunks alternative."

def test_above_cap_with_append_true_bypasses_guard():
    """append=True is the *correct* way to write a large document in chunks,
    so the guard must not block it. The 80 KB cap intentionally applies only
    to single-shot overwrite calls.
    """
    payload = "a" * (200 * 1024)  # 200 KB
    result = _call_write_file(content=payload, append=True)
    assert result == "OK", f"append=True must bypass the size guard, got: {result!r}"

def test_env_override_raises_cap(monkeypatch: pytest.MonkeyPatch):
    """Setting DEERFLOW_WRITE_FILE_MAX_BYTES lets deployments accept larger
    payloads when the underlying LLM/network can demonstrably handle them.
    """
    monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", str(300 * 1024))
    payload = "a" * (150 * 1024)  # 150 KB; would normally trip the 80 KB cap
    result = _call_write_file(content=payload)
    assert result == "OK"

def test_env_override_zero_disables_guard(monkeypatch: pytest.MonkeyPatch):
    """Setting the env var to 0 is the documented escape hatch for operators
    who want to opt out of the guard entirely (e.g. when running models with
    very large stream_chunk_timeout values).
    """
    monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", "0")
    payload = "a" * (500 * 1024)  # 500 KB
    result = _call_write_file(content=payload)
    assert result == "OK"

def test_env_override_malformed_falls_back_to_default(monkeypatch: pytest.MonkeyPatch):
    """A typo in the env var (e.g. 'lots') must not crash the tool; fall
    back silently to the safe 80 KB default. Crashing on every write because
    of a misconfigured env var would be far worse than ignoring it.
    """
    monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", "lots")
    # 100 KB should still be rejected because the malformed value falls back
    # to the 80 KB default.
    payload = "a" * (100 * 1024)
    result = _call_write_file(content=payload)
    assert result.startswith("Error: write_file content")
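
Reviewer note: for readers who have not opened `deerflow/sandbox/tools.py`, the guard these tests exercise could look roughly like the sketch below. It is a guess reconstructed from the test expectations, not the shipped implementation. The env var name, the 80 KB default, and the `Error: write_file content` prefix come from the tests; the function names, the UTF-8 byte count, the treatment of negative values, and the rest of the message wording are assumptions.

```python
import os

# Hypothetical sketch of the size guard; not the real write_file_tool code.
_DEFAULT_MAX_BYTES = 80 * 1024
_ENV_VAR = "DEERFLOW_WRITE_FILE_MAX_BYTES"


def resolve_max_bytes(environ=None) -> int:
    """Read the cap from the environment, falling back to the default."""
    env = os.environ if environ is None else environ
    raw = env.get(_ENV_VAR)
    if raw is None:
        return _DEFAULT_MAX_BYTES
    try:
        value = int(raw)
    except ValueError:
        # Malformed value (e.g. "lots"): ignore it rather than crash.
        return _DEFAULT_MAX_BYTES
    # Assumption: negative values are treated like malformed ones.
    return value if value >= 0 else _DEFAULT_MAX_BYTES


def check_write_size(content: str, append: bool = False, environ=None):
    """Return an error string when the payload is too large, else None."""
    if append:
        return None  # chunked appends are the recommended path, never blocked
    cap = resolve_max_bytes(environ)
    if cap == 0:
        return None  # 0 disables the guard
    size = len(content.encode("utf-8"))  # assumed: size measured in UTF-8 bytes
    if size <= cap:
        return None
    return (
        f"Error: write_file content is {size} bytes, over the {cap} byte cap. "
        "Use str_replace for incremental edits, or write in chunks with append=True."
    )
```

Resolving the cap on every call, rather than once at import time, is what lets `monkeypatch.setenv` take effect inside a single test process; the tests above only pass if the real guard behaves that way too.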