Compare commits

...

5 Commits

Author SHA1 Message Date
Willem Jiang c881d95898 fix(mcp): persist MCP sessions across tool calls for stateful servers (#3089)
* fix(mcp): persist MCP sessions across tool calls for stateful servers

  MCP tools loaded via langchain-mcp-adapters created a new session on
  every call, causing stateful servers like Playwright to lose browser
  state (pages, forms) between consecutive tool invocations within the
  same thread.

  Add MCPSessionPool that maintains persistent sessions scoped by
  (server_name, thread_id). Tool calls within the same thread now reuse
  the same MCP session, preserving server-side state. Sessions are evicted
  in LRU order (max 256) and cleaned up on cache invalidation.

  Fixes #3054

* fix(sandbox): add group/other read permissions to uploaded files for Docker sandbox (#3127)

  When using AIO sandbox with LocalContainerBackend, uploaded files are
  created with 0o600 (owner-only) permissions by the gateway process
  running as root. The sandbox process inside the Docker container runs
  as a non-root user and cannot read these bind-mounted files, causing
  a "Permission denied" error on read_file.

  Add `needs_upload_permission_adjustment` attribute to SandboxProvider
  (default True) to indicate that uploaded files need chmod adjustment.
  LocalSandboxProvider opts out (same user). A new `_make_file_sandbox_readable`
  function adds S_IRGRP | S_IROTH bits after files are written, changing
  permissions from 0o600 to 0o644 so the sandbox can read the uploads.

* fix(mcp): address review comments on session pool and tools

- _extract_thread_id: return "default" instead of stringifying None
  when get_config() returns no thread_id
- call_with_persistent_session: fix **arguments annotation from
  dict[str,Any] to Any
- Replace private _convert_call_tool_result import with a local
  implementation that handles all MCP content block types
- _make_session_pool_tool: accept tool_interceptors and apply the
  configured interceptor chain on every call (preserving OAuth and
  custom interceptors)
- MCPSessionPool: replace asyncio.Lock with threading.Lock; restructure
  get/close methods to never await while holding the lock; add
  close_all_sync() that closes sessions on their owning event loops
- reset_mcp_tools_cache: use pool.close_all_sync() instead of
  asyncio.run-in-thread to close sessions deterministically
- test: add test_session_pool_tool_sync_wrapper_path_is_safe covering
  tool invocation via the sync wrapper (tool.func) path

Agent-Logs-Url: https://github.com/bytedance/deer-flow/sessions/9e7f9e7f-1d2b-464a-b3b7-7f1649b74122

Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>

* fix(mcp): extract SESSION_CLOSE_TIMEOUT to class constant

Agent-Logs-Url: https://github.com/bytedance/deer-flow/sessions/9e7f9e7f-1d2b-464a-b3b7-7f1649b74122

Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>

* Potential fix for pull request finding 'Empty except'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-05-21 23:22:20 +08:00
Xinmin Zeng e93f658472 fix(stability): resolve P0 blockers from v2.0-m1-rc1 stability audit (#3107) (#3131)
* fix(task-tool): unwrap callback manager when locating usage recorder

`config["callbacks"]` may arrive as a `BaseCallbackManager` (e.g. the
`AsyncCallbackManager` LangChain hands to async tool runs), not just a plain
list. The previous `for cb in callbacks` loop raised
`TypeError: 'AsyncCallbackManager' object is not iterable`, which
`ToolErrorHandlingMiddleware` then converted into a failed `task` ToolMessage
even though the subagent had completed internally — Ultra mode lost subagent
results and the lead agent fell back to redoing the work.

Unwrap `BaseCallbackManager.handlers` before searching for the recorder.

Refs: bytedance/deer-flow#3107 (BUG-002)

* fix(frontend): treat any task tool error as a terminal subtask failure

The subtask card status machine matched only three English prefixes (`Task
Succeeded. Result:`, `Task failed.`, `Task timed out`). Anything else fell
through to `in_progress`, so a `task` tool error wrapped by
`ToolErrorHandlingMiddleware` (`Error: Tool 'task' failed ...`) left the card
spinning forever even after the run had ended.

Extract the prefix logic into `parseSubtaskResult` and recognise any leading
`Error:` token as a terminal failure. The extracted function is unit-tested
against the legacy prefixes plus the `AsyncCallbackManager` regression
captured in the upstream issue.

Refs: bytedance/deer-flow#3107 (BUG-007)

* fix(frontend): exclude hidden, reasoning, and tool payloads from chat export

`formatThreadAsMarkdown` / `formatThreadAsJSON` iterated raw messages without
running the UI-level `isHiddenFromUIMessage` filter. Exported transcripts
therefore included `hide_from_ui` system reminders, memory injections,
provider `reasoning_content`, tool calls, and tool result messages — content
that is intentionally hidden in the chat view.

Filter the export to the user-visible transcript by default and gate
reasoning / tool calls / tool messages / hidden messages behind explicit
`ExportOptions` flags so a future debug export can opt back in without
forking the formatter.

Refs: bytedance/deer-flow#3107 (BUG-006)

* fix(gateway): route get_config through get_app_config for mtime hot reload

`get_config(request)` returned the `app.state.config` snapshot captured at
startup. The worker / lead-agent path then threaded that frozen `AppConfig`
through `RunContext` and `agent_factory`, so per-run fields edited in
`config.yaml` (notably `max_tokens`) were ignored until the gateway process
was restarted — even though `get_app_config()` already does mtime-based
reload at the bottom layer.

Route the request dependency through `get_app_config()` directly. Runtime
`ContextVar` overrides (`push_current_app_config`) and test-injected
singletons (`set_app_config`) keep working; `app.state.config` is now only
read at startup for one-shot bootstrap (logging level, IM channels,
`langgraph_runtime` engines).

`tests/test_gateway_deps_config.py` encoded the old snapshot contract and is
removed; `tests/test_gateway_config_freshness.py` replaces it with mtime,
ContextVar, and `set_app_config` coverage. `test_skills_custom_router.py` and
`test_uploads_router.py` now inject test configs via FastAPI
`dependency_overrides[get_config]` instead of mutating `app.state.config`.

Document the hot-reload boundary in `backend/CLAUDE.md` so reviewers know
which fields are picked up on the next request vs. which still require a
restart (`database`, `checkpointer`, `run_events`, `stream_bridge`,
`sandbox.use`, `log_level`, `channels.*`).

Refs: bytedance/deer-flow#3107 (BUG-001)

* fix(gateway): broaden get_config 503 to any config-load failure

Address review feedback on the previous commit:

1. Narrow exception catch removed. The old contract returned 503 whenever
   `app.state.config is None`. The first cut only mapped
   `FileNotFoundError`, leaving `PermissionError`, YAML parse errors, and
   pydantic `ValidationError` to bubble up as 500. At the request boundary
   we treat any inability to materialise the config as "configuration not
   available" (503) and log the original exception so the operator still
   has the stack.

2. Removed the unused `request: Request` parameter and the matching
   `# noqa: ARG001`. FastAPI's `Depends()` does not require the dependency
   to accept `Request`; the only call site uses the no-arg form.

3. `backend/CLAUDE.md` boundary now lists the *reason* each field is
   restart-required (engine binding, singleton caching, one-shot
   `apply_logging_level`, etc.), not just the field name, so reviewers do
   not have to reverse-engineer the boundary themselves.

Tests parametrise four exception classes (`FileNotFoundError`,
`PermissionError`, `ValueError`, `RuntimeError`) and assert 503 for each.

Refs: bytedance/deer-flow#3107 (BUG-001)

* fix(task-tool): defend _find_usage_recorder against non-list callbacks

Address review feedback. The previous commit handled the two common shapes
LangChain hands to async tool runs — a plain `list[BaseCallbackHandler]` and
a `BaseCallbackManager` subclass — but iterated any other shape directly,
which would still raise `TypeError` if e.g. a single handler instance leaked
through without a list wrapper.

Treat any non-list, non-manager `config["callbacks"]` value as "no recorder"
rather than crash. Docstring now lists all four shapes explicitly. New tests
cover the single-handler-object case, `runtime is None`, `callbacks is None`,
and `runtime.config` being a non-dict — all required to be silent no-ops.

Refs: bytedance/deer-flow#3107 (BUG-002)

* fix(frontend): drop dead identity ternary and add opt-in export tests

Address review feedback on the previous export commit:

1. Removed the no-op `typeof msg.content === "string" ? msg.content : msg.content`
   expression in `formatThreadAsJSON`. Both branches returned the same value;
   the message content now flows through unchanged whether it is a string or
   the rich `MessageContent[]` shape (LangChain JSON-serialises the array
   structure correctly already).

2. Expanded the JSDoc on `ExportOptions` to make it clearer that the four
   flags are not currently wired to any UI control — callers wanting a debug
   export must build the options object explicitly. The default behaviour
   continues to match the explicit prescription in
   bytedance/deer-flow#3107 BUG-006.

3. Added opt-in coverage. The previous tests only exercised the
   `options = {}` default path; the new cases verify each flag flips the
   corresponding payload back into the export so a future debug-export
   surface does not silently break the contract.

Refs: bytedance/deer-flow#3107 (BUG-006)

* fix(frontend): export subtask prefix constants and document fallback intent

Address review feedback on the previous BUG-007 commit:

1. `SUCCESS_PREFIX`, `FAILURE_PREFIX`, `TIMEOUT_PREFIX`, and the
   `ERROR_WRAPPER_PATTERN` regex are now exported. The JSDoc explicitly
   pins them as part of the backend↔frontend contract defined in
   `task_tool.py` and `tool_error_handling_middleware.py`, so any future
   structured-status migration (e.g. backend writing
   `additional_kwargs.subagent_status` instead of leading text) can
   reference these from one canonical place rather than redefine them.

2. The `in_progress` fallback now carries a docstring explaining the
   deliberate choice — LangChain only ever emits a `ToolMessage` once the
   tool itself has returned, so unrecognised content means the contract
   has drifted and "still running" is the right operator signal (eagerly
   marking it terminal-failed would mask the drift).

No behaviour change; this is documentation and an API export.

Refs: bytedance/deer-flow#3107 (BUG-007)

* fix(gateway): drop app.state.config snapshot and freeze run_events_config

Address @ShenAC-SAC's BUG-001 review on #3131. The previous cut still
stored an ``AppConfig`` snapshot on ``app.state.config`` for startup
bootstrap. Two follow-on hazards from that:

1. Future code touching the gateway lifespan could accidentally start
   reading ``app.state.config`` again, silently regressing the request
   hot path back to a stale snapshot.
2. ``get_run_context()`` paired a freshly-reloaded ``AppConfig`` with the
   startup-bound ``event_store`` and a *live* ``run_events_config``
   field — so an operator who edited ``run_events.backend`` mid-flight
   would have produced a run context whose ``event_store`` and
   ``run_events_config`` referred to different backends.

Clean approach (aligned with the direction in PR #3128):

- ``lifespan()`` keeps a local ``startup_config`` variable and passes it
  explicitly into ``langgraph_runtime(app, startup_config)`` and into
  ``start_channel_service``. No ``app.state.config`` attribute is set at
  any point.
- ``langgraph_runtime`` now accepts ``startup_config`` as a required
  parameter, removing the ``getattr(app.state, "config", None)`` lookup
  and the "config not initialised" runtime error.
- The matching ``run_events_config`` is frozen onto ``app.state`` next
  to ``run_event_store`` so ``get_run_context`` reads the two from the
  same startup-time source. ``app_config`` continues to be resolved
  live via ``get_app_config()``.
- ``backend/CLAUDE.md`` boundary explanation updated to spell out the
  ``startup_config`` / ``get_app_config()`` split.

New regression test ``test_run_context_app_config_reflects_yaml_edit``
exercises the worker-feeding path: it asserts that ``ctx.app_config``
follows a mid-flight ``config.yaml`` edit while
``ctx.run_events_config`` stays frozen to the startup snapshot the
event store was built from.

Refs: bytedance/deer-flow#3107 (BUG-001), bytedance/deer-flow#3131 review

* fix(frontend): parse Task cancelled and polling timed out as terminal

Address @ShenAC-SAC's BUG-007 review on #3131. `task_tool.py` actually
emits five terminal strings:

- `Task Succeeded. Result: …`
- `Task failed. …`
- `Task timed out. …`
- `Task cancelled by user.`               ← previously matched none
- `Task polling timed out after N minutes …` ← previously matched none

The previous cut handled three; the last two fell through to the
"unknown content" branch and pushed the subtask card back to
`in_progress` even though the backend had already reached a terminal
state. Add explicit matches plus regression tests for both. The
`in_progress` fallback is now reserved for genuinely unrecognised
output (i.e. contract drift), as documented.

Refs: bytedance/deer-flow#3107 (BUG-007), bytedance/deer-flow#3131 review

* fix(frontend): sanitize JSON export content via the Markdown content path

Address @ShenAC-SAC's BUG-006 review and the Copilot inline comment on
#3131. The previous cut filtered hidden/tool messages out of the JSON
export but still serialised `msg.content` verbatim, so:

- inline `<think>…</think>` wrappers stayed in the exported `content`
  even with `includeReasoning: false`,
- content-array thinking blocks leaked the `thinking` field,
- `<uploaded_files>…</uploaded_files>` markers leaked the workspace
  paths a user uploaded files to.

JSON now goes through the same sanitiser the Markdown path uses
(`extractContentFromMessage` + `stripUploadedFilesTag`). Reasoning and
tool_calls remain gated behind their `ExportOptions` flags. AI / human
rows that sanitise to empty content with no opted-in reasoning or tool
calls are dropped so the JSON matches the Markdown path's `continue`
on empty assistant fragments.

New regression tests cover the three leak shapes the reviewer called
out plus the empty-content-drop case.

Refs: bytedance/deer-flow#3107 (BUG-006), bytedance/deer-flow#3131 review

* test(gateway): align lifespan stub with langgraph_runtime two-arg signature

Codex round-3 review of c0bc7a06 flagged this: changing
`langgraph_runtime` to require `startup_config` as a second positional
argument broke the one-arg stub `_noop_langgraph_runtime(_app)` in
`test_gateway_lifespan_shutdown.py`, which is patched into
`app.gateway.app.langgraph_runtime` by the lifespan shutdown bounded-timeout
regression. Lifespan would then call the stub with two args and raise
`TypeError` before the bounded-shutdown assertion ran.

Update the stub to match the new signature. The shutdown test itself is
unaffected — it only cares about the channel `stop_channel_service` hang
path.

Refs: bytedance/deer-flow#3107 (BUG-001), bytedance/deer-flow#3131 review

* fix(frontend): strip every known backend marker in export, not just uploads

Codex round-3 review of 258ca800 and the matching maintainer feedback on
PR #3131 made the same point: the JSON export now ran the
Markdown-side sanitiser, but that sanitiser only stripped
`<uploaded_files>`. The full set of payloads middleware embeds inside
message `content` is larger:

- `<uploaded_files>` — `UploadsMiddleware`
- `<system-reminder>` — `DynamicContextMiddleware`
- `<memory>` — `DynamicContextMiddleware` (nested inside system-reminder)
- `<current_date>` — `DynamicContextMiddleware`

The primary protection is still `isHiddenFromUIMessage`: the
`<system-reminder>` HumanMessage is marked `hide_from_ui: true` and never
reaches the formatter. This commit adds the second line of defence so a
regression that drops the `hide_from_ui` flag — or any future middleware
that injects the same tag vocabulary into a visible HumanMessage —
cannot leak the payload into the export file.

Concrete changes:

- New `INTERNAL_MARKER_TAGS` constant + `stripInternalMarkers(content)`
  helper in `core/messages/utils.ts`. The constant doubles as
  documentation for the backend↔frontend contract.
- `formatMessageContent` in `export.ts` now calls `stripInternalMarkers`
  instead of `stripUploadedFilesTag`. UI render paths
  (`message-list-item.tsx`) keep using the narrower function so a user
  legitimately typing `<memory>` in a meta-discussion is preserved.
- The "drop empty rows" guard in `buildJSONMessage` switched from
  `=== undefined` to truthy `!` checks. Codex spotted the asymmetry: when
  `extractReasoningContentFromMessage` returned the empty string (which it
  legitimately can), the JSON path emitted `{reasoning: ""}` while the
  Markdown path's `!reasoning` `continue` correctly dropped the row.

New regression tests cover the defence-in-depth strip with a
`<system-reminder><memory><current_date>` payload deliberately *not*
marked `hide_from_ui`; tool-message sanitization under
`includeToolMessages: true`; the mixed-content-array case
(`thinking + text + image_url`); and the opted-in empty-reasoning drop.

Live verification on a real Ultra-mode thread that uploaded a PDF
(`曾鑫民-薪资交易流水.pdf`): backend state's first HumanMessage carries the
`<uploaded_files>` block (with `/mnt/user-data/uploads/...` paths) as part
of a content-array. The Markdown and JSON export blobs both come back
free of `<uploaded_files>`, `<system-reminder>`, `<current_date>`,
`tool_calls`, and reasoning — while preserving the user's `这是什么 ?`
prompt and the assistant's visible answer.

Refs: bytedance/deer-flow#3107 (BUG-006), bytedance/deer-flow#3131 review

* test(frontend): cover trim, varied N, and pre-execution Error: prefixes

Codex round-3 review of 50e2c257 flagged three coverage gaps in the
subtask-status parser:

1. `Task cancelled by user.` and `Task polling timed out` previously had
   no whitespace-trim coverage — the original trim test only exercised
   the success prefix. Streaming chunks can arrive with leading/trailing
   newlines; the regex needed an explicit assertion.
2. The polling-timeout case was tested only at one `N` (15 minutes). The
   backend interpolates the live `timeout_seconds // 60` value, so the
   matcher must hold for any positive integer. Now we run the case for
   1, 5, and 60 minutes.
3. `task_tool.py` also emits three `Error:` strings for pre-execution
   failures — unknown subagent type, host-bash disabled, and "task
   disappeared from background tasks". They are intentionally handled by
   `ERROR_WRAPPER_PATTERN` rather than dedicated prefixes (the wrapper
   already produces the right terminal-failed shape) but had no test
   coverage proving that wiring. Codex was right that a refactor splitting
   one of them off into its own prefix would silently break things.

The JSDoc on the constants block now spells the three pre-execution
errors out so the relationship between `task_tool.py` returns and the
prefix vocabulary is explicit.

No production code change beyond the docstring — this commit is pure
coverage hardening for the contract that already exists.

Refs: bytedance/deer-flow#3107 (BUG-007), bytedance/deer-flow#3131 review
2026-05-21 21:18:10 +08:00
john lee 4cb2a22400 docs(config.example): fix Claude thinking example — add supports_thinking and budget_tokens (#3068)
The commented Claude example used Claude 3.5 Sonnet with
when_thinking_enabled but lacked supports_thinking: true. Copying the
block and swapping to a Claude 4 model name would silently fall back to
non-thinking mode (agent.py line 380 suppresses the error and logs only
a warning).

A second trap: budget_tokens is required by the Anthropic API when
thinking.type == "enabled"; there is no server default. The old example
omitted it, so any user who did add supports_thinking: true would get an
API error on the first thinking request.

Replace with a Claude Sonnet 4 example that includes both fields and
inline comments explaining the constraints.

Closes #2336

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 21:13:24 +08:00
Xinmin Zeng 9c03a71a07 fix(gateway): preserve message additional_kwargs in normalize_input (#3132) (#3136)
* fix(gateway): preserve message additional_kwargs in normalize_input (#3132)

The gateway's hand-rolled dict→message coercion only forwarded `content`
and collapsed every role to `HumanMessage`, silently dropping the
frontend's `additional_kwargs.files` payload (along with `id`, `name`,
and ai/system/tool roles).

Effect on issue #3132:

- `UploadsMiddleware` saw no `files` on the last human message, so the
  just-uploaded file got bucketed under "previous messages" while the
  current turn was reported as `(empty)`.
- The persisted human message had no `files`, so the attachment chip on
  the message disappeared the moment the optimistic UI cleared.

Delegate the conversion to `langchain_core.messages.utils.convert_to_messages`
so `additional_kwargs`, `id`, `name`, and non-human roles round-trip
unchanged.

* fix(gateway): convert malformed-message ValueError into HTTP 400

normalize_input now sits at the request boundary, so a malformed
input.messages[N] dict (missing role/type/content, unsupported role,
etc.) should surface as 400 with the offending index — not bubble out
of FastAPI as 500.

Per Copilot review on #3136.
2026-05-21 21:06:19 +08:00
Lawrance_YXLiao 1c5c585741 fix(runtime): bound write_file execution-failure observations (#3133)
* fix(runtime): bound write_file execution-failure observations

* fix(runtime): preserve write_file error prefixes

* test(runtime): trim write_file prefix assertions

* refactor(runtime): drop redundant exception suffix for permission/directory write errors

Address Copilot review on #3133: the PermissionError and IsADirectoryError
branches now return self-contained, non-redundant messages (e.g.
"Error: Permission denied writing to file: /mnt/...") via direct
truncation, instead of going through _format_write_file_error which
appended a duplicate ": PermissionError: permission denied" suffix.

OSError, SandboxError and the generic Exception branches keep the
unified "Failed to write file '{path}': {ExceptionType}: {detail}"
format so the model still sees a stable, machine-readable error class.

Removes the now-unused message= parameter from _format_write_file_error,
keeping a single code path. Truncation contract (<= 2000 chars) and
host-path sanitization unchanged.

* fix(runtime): handle write_file sandbox init errors

Initialize the requested path before sandbox setup so early sandbox failures can still return a bounded write_file error.

Add a regression test for sandbox initialization failures.

* style(test): format sandbox security tests
2026-05-21 20:35:46 +08:00
25 changed files with 2214 additions and 139 deletions
+12
View File
@@ -184,6 +184,18 @@ Setup: Copy `config.example.yaml` to `config.yaml` in the **project root** direc
**Config Caching**: `get_app_config()` caches the parsed config, but automatically reloads it when the resolved config path changes or the file's mtime increases. This keeps Gateway and LangGraph reads aligned with `config.yaml` edits without requiring a manual process restart.
**Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. `AppConfig` is intentionally **not** cached on `app.state``lifespan()` keeps a local `startup_config` variable for one-shot bootstrap work (logging level, channels, `langgraph_runtime` engines) and passes it explicitly to `langgraph_runtime(app, startup_config)`. Infrastructure fields are **restart-required**:
| Field | Why a restart is required |
|---|---|
| `database.*` | `init_engine_from_config()` runs once during `langgraph_runtime()` startup; the SQLAlchemy engine holds the connection pool. |
| `checkpointer.*` (including SQLite WAL/journal settings) | `make_checkpointer()` binds the persistent checkpointer once at startup. |
| `run_events.*` | `make_run_event_store()` selects memory- vs. SQL-backed implementation at startup. |
| `stream_bridge.*` | `make_stream_bridge()` constructs the bridge object once. |
| `sandbox.use` | `get_sandbox_provider()` caches the provider singleton (`_default_sandbox_provider`); a new class path takes effect only on next process start. |
| `log_level` | `apply_logging_level()` is called only in `app.py` startup; it mutates the root logger's level, and `get_app_config()` returning a fresh `AppConfig` does not retrigger it. |
| `channels.*` IM platform credentials | `start_channel_service()` is invoked once during startup; live channels are not rebuilt on config change. |
Configuration priority:
1. Explicit `config_path` argument
2. `DEER_FLOW_CONFIG_PATH` environment variable
+11 -5
View File
@@ -161,10 +161,16 @@ async def _migrate_orphaned_threads(store, admin_user_id: str) -> int:
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Application lifespan handler."""
# Load config and check necessary environment variables at startup
# Load config and check necessary environment variables at startup.
# `startup_config` is a local snapshot used only for one-shot bootstrap
# work (logging level, langgraph_runtime engines, channels). Request-time
# config resolution always routes through `get_app_config()` in
# `app/gateway/deps.py::get_config()` so `config.yaml` edits become
# visible without a process restart. We deliberately do NOT cache this
# snapshot on `app.state` to keep that contract enforceable.
try:
app.state.config = get_app_config()
apply_logging_level(app.state.config.log_level)
startup_config = get_app_config()
apply_logging_level(startup_config.log_level)
logger.info("Configuration loaded successfully")
except Exception as e:
error_msg = f"Failed to load configuration during gateway startup: {e}"
@@ -174,7 +180,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.info(f"Starting API Gateway on {config.host}:{config.port}")
# Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store)
async with langgraph_runtime(app):
async with langgraph_runtime(app, startup_config):
logger.info("LangGraph runtime initialised")
# Check admin bootstrap state and migrate orphan threads after admin exists.
@@ -185,7 +191,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
try:
from app.channels.service import start_channel_service
channel_service = await start_channel_service(app.state.config)
channel_service = await start_channel_service(startup_config)
logger.info("Channel service started: %s", channel_service.get_status())
except Exception:
logger.exception("No IM channels configured or channel service failed to start")
+69 -17
View File
@@ -3,11 +3,21 @@
**Getters** (used by routers): raise 503 when a required dependency is
missing, except ``get_store`` which returns ``None``.
``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the
run path resolve it through :func:`deerflow.config.app_config.get_app_config`,
which performs mtime-based hot reload, so edits to ``config.yaml`` take
effect on the next request without a process restart. The engines created in
:func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store,
run-event store) accept a ``startup_config`` snapshot — they are
restart-required by design and stay bound to that snapshot to keep the live
process consistent with itself.
Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncGenerator, Callable
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING, TypeVar, cast
@@ -15,12 +25,14 @@ from typing import TYPE_CHECKING, TypeVar, cast
from fastapi import FastAPI, HTTPException, Request
from langgraph.types import Checkpointer
from deerflow.config.app_config import AppConfig
from deerflow.config.app_config import AppConfig, get_app_config
from deerflow.persistence.feedback import FeedbackRepository
from deerflow.runtime import RunContext, RunManager, StreamBridge
from deerflow.runtime.events.store.base import RunEventStore
from deerflow.runtime.runs.store.base import RunStore
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
@@ -30,21 +42,55 @@ if TYPE_CHECKING:
T = TypeVar("T")
def get_config(request: Request) -> AppConfig:
"""Return the app-scoped ``AppConfig`` stored on ``app.state``."""
config = getattr(request.app.state, "config", None)
if config is None:
raise HTTPException(status_code=503, detail="Configuration not available")
return config
def get_config() -> AppConfig:
"""Return the freshest ``AppConfig`` for the current request.
Routes through :func:`deerflow.config.app_config.get_app_config`, which
honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from
disk when its mtime changes. ``AppConfig`` is not cached on ``app.state``
at all — the only startup-time snapshot lives as a local
``startup_config`` variable inside ``lifespan()`` and is passed
explicitly into :func:`langgraph_runtime` for the engines that are
restart-required by design. Routing every request through
:func:`get_app_config` closes the bytedance/deer-flow issue #3107 BUG-001
split-brain where the worker / lead-agent thread saw a stale startup
snapshot.
Any failure to materialise the config (missing file, permission denied,
YAML parse error, validation error) is reported as 503 — semantically
"the gateway cannot serve requests without a usable configuration" — and
logged with the original exception so operators have something to debug.
"""
try:
return get_app_config()
except Exception as exc: # noqa: BLE001 - request boundary: log and degrade gracefully
logger.exception("Failed to load AppConfig at request time")
raise HTTPException(status_code=503, detail="Configuration not available") from exc
@asynccontextmanager
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]:
"""Bootstrap and tear down all LangGraph runtime singletons.
``startup_config`` is the ``AppConfig`` snapshot taken once during
``lifespan()`` for one-shot infrastructure bootstrap. The engines and
stores constructed here (stream bridge, persistence engine, checkpointer,
store, run-event store) are restart-required by design — they hold live
connections, file handles, or singleton providers — so they bind to this
snapshot and survive across `config.yaml` edits. Request-time consumers
must still go through :func:`get_config` for any field that should be
hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary".
The matching ``run_events_config`` is frozen onto ``app.state`` so
:func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the
*startup-time* run-events configuration the underlying ``event_store``
was built from — otherwise the runtime could end up combining a live
new ``run_events_config`` with an event store still bound to the
previous backend.
Usage in ``app.py``::
async with langgraph_runtime(app):
async with langgraph_runtime(app, startup_config):
yield
"""
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
@@ -53,9 +99,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
from deerflow.runtime.events.store import make_run_event_store
async with AsyncExitStack() as stack:
config = getattr(app.state, "config", None)
if config is None:
raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized")
config = startup_config
app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config))
@@ -84,8 +128,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
app.state.thread_store = make_thread_store(sf, app.state.store)
# Run event store (has its own factory with config-driven backend selection)
# Run event store. The store and the matching ``run_events_config`` are
# both frozen at startup so ``get_run_context`` does not combine a
# freshly-reloaded ``AppConfig.run_events`` with a store still bound to
# the previous backend.
run_events_config = getattr(config, "run_events", None)
app.state.run_events_config = run_events_config
app.state.run_event_store = make_run_event_store(run_events_config)
# RunManager with store backing for persistence
@@ -139,16 +187,20 @@ def get_thread_store(request: Request) -> ThreadMetaStore:
def get_run_context(request: Request) -> RunContext:
"""Build a :class:`RunContext` from ``app.state`` singletons.
Returns a *base* context with infrastructure dependencies.
Returns a *base* context with infrastructure dependencies. The
``app_config`` field is resolved live so per-run fields (e.g.
``models[*].max_tokens``) follow ``config.yaml`` edits; the
``event_store`` / ``run_events_config`` pair stays frozen to the snapshot
captured in :func:`langgraph_runtime` so callers never see a store bound
to one backend paired with a config pointing at another.
"""
config = get_config(request)
return RunContext(
checkpointer=get_checkpointer(request),
store=get_store(request),
event_store=get_run_event_store(request),
run_events_config=getattr(config, "run_events", None),
run_events_config=getattr(request.app.state, "run_events_config", None),
thread_store=get_thread_store(request),
app_config=config,
app_config=get_config(),
)
+27 -12
View File
@@ -15,7 +15,8 @@ from collections.abc import Mapping
from typing import Any
from fastapi import HTTPException, Request
from langchain_core.messages import HumanMessage
from langchain_core.messages import BaseMessage
from langchain_core.messages.utils import convert_to_messages
from app.gateway.deps import get_run_context, get_run_manager, get_stream_bridge
from app.gateway.utils import sanitize_log_param
@@ -76,21 +77,35 @@ def normalize_stream_modes(raw: list[str] | str | None) -> list[str]:
def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
"""Convert LangGraph Platform input format to LangChain state dict."""
"""Convert LangGraph Platform input format to LangChain state dict.
Delegates dict→message coercion to ``langchain_core.messages.utils.convert_to_messages``
so that ``additional_kwargs`` (e.g. uploaded-file metadata — gh #3132), ``id``,
``name``, and non-human roles (ai/system/tool) survive unchanged. An earlier
hand-rolled version only forwarded ``content`` and collapsed every role to
``HumanMessage``, which silently stripped frontend-supplied attachments.
Malformed message dicts (missing ``role``/``type``/``content``, unsupported
role, etc.) raise ``HTTPException(400)`` with the offending index, instead
of bubbling up as a 500. The gateway is a system boundary, so per-entry
validation errors are the right shape for clients to retry against.
"""
if raw_input is None:
return {}
messages = raw_input.get("messages")
if messages and isinstance(messages, list):
converted = []
for msg in messages:
if isinstance(msg, dict):
role = msg.get("role", msg.get("type", "user"))
content = msg.get("content", "")
if role in ("user", "human"):
converted.append(HumanMessage(content=content))
else:
# TODO: handle other message types (system, ai, tool)
converted.append(HumanMessage(content=content))
converted: list[Any] = []
for index, msg in enumerate(messages):
if isinstance(msg, BaseMessage):
converted.append(msg)
elif isinstance(msg, dict):
try:
converted.extend(convert_to_messages([msg]))
except (ValueError, TypeError, NotImplementedError) as exc:
raise HTTPException(
status_code=400,
detail=f"Invalid message at input.messages[{index}]: {exc}",
) from exc
else:
converted.append(msg)
return {**raw_input, "messages": converted}
@@ -134,9 +134,25 @@ def reset_mcp_tools_cache() -> None:
"""Reset the MCP tools cache.
This is useful for testing or when you want to reload MCP tools.
Also closes all persistent MCP sessions so they are recreated on
the next tool load.
"""
global _mcp_tools_cache, _cache_initialized, _config_mtime
_mcp_tools_cache = None
_cache_initialized = False
_config_mtime = None
# Close persistent sessions they will be recreated by the next
# get_mcp_tools() call with the (possibly updated) connection config.
try:
from deerflow.mcp.session_pool import get_session_pool
pool = get_session_pool()
pool.close_all_sync()
except Exception:
logger.debug("Could not close MCP session pool on cache reset", exc_info=True)
from deerflow.mcp.session_pool import reset_session_pool
reset_session_pool()
logger.info("MCP tools cache reset")
@@ -0,0 +1,198 @@
"""Persistent MCP session pool for stateful tool calls.
When MCP tools are loaded via langchain-mcp-adapters with ``session=None``,
each tool call creates a new MCP session. For stateful servers like Playwright,
this means browser state (opened pages, filled forms) is lost between calls.
This module provides a session pool that maintains persistent MCP sessions,
scoped by ``(server_name, scope_key)`` — typically scope_key is the thread_id —
so that consecutive tool calls share the same session and server-side state.
Sessions are evicted in LRU order when the pool reaches capacity.
"""
from __future__ import annotations
import asyncio
import logging
import threading
from collections import OrderedDict
from typing import Any
from mcp import ClientSession
logger = logging.getLogger(__name__)
class MCPSessionPool:
"""Manages persistent MCP sessions scoped by ``(server_name, scope_key)``."""
MAX_SESSIONS = 256
SESSION_CLOSE_TIMEOUT = 5.0 # seconds to wait when closing a session via run_coroutine_threadsafe
def __init__(self) -> None:
self._entries: OrderedDict[
tuple[str, str],
tuple[ClientSession, asyncio.AbstractEventLoop],
] = OrderedDict()
self._context_managers: dict[tuple[str, str], Any] = {}
# threading.Lock is not bound to any event loop, so it is safe to
# acquire from both async paths and sync/worker-thread paths.
self._lock = threading.Lock()
async def get_session(
self,
server_name: str,
scope_key: str,
connection: dict[str, Any],
) -> ClientSession:
"""Get or create a persistent MCP session.
If an existing session was created in a different event loop (e.g.
the sync-wrapper path), it is closed and replaced with a fresh one
in the current loop.
Args:
server_name: MCP server name.
scope_key: Isolation key (typically thread_id).
connection: Connection configuration for ``create_session``.
Returns:
An initialized ``ClientSession``.
"""
key = (server_name, scope_key)
current_loop = asyncio.get_running_loop()
# Phase 1: inspect/mutate the registry under the thread lock (no awaits).
cms_to_close: list[tuple[tuple[str, str], Any]] = []
with self._lock:
if key in self._entries:
session, loop = self._entries[key]
if loop is current_loop:
self._entries.move_to_end(key)
return session
# Session belongs to a different event loop evict it.
cm = self._context_managers.pop(key, None)
self._entries.pop(key)
if cm is not None:
cms_to_close.append((key, cm))
# Evict LRU entries when at capacity.
while len(self._entries) >= self.MAX_SESSIONS:
oldest_key = next(iter(self._entries))
cm = self._context_managers.pop(oldest_key, None)
self._entries.pop(oldest_key)
if cm is not None:
cms_to_close.append((oldest_key, cm))
# Phase 2: async cleanup outside the lock so we never await while holding it.
for close_key, cm in cms_to_close:
try:
await cm.__aexit__(None, None, None)
except Exception:
logger.warning("Error closing MCP session %s", close_key, exc_info=True)
from langchain_mcp_adapters.sessions import create_session
cm = create_session(connection)
session = await cm.__aenter__()
await session.initialize()
# Phase 3: register the new session under the lock.
with self._lock:
self._entries[key] = (session, current_loop)
self._context_managers[key] = cm
logger.info("Created persistent MCP session for %s/%s", server_name, scope_key)
return session
# ------------------------------------------------------------------
# Cleanup helpers
# ------------------------------------------------------------------
async def _close_cm(self, key: tuple[str, str], cm: Any) -> None:
"""Close a single context manager (must be called WITHOUT the lock)."""
try:
await cm.__aexit__(None, None, None)
except Exception:
logger.warning("Error closing MCP session %s", key, exc_info=True)
async def close_scope(self, scope_key: str) -> None:
"""Close all sessions for a given scope (e.g. thread_id)."""
with self._lock:
keys = [k for k in self._entries if k[1] == scope_key]
cms = [(k, self._context_managers.pop(k, None)) for k in keys]
for k in keys:
self._entries.pop(k, None)
for key, cm in cms:
if cm is not None:
await self._close_cm(key, cm)
async def close_server(self, server_name: str) -> None:
"""Close all sessions for a given server."""
with self._lock:
keys = [k for k in self._entries if k[0] == server_name]
cms = [(k, self._context_managers.pop(k, None)) for k in keys]
for k in keys:
self._entries.pop(k, None)
for key, cm in cms:
if cm is not None:
await self._close_cm(key, cm)
async def close_all(self) -> None:
"""Close every managed session."""
with self._lock:
cms = list(self._context_managers.items())
self._context_managers.clear()
self._entries.clear()
for key, cm in cms:
await self._close_cm(key, cm)
def close_all_sync(self) -> None:
"""Close all sessions using their owning event loops (synchronous).
Each session is closed on the loop it was created in, avoiding
cross-loop resource leaks. Safe to call from any thread without an
active event loop.
"""
with self._lock:
entries = list(self._entries.items())
cms = dict(self._context_managers)
self._entries.clear()
self._context_managers.clear()
for key, (_, loop) in entries:
cm = cms.get(key)
if cm is None or loop.is_closed():
continue
try:
if loop.is_running():
# Schedule on the owning loop from this (different) thread.
future = asyncio.run_coroutine_threadsafe(cm.__aexit__(None, None, None), loop)
future.result(timeout=self.SESSION_CLOSE_TIMEOUT)
else:
loop.run_until_complete(cm.__aexit__(None, None, None))
except Exception:
logger.debug("Error closing MCP session %s during sync close", key, exc_info=True)
# ------------------------------------------------------------------
# Module-level singleton
# ------------------------------------------------------------------
_pool: MCPSessionPool | None = None
_pool_lock = threading.Lock()
def get_session_pool() -> MCPSessionPool:
"""Return the global session-pool singleton."""
global _pool
if _pool is None:
with _pool_lock:
if _pool is None:
_pool = MCPSessionPool()
return _pool
def reset_session_pool() -> None:
"""Reset the singleton (for tests)."""
global _pool
_pool = None
+190 -8
View File
@@ -1,21 +1,181 @@
"""Load MCP tools using langchain-mcp-adapters."""
"""Load MCP tools using langchain-mcp-adapters with persistent sessions."""
from __future__ import annotations
import logging
from typing import Any
from langchain_core.tools import BaseTool
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.config import get_config
from deerflow.config.extensions_config import ExtensionsConfig
from deerflow.mcp.client import build_servers_config
from deerflow.mcp.oauth import build_oauth_tool_interceptor, get_initial_oauth_headers
from deerflow.mcp.session_pool import get_session_pool
from deerflow.reflection import resolve_variable
from deerflow.tools.sync import make_sync_tool_wrapper
from deerflow.tools.types import Runtime
logger = logging.getLogger(__name__)
def _extract_thread_id(runtime: Runtime | None) -> str:
"""Extract thread_id from the injected tool runtime or LangGraph config."""
if runtime is not None:
tid = runtime.context.get("thread_id") if runtime.context else None
if tid is not None:
return str(tid)
config = runtime.config or {}
tid = config.get("configurable", {}).get("thread_id")
if tid is not None:
return str(tid)
try:
tid = get_config().get("configurable", {}).get("thread_id")
return str(tid) if tid is not None else "default"
except RuntimeError:
return "default"
def _convert_call_tool_result(call_tool_result: Any) -> Any:
"""Convert an MCP CallToolResult to the LangChain ``content_and_artifact`` format.
Implements the same conversion logic as the adapter without relying on
the private ``langchain_mcp_adapters.tools._convert_call_tool_result`` symbol.
"""
from langchain_core.messages import ToolMessage
from langchain_core.messages.content import create_file_block, create_image_block, create_text_block
from langchain_core.tools import ToolException
from mcp.types import EmbeddedResource, ImageContent, ResourceLink, TextContent, TextResourceContents
# Pass ToolMessage through directly (interceptor short-circuit).
if isinstance(call_tool_result, ToolMessage):
return call_tool_result, None
# Pass LangGraph Command through directly when langgraph is installed.
try:
from langgraph.types import Command
if isinstance(call_tool_result, Command):
return call_tool_result, None
except ImportError:
# langgraph is optional; if unavailable, continue with standard MCP content conversion.
pass
# Convert MCP content blocks to LangChain content blocks.
lc_content = []
for item in call_tool_result.content:
if isinstance(item, TextContent):
lc_content.append(create_text_block(text=item.text))
elif isinstance(item, ImageContent):
lc_content.append(create_image_block(base64=item.data, mime_type=item.mimeType))
elif isinstance(item, ResourceLink):
mime = item.mimeType or None
if mime and mime.startswith("image/"):
lc_content.append(create_image_block(url=str(item.uri), mime_type=mime))
else:
lc_content.append(create_file_block(url=str(item.uri), mime_type=mime))
elif isinstance(item, EmbeddedResource):
from mcp.types import BlobResourceContents
res = item.resource
if isinstance(res, TextResourceContents):
lc_content.append(create_text_block(text=res.text))
elif isinstance(res, BlobResourceContents):
mime = res.mimeType or None
if mime and mime.startswith("image/"):
lc_content.append(create_image_block(base64=res.blob, mime_type=mime))
else:
lc_content.append(create_file_block(base64=res.blob, mime_type=mime))
else:
lc_content.append(create_text_block(text=str(res)))
else:
lc_content.append(create_text_block(text=str(item)))
if call_tool_result.isError:
error_parts = [item["text"] for item in lc_content if isinstance(item, dict) and item.get("type") == "text"]
raise ToolException("\n".join(error_parts) if error_parts else str(lc_content))
artifact = None
if call_tool_result.structuredContent is not None:
artifact = {"structured_content": call_tool_result.structuredContent}
return lc_content, artifact
def _make_session_pool_tool(
tool: BaseTool,
server_name: str,
connection: dict[str, Any],
tool_interceptors: list[Any] | None = None,
) -> BaseTool:
"""Wrap an MCP tool so it reuses a persistent session from the pool.
Replaces the per-call session creation with pool-managed sessions scoped
by ``(server_name, thread_id)``. This ensures stateful MCP servers (e.g.
Playwright) keep their state across tool calls within the same thread.
The configured ``tool_interceptors`` (OAuth, custom) are preserved and
applied on every call before invoking the pooled session.
"""
# Strip the server-name prefix to recover the original MCP tool name.
original_name = tool.name
prefix = f"{server_name}_"
if original_name.startswith(prefix):
original_name = original_name[len(prefix) :]
pool = get_session_pool()
async def call_with_persistent_session(
runtime: Runtime | None = None,
**arguments: Any,
) -> Any:
thread_id = _extract_thread_id(runtime)
session = await pool.get_session(server_name, thread_id, connection)
if tool_interceptors:
from langchain_mcp_adapters.interceptors import MCPToolCallRequest
async def base_handler(request: MCPToolCallRequest) -> Any:
return await session.call_tool(request.name, request.args)
handler = base_handler
for interceptor in reversed(tool_interceptors):
outer = handler
async def wrapped(req: Any, _i: Any = interceptor, _h: Any = outer) -> Any:
return await _i(req, _h)
handler = wrapped
request = MCPToolCallRequest(
name=original_name,
args=arguments,
server_name=server_name,
runtime=runtime,
)
call_tool_result = await handler(request)
else:
call_tool_result = await session.call_tool(original_name, arguments)
return _convert_call_tool_result(call_tool_result)
return StructuredTool(
name=tool.name,
description=tool.description,
args_schema=tool.args_schema,
coroutine=call_with_persistent_session,
response_format="content_and_artifact",
metadata=tool.metadata,
)
async def get_mcp_tools() -> list[BaseTool]:
"""Get all tools from enabled MCP servers.
Tools are wrapped with persistent-session logic so that consecutive
calls within the same thread reuse the same MCP session.
Returns:
List of LangChain tools from all enabled MCP servers.
"""
@@ -50,7 +210,7 @@ async def get_mcp_tools() -> list[BaseTool]:
existing_headers["Authorization"] = auth_header
servers_config[server_name]["headers"] = existing_headers
tool_interceptors = []
tool_interceptors: list[Any] = []
oauth_interceptor = build_oauth_tool_interceptor(extensions_config)
if oauth_interceptor is not None:
tool_interceptors.append(oauth_interceptor)
@@ -74,20 +234,42 @@ async def get_mcp_tools() -> list[BaseTool]:
elif interceptor is not None:
logger.warning(f"Builder {interceptor_path} returned non-callable {type(interceptor).__name__}; skipping")
except Exception as e:
logger.warning(f"Failed to load MCP interceptor {interceptor_path}: {e}", exc_info=True)
logger.warning(
f"Failed to load MCP interceptor {interceptor_path}: {e}",
exc_info=True,
)
client = MultiServerMCPClient(servers_config, tool_interceptors=tool_interceptors, tool_name_prefix=True)
client = MultiServerMCPClient(
servers_config,
tool_interceptors=tool_interceptors,
tool_name_prefix=True,
)
# Get all tools from all servers
# Get all tools from all servers (discovers tool definitions via
# temporary sessions the persistent-session wrapping is applied below).
tools = await client.get_tools()
logger.info(f"Successfully loaded {len(tools)} tool(s) from MCP servers")
# Patch tools to support sync invocation, as deerflow client streams synchronously
# Wrap each tool with persistent-session logic.
wrapped_tools: list[BaseTool] = []
for tool in tools:
tool_server: str | None = None
for name in servers_config:
if tool.name.startswith(f"{name}_"):
tool_server = name
break
if tool_server is not None:
wrapped_tools.append(_make_session_pool_tool(tool, tool_server, servers_config[tool_server], tool_interceptors))
else:
wrapped_tools.append(tool)
# Patch tools to support sync invocation, as deerflow client streams synchronously
for tool in wrapped_tools:
if getattr(tool, "func", None) is None and getattr(tool, "coroutine", None) is not None:
tool.func = make_sync_tool_wrapper(tool.coroutine, tool.name)
return tools
return wrapped_tools
except Exception as e:
logger.error(f"Failed to load MCP tools: {e}", exc_info=True)
@@ -42,6 +42,7 @@ _DEFAULT_GLOB_MAX_RESULTS = 200
_MAX_GLOB_MAX_RESULTS = 1000
_DEFAULT_GREP_MAX_RESULTS = 100
_MAX_GREP_MAX_RESULTS = 500
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS = 2000
_LOCAL_BASH_CWD_COMMANDS = {"cd", "pushd"}
_LOCAL_BASH_COMMAND_WRAPPERS = {"command", "builtin"}
_LOCAL_BASH_COMMAND_PREFIX_KEYWORDS = {"!", "{", "case", "do", "elif", "else", "for", "if", "select", "then", "time", "until", "while"}
@@ -435,6 +436,42 @@ def _sanitize_error(error: Exception, runtime: Runtime | None = None) -> str:
return msg
def _truncate_write_file_error_detail(detail: str, max_chars: int) -> str:
"""Middle-truncate write_file error details, preserving the head and tail."""
if max_chars == 0:
return detail
if len(detail) <= max_chars:
return detail
total = len(detail)
marker_max_len = len(f"\n... [write_file error truncated: {total} chars skipped] ...\n")
kept = max(0, max_chars - marker_max_len)
if kept == 0:
return detail[:max_chars]
head_len = kept // 2
tail_len = kept - head_len
skipped = total - kept
marker = f"\n... [write_file error truncated: {skipped} chars skipped] ...\n"
return f"{detail[:head_len]}{marker}{detail[-tail_len:] if tail_len > 0 else ''}"
def _format_write_file_error(
requested_path: str,
error: Exception,
runtime: Runtime | None = None,
*,
max_chars: int = _DEFAULT_WRITE_FILE_ERROR_MAX_CHARS,
) -> str:
"""Return a bounded, sanitized error string for write_file failures."""
header = f"Error: Failed to write file '{requested_path}'"
detail = _sanitize_error(error, runtime)
if max_chars == 0:
return f"{header}: {detail}"
detail_budget = max_chars - len(header) - 2
if detail_budget <= 0:
return _truncate_write_file_error_detail(f"{header}: {detail}", max_chars)
return f"{header}: {_truncate_write_file_error_detail(detail, detail_budget)}"
def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str:
"""Replace virtual /mnt/user-data paths with actual thread data paths.
@@ -1651,9 +1688,9 @@ def write_file_tool(
append: Whether to append content to the end of the file instead of overwriting it. Defaults to false.
"""
try:
requested_path = path
sandbox = ensure_sandbox_initialized(runtime)
ensure_thread_directories_exist(runtime)
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
validate_local_tool_path(path, thread_data)
@@ -1664,15 +1701,21 @@ def write_file_tool(
sandbox.write_file(path, content, append)
return "OK"
except SandboxError as e:
return f"Error: {e}"
return _format_write_file_error(requested_path, e, runtime)
except PermissionError:
return f"Error: Permission denied writing to file: {requested_path}"
return _truncate_write_file_error_detail(
f"Error: Permission denied writing to file: {requested_path}",
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS,
)
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {requested_path}"
return _truncate_write_file_error_detail(
f"Error: Path is a directory, not a file: {requested_path}",
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS,
)
except OSError as e:
return f"Error: Failed to write file '{requested_path}': {_sanitize_error(e, runtime)}"
return _format_write_file_error(requested_path, e, runtime)
except Exception as e:
return f"Error: Unexpected error writing file: {_sanitize_error(e, runtime)}"
return _format_write_file_error(requested_path, e, runtime)
async def _write_file_tool_async(
@@ -7,6 +7,7 @@ from dataclasses import replace
from typing import TYPE_CHECKING, Annotated, Any, cast
from langchain.tools import InjectedToolCallId, tool
from langchain_core.callbacks import BaseCallbackManager
from langgraph.config import get_stream_writer
from deerflow.config import get_app_config
@@ -99,15 +100,31 @@ def _schedule_deferred_subagent_cleanup(task_id: str, trace_id: str, max_polls:
def _find_usage_recorder(runtime: Any) -> Any | None:
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config."""
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config.
LangChain may pass ``config["callbacks"]`` in three different shapes:
- ``None`` (no callbacks registered): no recorder.
- A plain ``list[BaseCallbackHandler]``: iterate it directly.
- A ``BaseCallbackManager`` instance (e.g. ``AsyncCallbackManager`` on async
tool runs): managers are not iterable, so we unwrap ``.handlers`` first.
Any other shape (e.g. a single handler object accidentally passed without a
list wrapper) cannot be iterated safely; treat it as "no recorder" rather
than raise.
"""
if runtime is None:
return None
config = getattr(runtime, "config", None)
if not isinstance(config, dict):
return None
callbacks = config.get("callbacks", [])
callbacks = config.get("callbacks")
if isinstance(callbacks, BaseCallbackManager):
callbacks = callbacks.handlers
if not callbacks:
return None
if not isinstance(callbacks, list):
return None
for cb in callbacks:
if hasattr(cb, "record_external_llm_usage_records"):
return cb
@@ -0,0 +1,189 @@
"""Regression tests for gateway config freshness on the request hot path.
Bytedance/deer-flow issue #3107 BUG-001: the worker and lead-agent path
captured ``app.state.config`` at gateway startup. ``config.yaml`` edits during
runtime were therefore ignored ``get_app_config()``'s mtime-based reload
existed but was bypassed because the snapshot object was passed through
explicitly.
These tests pin the desired behaviour: a request-time ``get_config`` call must
observe the most recent on-disk ``config.yaml`` (mtime reload), and the
runtime ``ContextVar`` override must keep working for per-request injection.
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from app.gateway import deps as gateway_deps
from app.gateway.deps import get_config
from deerflow.config.app_config import (
AppConfig,
pop_current_app_config,
push_current_app_config,
reset_app_config,
set_app_config,
)
from deerflow.config.sandbox_config import SandboxConfig
@pytest.fixture(autouse=True)
def _isolate_app_config_singleton():
"""Ensure each test starts with a clean module-level cache."""
reset_app_config()
yield
reset_app_config()
def _write_config_yaml(path: Path, *, log_level: str) -> None:
path.write_text(
f"""
sandbox:
use: deerflow.sandbox.local.provider:LocalSandboxProvider
log_level: {log_level}
""".strip()
+ "\n",
encoding="utf-8",
)
def _build_app() -> FastAPI:
app = FastAPI()
@app.get("/probe")
def probe(cfg: AppConfig = Depends(get_config)):
return {"log_level": cfg.log_level}
return app
def test_get_config_reflects_file_mtime_reload(tmp_path, monkeypatch):
"""Editing config.yaml at runtime must be visible to /probe without restart.
This is the literal repro for the issue: the gateway must not freeze the
config to whatever was on disk when the process started.
"""
config_file = tmp_path / "config.yaml"
_write_config_yaml(config_file, log_level="info")
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file))
app = _build_app()
client = TestClient(app)
assert client.get("/probe").json() == {"log_level": "info"}
# Edit the file and bump its mtime — simulating a maintainer changing
# max_tokens / model settings in production while the gateway is live.
_write_config_yaml(config_file, log_level="debug")
future_mtime = config_file.stat().st_mtime + 5
os.utime(config_file, (future_mtime, future_mtime))
assert client.get("/probe").json() == {"log_level": "debug"}
def test_get_config_respects_runtime_context_override(tmp_path, monkeypatch):
"""Per-request ``push_current_app_config`` injection must still win."""
config_file = tmp_path / "config.yaml"
_write_config_yaml(config_file, log_level="info")
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file))
override = AppConfig(sandbox=SandboxConfig(use="test"), log_level="trace")
push_current_app_config(override)
try:
app = _build_app()
client = TestClient(app)
assert client.get("/probe").json() == {"log_level": "trace"}
finally:
pop_current_app_config()
def test_get_config_respects_test_set_app_config():
"""``set_app_config`` (used by upload/skills router tests) keeps working."""
injected = AppConfig(sandbox=SandboxConfig(use="test"), log_level="warning")
set_app_config(injected)
app = _build_app()
client = TestClient(app)
assert client.get("/probe").json() == {"log_level": "warning"}
def test_run_context_app_config_reflects_yaml_edit(tmp_path, monkeypatch):
"""``RunContext.app_config`` must follow live `config.yaml` edits.
BUG-001 review feedback: the run-context that feeds worker / lead-agent
factories must observe the same mtime reload that `get_config()` does;
otherwise stale config slips back in through the run path even after the
request dependency is fixed.
"""
from unittest.mock import MagicMock
from app.gateway.deps import get_run_context
config_file = tmp_path / "config.yaml"
_write_config_yaml(config_file, log_level="info")
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file))
app = FastAPI()
# Sentinel values for the rest of the RunContext wiring — we only care
# about ``ctx.app_config`` for this assertion.
app.state.checkpointer = MagicMock()
app.state.store = MagicMock()
app.state.run_event_store = MagicMock()
app.state.run_events_config = {"frozen": "startup"}
app.state.thread_store = MagicMock()
@app.get("/run-ctx-log-level")
def probe(ctx=Depends(get_run_context)):
return {
"log_level": ctx.app_config.log_level,
"run_events_config": ctx.run_events_config,
}
client = TestClient(app)
first = client.get("/run-ctx-log-level").json()
assert first == {"log_level": "info", "run_events_config": {"frozen": "startup"}}
_write_config_yaml(config_file, log_level="debug")
future_mtime = config_file.stat().st_mtime + 5
os.utime(config_file, (future_mtime, future_mtime))
second = client.get("/run-ctx-log-level").json()
# app_config follows the edit; run_events_config stays frozen to the
# startup snapshot we wrote onto app.state above.
assert second == {"log_level": "debug", "run_events_config": {"frozen": "startup"}}
@pytest.mark.parametrize(
"exception",
[
FileNotFoundError("config.yaml not found"),
PermissionError("config.yaml not readable"),
ValueError("invalid config"),
RuntimeError("yaml parse error"),
],
)
def test_get_config_returns_503_on_any_load_failure(monkeypatch, exception):
"""Any failure to materialise the config must surface as 503, not 500.
Bytedance/deer-flow issue #3107 BUG-001 review: the original snapshot
contract returned 503 when ``app.state.config is None``. The first cut of
this fix only mapped ``FileNotFoundError`` to 503, which left
``PermissionError`` / ``yaml.YAMLError`` / ``ValidationError`` etc. bubbling
up as 500. Catch every load failure at the request boundary.
"""
def _broken_get_app_config():
raise exception
monkeypatch.setattr(gateway_deps, "get_app_config", _broken_get_app_config)
app = _build_app()
client = TestClient(app, raise_server_exceptions=False)
response = client.get("/probe")
assert response.status_code == 503
assert response.json() == {"detail": "Configuration not available"}
-41
View File
@@ -1,41 +0,0 @@
from __future__ import annotations
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from app.gateway.deps import get_config
from deerflow.config.app_config import AppConfig
from deerflow.config.sandbox_config import SandboxConfig
def test_get_config_returns_app_state_config():
"""get_config should return the exact AppConfig stored on app.state."""
app = FastAPI()
config = AppConfig(sandbox=SandboxConfig(use="test"))
app.state.config = config
@app.get("/probe")
def probe(cfg: AppConfig = Depends(get_config)):
return {"same_identity": cfg is config, "log_level": cfg.log_level}
client = TestClient(app)
response = client.get("/probe")
assert response.status_code == 200
assert response.json() == {"same_identity": True, "log_level": "info"}
def test_get_config_reads_updated_app_state():
"""Swapping app.state.config should be visible to the dependency."""
app = FastAPI()
app.state.config = AppConfig(sandbox=SandboxConfig(use="test"), log_level="info")
@app.get("/log-level")
def log_level(cfg: AppConfig = Depends(get_config)):
return {"level": cfg.log_level}
client = TestClient(app)
assert client.get("/log-level").json() == {"level": "info"}
app.state.config = app.state.config.model_copy(update={"log_level": "debug"})
assert client.get("/log-level").json() == {"level": "debug"}
@@ -17,7 +17,7 @@ from fastapi import FastAPI
@asynccontextmanager
async def _noop_langgraph_runtime(_app):
async def _noop_langgraph_runtime(_app, _startup_config):
yield
+88
View File
@@ -81,6 +81,94 @@ def test_normalize_input_passthrough():
assert result == {"custom_key": "value"}
def test_normalize_input_preserves_additional_kwargs_and_id():
"""Regression: gh #3132 — frontend ships uploaded-file metadata in
additional_kwargs.files (and a client-side message id). The gateway must
not strip them before the graph runs, otherwise UploadsMiddleware reports
"(empty)" for new uploads and the frontend message loses its file chip.
"""
from langchain_core.messages import HumanMessage
from app.gateway.services import normalize_input
files = [{"filename": "a.csv", "size": 100, "path": "/mnt/user-data/uploads/a.csv", "status": "uploaded"}]
result = normalize_input(
{
"messages": [
{
"type": "human",
"id": "client-msg-1",
"name": "user-input",
"content": [{"type": "text", "text": "clean it"}],
"additional_kwargs": {"files": files, "custom": "keep-me"},
}
]
}
)
assert len(result["messages"]) == 1
msg = result["messages"][0]
assert isinstance(msg, HumanMessage)
assert msg.id == "client-msg-1"
assert msg.name == "user-input"
assert msg.content == [{"type": "text", "text": "clean it"}]
assert msg.additional_kwargs == {"files": files, "custom": "keep-me"}
def test_normalize_input_passes_through_basemessage_instances():
from langchain_core.messages import HumanMessage
from app.gateway.services import normalize_input
msg = HumanMessage(content="hello", id="m-1", additional_kwargs={"files": [{"filename": "x"}]})
result = normalize_input({"messages": [msg]})
assert result["messages"][0] is msg
def test_normalize_input_rejects_malformed_message_with_400():
"""Boundary validation: ``convert_to_messages`` raises ``ValueError`` when a
message dict is missing ``role``/``type``/``content``. ``normalize_input``
runs inside the gateway HTTP boundary, so a malformed payload should surface
as a 400 referencing the offending entry not bubble up as a 500.
Raised after the Copilot review on PR #3136.
"""
import pytest
from fastapi import HTTPException
from app.gateway.services import normalize_input
with pytest.raises(HTTPException) as excinfo:
normalize_input({"messages": [{"role": "human", "content": "ok"}, {"oops": "no role here"}]})
assert excinfo.value.status_code == 400
assert "input.messages[1]" in excinfo.value.detail
def test_normalize_input_handles_non_human_roles():
"""The previous implementation collapsed every role to HumanMessage with a
`# TODO: handle other message types` comment. Resuming a thread with prior
AI/tool messages would silently rewrite them as human turns corrupting
the conversation. Use langchain's standard conversion so ai/system/tool
roles round-trip correctly.
"""
from langchain_core.messages import AIMessage, SystemMessage, ToolMessage
from app.gateway.services import normalize_input
result = normalize_input(
{
"messages": [
{"role": "system", "content": "sys"},
{"role": "ai", "content": "hi", "id": "ai-1"},
{"role": "tool", "content": "result", "tool_call_id": "call-1"},
]
}
)
types = [type(m) for m in result["messages"]]
assert types == [SystemMessage, AIMessage, ToolMessage]
assert result["messages"][1].id == "ai-1"
assert result["messages"][2].tool_call_id == "call-1"
def test_build_run_config_basic():
from app.gateway.services import build_run_config
+409
View File
@@ -0,0 +1,409 @@
"""Tests for the MCP persistent-session pool."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from deerflow.mcp.session_pool import MCPSessionPool, get_session_pool, reset_session_pool
@pytest.fixture(autouse=True)
def _reset_pool():
reset_session_pool()
yield
reset_session_pool()
# ---------------------------------------------------------------------------
# MCPSessionPool unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_session_creates_new():
"""First call for a key creates a new session."""
pool = MCPSessionPool()
mock_session = AsyncMock()
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
session = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
assert session is mock_session
mock_session.initialize.assert_awaited_once()
@pytest.mark.asyncio
async def test_get_session_reuses_existing():
"""Second call for the same key returns the cached session."""
pool = MCPSessionPool()
mock_session = AsyncMock()
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
s1 = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
s2 = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
assert s1 is s2
# Only one session should have been created.
assert mock_cm.__aenter__.await_count == 1
@pytest.mark.asyncio
async def test_different_scope_creates_different_session():
"""Different scope keys get different sessions."""
pool = MCPSessionPool()
sessions = [AsyncMock(), AsyncMock()]
idx = 0
class CmFactory:
def __init__(self):
self.enter_count = 0
async def __aenter__(self):
nonlocal idx
s = sessions[idx]
idx += 1
self.enter_count += 1
return s
async def __aexit__(self, *args):
return False
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=lambda *a, **kw: CmFactory()):
s1 = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
s2 = await pool.get_session("server", "thread-2", {"transport": "stdio", "command": "x", "args": []})
assert s1 is not s2
assert s1 is sessions[0]
assert s2 is sessions[1]
@pytest.mark.asyncio
async def test_lru_eviction():
"""Oldest entries are evicted when the pool is full."""
pool = MCPSessionPool()
pool.MAX_SESSIONS = 2
class CmFactory:
def __init__(self):
self.closed = False
async def __aenter__(self):
return AsyncMock()
async def __aexit__(self, *args):
self.closed = True
return False
cms: list[CmFactory] = []
def make_cm(*a, **kw):
cm = CmFactory()
cms.append(cm)
return cm
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=make_cm):
await pool.get_session("s", "t1", {"transport": "stdio", "command": "x", "args": []})
await pool.get_session("s", "t2", {"transport": "stdio", "command": "x", "args": []})
# Pool is full (2). Adding t3 should evict t1.
await pool.get_session("s", "t3", {"transport": "stdio", "command": "x", "args": []})
assert cms[0].closed is True
assert cms[1].closed is False
assert cms[2].closed is False
@pytest.mark.asyncio
async def test_close_scope():
"""close_scope shuts down sessions for a specific scope key."""
pool = MCPSessionPool()
class CmFactory:
def __init__(self):
self.closed = False
async def __aenter__(self):
return AsyncMock()
async def __aexit__(self, *args):
self.closed = True
return False
cms: list[CmFactory] = []
def make_cm(*a, **kw):
cm = CmFactory()
cms.append(cm)
return cm
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=make_cm):
await pool.get_session("s", "t1", {"transport": "stdio", "command": "x", "args": []})
await pool.get_session("s", "t2", {"transport": "stdio", "command": "x", "args": []})
await pool.close_scope("t1")
assert cms[0].closed is True
assert cms[1].closed is False
# t2 session still exists.
assert ("s", "t2") in pool._entries
@pytest.mark.asyncio
async def test_close_all():
"""close_all shuts down every session."""
pool = MCPSessionPool()
class CmFactory:
def __init__(self):
self.closed = False
async def __aenter__(self):
return AsyncMock()
async def __aexit__(self, *args):
self.closed = True
return False
cms: list[CmFactory] = []
def make_cm(*a, **kw):
cm = CmFactory()
cms.append(cm)
return cm
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=make_cm):
await pool.get_session("s1", "t1", {"transport": "stdio", "command": "x", "args": []})
await pool.get_session("s2", "t2", {"transport": "stdio", "command": "x", "args": []})
await pool.close_all()
assert all(cm.closed for cm in cms)
assert len(pool._entries) == 0
# ---------------------------------------------------------------------------
# Singleton helpers
# ---------------------------------------------------------------------------
def test_get_session_pool_singleton():
"""get_session_pool returns the same instance."""
p1 = get_session_pool()
p2 = get_session_pool()
assert p1 is p2
def test_reset_session_pool():
"""reset_session_pool clears the singleton."""
p1 = get_session_pool()
reset_session_pool()
p2 = get_session_pool()
assert p1 is not p2
# ---------------------------------------------------------------------------
# Integration: _make_session_pool_tool uses the pool
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_session_pool_tool_wrapping():
"""The wrapper tool delegates to a pool-managed session."""
# Build a dummy StructuredTool (as returned by langchain-mcp-adapters).
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from deerflow.mcp.tools import _make_session_pool_tool
class Args(BaseModel):
url: str = Field(..., description="url")
original_tool = StructuredTool(
name="playwright_navigate",
description="Navigate browser",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
mock_session = AsyncMock()
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
connection = {"transport": "stdio", "command": "pw", "args": []}
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
wrapped = _make_session_pool_tool(original_tool, "playwright", connection)
# Simulate a tool call with a runtime context containing thread_id.
mock_runtime = MagicMock()
mock_runtime.context = {"thread_id": "thread-42"}
mock_runtime.config = {}
await wrapped.coroutine(runtime=mock_runtime, url="https://example.com")
mock_session.call_tool.assert_awaited_once_with("navigate", {"url": "https://example.com"})
@pytest.mark.asyncio
async def test_session_pool_tool_extracts_thread_id():
"""Thread ID is extracted from runtime.config when not in context."""
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from deerflow.mcp.tools import _make_session_pool_tool
class Args(BaseModel):
x: int = Field(..., description="x")
original_tool = StructuredTool(
name="server_tool",
description="test",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
mock_session = AsyncMock()
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
wrapped = _make_session_pool_tool(original_tool, "server", {"transport": "stdio", "command": "x", "args": []})
mock_runtime = MagicMock()
mock_runtime.context = {}
mock_runtime.config = {"configurable": {"thread_id": "from-config"}}
await wrapped.coroutine(runtime=mock_runtime, x=1)
# Verify the session was created with the correct scope key.
pool = get_session_pool()
assert ("server", "from-config") in pool._entries
@pytest.mark.asyncio
async def test_session_pool_tool_default_scope():
"""When no thread_id is available, 'default' is used as scope key."""
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from deerflow.mcp.tools import _make_session_pool_tool
class Args(BaseModel):
x: int = Field(..., description="x")
original_tool = StructuredTool(
name="server_tool",
description="test",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
mock_session = AsyncMock()
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
wrapped = _make_session_pool_tool(original_tool, "server", {"transport": "stdio", "command": "x", "args": []})
# No thread_id in runtime at all.
await wrapped.coroutine(runtime=None, x=1)
pool = get_session_pool()
assert ("server", "default") in pool._entries
@pytest.mark.asyncio
async def test_session_pool_tool_get_config_fallback():
"""When runtime is None, get_config() provides thread_id as fallback."""
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from deerflow.mcp.tools import _make_session_pool_tool
class Args(BaseModel):
x: int = Field(..., description="x")
original_tool = StructuredTool(
name="server_tool",
description="test",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
mock_session = AsyncMock()
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
fake_config = {"configurable": {"thread_id": "from-langgraph-config"}}
with (
patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm),
patch("deerflow.mcp.tools.get_config", return_value=fake_config),
):
wrapped = _make_session_pool_tool(original_tool, "server", {"transport": "stdio", "command": "x", "args": []})
# runtime=None — get_config() fallback should provide thread_id
await wrapped.coroutine(runtime=None, x=1)
pool = get_session_pool()
assert ("server", "from-langgraph-config") in pool._entries
def test_session_pool_tool_sync_wrapper_path_is_safe():
"""Sync wrapper (tool.func) invocation doesn't crash on cross-loop access."""
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from deerflow.mcp.tools import _make_session_pool_tool
from deerflow.tools.sync import make_sync_tool_wrapper
class Args(BaseModel):
url: str = Field(..., description="url")
original_tool = StructuredTool(
name="playwright_navigate",
description="Navigate browser",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
mock_session = AsyncMock()
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
connection = {"transport": "stdio", "command": "pw", "args": []}
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
wrapped = _make_session_pool_tool(original_tool, "playwright", connection)
# Attach the sync wrapper exactly as get_mcp_tools() does.
wrapped.func = make_sync_tool_wrapper(wrapped.coroutine, wrapped.name)
# Call via the sync path (asyncio.run in a worker thread).
# runtime is not supplied so _extract_thread_id falls back to "default".
wrapped.func(url="https://example.com")
mock_session.call_tool.assert_called_once_with("navigate", {"url": "https://example.com"})
@@ -5,6 +5,7 @@ from unittest.mock import patch
import pytest
from deerflow.sandbox.exceptions import SandboxError
from deerflow.sandbox.tools import (
VIRTUAL_PATH_PREFIX,
_apply_cwd_prefix,
@@ -1140,6 +1141,170 @@ def test_str_replace_and_append_on_same_path_should_preserve_both_updates(monkey
assert sandbox.content == "ALPHA\ntail\n"
def test_write_file_tool_bounds_large_oserror_and_masks_local_paths(monkeypatch) -> None:
class FailingSandbox:
id = "sandbox-write-large-oserror"
def write_file(self, path: str, content: str, append: bool = False) -> None:
host_path = f"{_THREAD_DATA['workspace_path']}/nested/output.txt"
raise OSError(f"write failed at {host_path}\n{'A' * 12000}\nremote tail marker")
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
sandbox = FailingSandbox()
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: True)
monkeypatch.setattr("deerflow.sandbox.tools.get_thread_data", lambda runtime: _THREAD_DATA)
monkeypatch.setattr("deerflow.sandbox.tools.validate_local_tool_path", lambda path, thread_data: None)
monkeypatch.setattr(
"deerflow.sandbox.tools._resolve_and_validate_user_data_path",
lambda path, thread_data: f"{_THREAD_DATA['workspace_path']}/output.txt",
)
result = write_file_tool.func(
runtime=runtime,
description="写入大文件失败",
path="/mnt/user-data/workspace/output.txt",
content="report body",
)
assert len(result) <= 2000
assert "Error: Failed to write file '/mnt/user-data/workspace/output.txt':" in result
assert "/tmp/deer-flow/threads/t1/user-data/workspace" not in result
assert "/mnt/user-data/workspace/nested/output.txt" in result
assert "remote tail marker" in result
assert "[write_file error truncated:" in result
def test_write_file_tool_preserves_short_oserror_without_truncation(monkeypatch) -> None:
class FailingSandbox:
id = "sandbox-write-short-oserror"
def write_file(self, path: str, content: str, append: bool = False) -> None:
raise OSError("disk quota exceeded")
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
sandbox = FailingSandbox()
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
result = write_file_tool.func(
runtime=runtime,
description="写入失败",
path="/mnt/user-data/workspace/output.txt",
content="tiny payload",
)
assert result == "Error: Failed to write file '/mnt/user-data/workspace/output.txt': OSError: disk quota exceeded"
assert "[write_file error truncated:" not in result
def test_write_file_tool_bounds_large_sandbox_error(monkeypatch) -> None:
class FailingSandbox:
id = "sandbox-write-large-sandbox-error"
def write_file(self, path: str, content: str, append: bool = False) -> None:
raise SandboxError(f"remote write rejected {'B' * 12000} final detail")
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
sandbox = FailingSandbox()
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
result = write_file_tool.func(
runtime=runtime,
description="远端写入失败",
path="/mnt/user-data/workspace/output.txt",
content="tiny payload",
)
assert len(result) <= 2000
assert "Error: Failed to write file '/mnt/user-data/workspace/output.txt':" in result
assert "SandboxError: remote write rejected" in result
assert "final detail" in result
assert "[write_file error truncated:" in result
@pytest.mark.parametrize(
("raised_error", "expected_fragment"),
[
pytest.param(
PermissionError("permission denied"),
"Error: Permission denied writing to file: /mnt/user-data/workspace/output.txt",
id="permission",
),
pytest.param(
IsADirectoryError("target is a directory"),
"Error: Path is a directory, not a file: /mnt/user-data/workspace/output.txt",
id="directory",
),
pytest.param(
Exception("remote sandbox timeout"),
"Exception: remote sandbox timeout",
id="generic",
),
],
)
def test_write_file_tool_formats_all_other_failure_branches(
monkeypatch,
raised_error: Exception,
expected_fragment: str,
) -> None:
class FailingSandbox:
id = "sandbox-write-other-failure"
def write_file(self, path: str, content: str, append: bool = False) -> None:
raise raised_error
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
sandbox = FailingSandbox()
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
result = write_file_tool.func(
runtime=runtime,
description="验证错误分支格式化",
path="/mnt/user-data/workspace/output.txt",
content="tiny payload",
)
assert "/mnt/user-data/workspace/output.txt" in result
assert expected_fragment in result
assert "[write_file error truncated:" not in result
def test_write_file_tool_handles_sandbox_init_failure(monkeypatch) -> None:
"""Regression for #3133 review: SandboxError raised during sandbox
initialization (before the local `requested_path` assignment) must still
surface as a bounded tool error rather than an UnboundLocalError.
"""
def raise_sandbox_error(runtime):
raise SandboxError("sandbox missing")
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", raise_sandbox_error)
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
result = write_file_tool.func(
runtime=runtime,
description="sandbox 初始化失败",
path="/mnt/user-data/workspace/output.txt",
content="tiny payload",
)
assert "Error: Failed to write file '/mnt/user-data/workspace/output.txt':" in result
assert "SandboxError: sandbox missing" in result
assert "[write_file error truncated:" not in result
def test_file_operation_lock_memory_cleanup() -> None:
"""Verify that released locks are eventually cleaned up by WeakValueDictionary.
+3 -1
View File
@@ -7,6 +7,7 @@ from types import SimpleNamespace
from fastapi import FastAPI
from fastapi.testclient import TestClient
from app.gateway.deps import get_config
from app.gateway.routers import skills as skills_router
from deerflow.skills.storage import get_or_new_skill_storage
from deerflow.skills.types import Skill
@@ -38,7 +39,8 @@ def _make_skill(name: str, *, enabled: bool) -> Skill:
def _make_test_app(config) -> FastAPI:
app = FastAPI()
app.state.config = config
app.state.config = config # kept for any startup-style reads
app.dependency_overrides[get_config] = lambda: config
app.include_router(skills_router.router)
return app
@@ -0,0 +1,91 @@
"""Regression tests for _find_usage_recorder callback shape handling.
Bytedance issue #3107 BUG-002: When LangChain passes ``config["callbacks"]`` as
an ``AsyncCallbackManager`` (instead of a plain list), the previous
``for cb in callbacks`` loop raised ``TypeError: 'AsyncCallbackManager' object
is not iterable``. ToolErrorHandlingMiddleware then converted the entire ``task``
tool call into an error ToolMessage, losing the subagent result.
"""
from types import SimpleNamespace
from langchain_core.callbacks import AsyncCallbackManager, CallbackManager
from deerflow.tools.builtins.task_tool import _find_usage_recorder
class _RecorderHandler:
def record_external_llm_usage_records(self, records):
self.records = records
class _OtherHandler:
pass
def _make_runtime(callbacks):
return SimpleNamespace(config={"callbacks": callbacks})
def test_find_usage_recorder_with_plain_list():
recorder = _RecorderHandler()
runtime = _make_runtime([_OtherHandler(), recorder])
assert _find_usage_recorder(runtime) is recorder
def test_find_usage_recorder_with_async_callback_manager():
"""LangChain wraps callbacks in AsyncCallbackManager for async tool runs.
The old implementation raised TypeError here. The recorder lives on
``manager.handlers``; we must look there too.
"""
recorder = _RecorderHandler()
manager = AsyncCallbackManager(handlers=[_OtherHandler(), recorder])
runtime = _make_runtime(manager)
assert _find_usage_recorder(runtime) is recorder
def test_find_usage_recorder_with_sync_callback_manager():
"""Sync flavor of the same wrapper used by some langchain code paths."""
recorder = _RecorderHandler()
manager = CallbackManager(handlers=[recorder])
runtime = _make_runtime(manager)
assert _find_usage_recorder(runtime) is recorder
def test_find_usage_recorder_returns_none_when_no_recorder():
manager = AsyncCallbackManager(handlers=[_OtherHandler()])
runtime = _make_runtime(manager)
assert _find_usage_recorder(runtime) is None
def test_find_usage_recorder_handles_empty_manager():
manager = AsyncCallbackManager(handlers=[])
runtime = _make_runtime(manager)
assert _find_usage_recorder(runtime) is None
def test_find_usage_recorder_returns_none_for_none_runtime():
assert _find_usage_recorder(None) is None
def test_find_usage_recorder_returns_none_when_callbacks_is_none():
runtime = _make_runtime(None)
assert _find_usage_recorder(runtime) is None
def test_find_usage_recorder_returns_none_for_single_handler_object():
"""A single handler instance (not wrapped in a list or manager) should not crash.
LangChain's contract is that ``config["callbacks"]`` is a list-or-manager,
but we treat any other shape defensively rather than letting a ``for`` loop
blow up at runtime.
"""
runtime = _make_runtime(_RecorderHandler())
assert _find_usage_recorder(runtime) is None
def test_find_usage_recorder_returns_none_when_config_not_dict():
"""Defensive: a runtime without a dict-shaped config should not raise."""
runtime = SimpleNamespace(config="not-a-dict")
assert _find_usage_recorder(runtime) is None
+2
View File
@@ -11,6 +11,7 @@ from _router_auth_helpers import call_unwrapped, make_authed_test_app
from fastapi import HTTPException, UploadFile
from fastapi.testclient import TestClient
from app.gateway.deps import get_config
from app.gateway.routers import uploads
@@ -631,6 +632,7 @@ def test_upload_limits_endpoint_requires_thread_access():
cfg.uploads = {}
app = make_authed_test_app(owner_check_passes=False)
app.state.config = cfg
app.dependency_overrides[get_config] = lambda: cfg
app.include_router(uploads.router)
with TestClient(app) as client:
+12 -6
View File
@@ -118,19 +118,25 @@ models:
# For Docker deployments, use host.docker.internal instead of localhost:
# base_url: http://host.docker.internal:11434
# Example: Anthropic Claude model
# - name: claude-3-5-sonnet
# display_name: Claude 3.5 Sonnet
# Example: Anthropic Claude model (with extended thinking)
# supports_thinking: true is required — without it, DeerFlow silently falls
# back to non-thinking mode even when the UI thinking toggle is on.
# budget_tokens is required by the Anthropic API when thinking.type=enabled
# (no server default; min 1024; must be less than max_tokens).
# - name: claude-sonnet-4
# display_name: Claude Sonnet 4
# use: langchain_anthropic:ChatAnthropic
# model: claude-3-5-sonnet-20241022
# model: claude-sonnet-4-20250514
# api_key: $ANTHROPIC_API_KEY
# default_request_timeout: 600.0
# max_retries: 2
# max_tokens: 8192
# supports_vision: true # Enable vision support for view_image tool
# max_tokens: 16000
# supports_vision: true
# supports_thinking: true
# when_thinking_enabled:
# thinking:
# type: enabled
# budget_tokens: 4096 # required; min 1024; must be < max_tokens
# when_thinking_disabled:
# thinking:
# type: disabled
@@ -27,6 +27,7 @@ import {
import { useRehypeSplitWordsIntoSpans } from "@/core/rehype";
import type { Subtask } from "@/core/tasks";
import { useUpdateSubtask } from "@/core/tasks/context";
import { parseSubtaskResult } from "@/core/tasks/subtask-result";
import type { AgentThreadState } from "@/core/threads";
import { cn } from "@/lib/utils";
@@ -359,33 +360,10 @@ export function MessageList({
} else if (message.type === "tool") {
const taskId = message.tool_call_id;
if (taskId) {
const result = extractTextFromMessage(message);
if (result.startsWith("Task Succeeded. Result:")) {
updateSubtask({
id: taskId,
status: "completed",
result: result
.split("Task Succeeded. Result:")[1]
?.trim(),
});
} else if (result.startsWith("Task failed.")) {
updateSubtask({
id: taskId,
status: "failed",
error: result.split("Task failed.")[1]?.trim(),
});
} else if (result.startsWith("Task timed out")) {
updateSubtask({
id: taskId,
status: "failed",
error: result,
});
} else {
updateSubtask({
id: taskId,
status: "in_progress",
});
}
const parsed = parseSubtaskResult(
extractTextFromMessage(message),
);
updateSubtask({ id: taskId, ...parsed });
}
}
}
+44
View File
@@ -397,6 +397,50 @@ export function stripUploadedFilesTag(content: string): string {
.trim();
}
/**
* Tag names that backend middlewares wrap around internal payloads before
* letting them ride along inside LangGraph message ``content``.
*
* These markers are *not* user copy they come from:
*
* - ``UploadsMiddleware`` ``<uploaded_files>``
* - ``DynamicContextMiddleware`` ``<system-reminder>`` (carrying
* ``<memory>`` / ``<current_date>`` inside)
* - ``TodoListMiddleware`` / ``LoopDetectionMiddleware`` style reminders
* live in ``hide_from_ui`` HumanMessages, but their inner payload uses
* the same tag vocabulary.
*
* The primary export filter is {@link isHiddenFromUIMessage}. This list is
* the defence-in-depth strip for any message that by middleware bug,
* provider quirk, or merge-conflict regression slips through without
* its ``hide_from_ui`` flag set.
*/
export const INTERNAL_MARKER_TAGS = [
"uploaded_files",
"system-reminder",
"memory",
"current_date",
] as const;
const INTERNAL_MARKER_RE = new RegExp(
`<(${INTERNAL_MARKER_TAGS.join("|")})>[\\s\\S]*?</\\1>`,
"g",
);
/**
* Strip every known backend-injected marker from message content.
*
* Intended for the chat export path where a marker leaking through is a
* privacy regression. UI render paths should keep using
* {@link stripUploadedFilesTag} they receive ``hide_from_ui`` messages
* via a separate filter and the narrower function avoids stripping content
* a user might legitimately type into a meta-discussion (e.g. asking the
* model about its own ``<memory>`` system).
*/
export function stripInternalMarkers(content: string): string {
return content.replace(INTERNAL_MARKER_RE, "").trim();
}
export function parseUploadedFiles(content: string): FileInMessage[] {
// Match <uploaded_files>...</uploaded_files> tag
const uploadedFilesRegex = /<uploaded_files>([\s\S]*?)<\/uploaded_files>/;
+88
View File
@@ -0,0 +1,88 @@
import type { Subtask } from "./types";
export type SubtaskStatus = Subtask["status"];
export interface SubtaskResultUpdate {
status: SubtaskStatus;
result?: string;
error?: string;
}
/**
* Prefix strings the backend `task` tool writes into its result `content`.
*
* These values are not user-facing copy they are part of the
* backendfrontend contract defined in
* `backend/packages/harness/deerflow/tools/builtins/task_tool.py` (returned
* from the tool body) and in
* `backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py`
* (wrapper for tool exceptions). Any change here must be paired with the
* matching backend change. Exported so a future structured-status migration
* can reference the same values from one place.
*
* `task_tool.py` also emits three `Error:` strings for pre-execution failures
* unknown subagent type, host-bash disabled, and "task disappeared from
* background tasks". They are handled by {@link ERROR_WRAPPER_PATTERN}
* rather than dedicated prefixes because the wrapper already produces
* exactly the right `terminal failed` shape.
*/
export const SUCCESS_PREFIX = "Task Succeeded. Result:";
export const FAILURE_PREFIX = "Task failed.";
export const TIMEOUT_PREFIX = "Task timed out";
export const CANCELLED_PREFIX = "Task cancelled by user.";
export const POLLING_TIMEOUT_PREFIX = "Task polling timed out";
export const ERROR_WRAPPER_PATTERN = /^Error\b/i;
/**
* Map a `task` tool result string to a {@link SubtaskStatus}.
*
* Bytedance/deer-flow issue #3107 BUG-007: parent-visible task tool errors do
* not always start with one of the three legacy prefixes (e.g. when
* `ToolErrorHandlingMiddleware` wraps an exception as
* `Error: Tool 'task' failed ...`). Treat any leading `Error:` token as a
* terminal failure so subtask cards stop being stuck on "in_progress".
*
* Returning `in_progress` is the **deliberate** fallback for content that
* matches none of the known prefixes. LangChain only ever emits a
* `ToolMessage` once the tool itself has returned (success or wrapped
* exception), so an unknown shape means "the contract changed underneath us"
* surfacing it as still-running prompts the operator to investigate, where
* eagerly marking it terminal-failed would mask the drift.
*/
export function parseSubtaskResult(text: string): SubtaskResultUpdate {
const trimmed = text.trim();
if (trimmed.startsWith(SUCCESS_PREFIX)) {
return {
status: "completed",
result: trimmed.slice(SUCCESS_PREFIX.length).trim(),
};
}
if (trimmed.startsWith(FAILURE_PREFIX)) {
return {
status: "failed",
error: trimmed.slice(FAILURE_PREFIX.length).trim(),
};
}
if (trimmed.startsWith(TIMEOUT_PREFIX)) {
return { status: "failed", error: trimmed };
}
if (trimmed.startsWith(CANCELLED_PREFIX)) {
return { status: "failed", error: trimmed };
}
if (trimmed.startsWith(POLLING_TIMEOUT_PREFIX)) {
return { status: "failed", error: trimmed };
}
// ToolErrorHandlingMiddleware-style wrapper, or any other terminal error
// signal the backend forwards to the lead agent.
if (ERROR_WRAPPER_PATTERN.test(trimmed)) {
return { status: "failed", error: trimmed };
}
return { status: "in_progress" };
}
+97 -13
View File
@@ -5,16 +5,53 @@ import {
extractReasoningContentFromMessage,
hasContent,
hasToolCalls,
stripUploadedFilesTag,
isHiddenFromUIMessage,
stripInternalMarkers,
} from "../messages/utils";
import type { AgentThread } from "./types";
import { titleOfThread } from "./utils";
/**
* Optional debug switches for advanced exports.
*
* Bytedance/deer-flow issue #3107 BUG-006 explicitly prescribes that the
* default export includes only the user-visible transcript and excludes
* thinking/reasoning content, tool calls, tool results, hidden messages,
* memory injection, and `<system-reminder>` payloads. These options let a
* future "debug export" surface re-include any of those categories without
* forking the formatter. They are not currently wired to any UI control
* callers that want them must construct the options object explicitly.
*/
export interface ExportOptions {
includeReasoning?: boolean;
includeToolCalls?: boolean;
includeToolMessages?: boolean;
includeHidden?: boolean;
}
function visibleMessages(
messages: Message[],
options: ExportOptions,
): Message[] {
return messages.filter((message) => {
if (!options.includeHidden && isHiddenFromUIMessage(message)) {
return false;
}
if (!options.includeToolMessages && message.type === "tool") {
return false;
}
return true;
});
}
function formatMessageContent(message: Message): string {
const text = extractContentFromMessage(message);
if (!text) return "";
return stripUploadedFilesTag(text);
// Defence-in-depth: even if a middleware-injected marker slipped through
// the `hide_from_ui` filter, scrub every known internal tag before the
// content lands in a user-visible export file.
return stripInternalMarkers(text);
}
function formatToolCalls(message: Message): string {
@@ -26,6 +63,7 @@ function formatToolCalls(message: Message): string {
export function formatThreadAsMarkdown(
thread: AgentThread,
messages: Message[],
options: ExportOptions = {},
): string {
const title = titleOfThread(thread);
const createdAt = thread.created_at
@@ -41,16 +79,20 @@ export function formatThreadAsMarkdown(
"",
];
for (const message of messages) {
for (const message of visibleMessages(messages, options)) {
if (message.type === "human") {
const content = formatMessageContent(message);
if (content) {
lines.push(`## 🧑 User`, "", content, "", "---", "");
}
} else if (message.type === "ai") {
const reasoning = extractReasoningContentFromMessage(message);
const reasoning = options.includeReasoning
? extractReasoningContentFromMessage(message)
: undefined;
const content = formatMessageContent(message);
const toolCalls = formatToolCalls(message);
const toolCalls = options.includeToolCalls
? formatToolCalls(message)
: "";
if (!content && !toolCalls && !reasoning) continue;
@@ -83,23 +125,65 @@ export function formatThreadAsMarkdown(
return lines.join("\n").trimEnd() + "\n";
}
interface JSONExportMessage {
type: Message["type"];
id: string | undefined;
content: string;
reasoning?: string;
tool_calls?: unknown;
}
function buildJSONMessage(
msg: Message,
options: ExportOptions,
): JSONExportMessage | null {
// Run the same sanitiser the Markdown path uses so the JSON `content`
// field never carries inline `<think>...</think>` wrappers, content-array
// thinking blocks, `<uploaded_files>` markers, or other internal payloads.
const content = formatMessageContent(msg);
const reasoning =
options.includeReasoning && msg.type === "ai"
? (extractReasoningContentFromMessage(msg) ?? undefined)
: undefined;
const toolCalls =
options.includeToolCalls &&
msg.type === "ai" &&
"tool_calls" in msg &&
msg.tool_calls?.length
? msg.tool_calls
: undefined;
// Drop rows with no exportable payload (empty content + no opted-in
// reasoning / tool_calls). Uses falsy semantics so `reasoning: ""` (the
// empty string ``extractReasoningContentFromMessage`` can hand back) is
// treated the same way Markdown's `!reasoning` continue does — otherwise
// an opted-in but empty reasoning field would leak as `{reasoning: ""}`.
if (!content && !reasoning && !toolCalls) {
return null;
}
return {
type: msg.type,
id: msg.id,
content,
...(reasoning !== undefined ? { reasoning } : {}),
...(toolCalls !== undefined ? { tool_calls: toolCalls } : {}),
};
}
export function formatThreadAsJSON(
thread: AgentThread,
messages: Message[],
options: ExportOptions = {},
): string {
const exportData = {
title: titleOfThread(thread),
thread_id: thread.thread_id,
created_at: thread.created_at,
exported_at: new Date().toISOString(),
messages: messages.map((msg) => ({
type: msg.type,
id: msg.id,
content: typeof msg.content === "string" ? msg.content : msg.content,
...(msg.type === "ai" && msg.tool_calls?.length
? { tool_calls: msg.tool_calls }
: {}),
})),
messages: visibleMessages(messages, options)
.map((msg) => buildJSONMessage(msg, options))
.filter((m): m is JSONExportMessage => m !== null),
};
return JSON.stringify(exportData, null, 2);
}
@@ -0,0 +1,112 @@
import { describe, expect, it } from "vitest";
import { parseSubtaskResult } from "@/core/tasks/subtask-result";
describe("parseSubtaskResult", () => {
it("recognises the standard success prefix", () => {
const parsed = parseSubtaskResult(
"Task Succeeded. Result: investigated and produced a 3-page report",
);
expect(parsed.status).toBe("completed");
expect(parsed.result).toBe("investigated and produced a 3-page report");
});
it("recognises the standard failure prefix", () => {
const parsed = parseSubtaskResult(
"Task failed. underlying tool raised RuntimeError",
);
expect(parsed.status).toBe("failed");
expect(parsed.error).toBe("underlying tool raised RuntimeError");
});
it("recognises the standard timeout prefix", () => {
const parsed = parseSubtaskResult("Task timed out after 900s");
expect(parsed.status).toBe("failed");
expect(parsed.error).toBe("Task timed out after 900s");
});
it("recognises the cancelled-by-user prefix", () => {
// bytedance/deer-flow#3131 review: this is one of the five terminal
// strings task_tool.py actually emits — the previous cut treated it as
// unrecognised content and pushed the card back to in_progress.
const parsed = parseSubtaskResult("Task cancelled by user.");
expect(parsed.status).toBe("failed");
expect(parsed.error).toBe("Task cancelled by user.");
});
it("recognises the polling-timed-out prefix", () => {
// Emitted by task_tool when the background polling loop runs out of
// budget waiting for the subagent to reach a terminal state.
const parsed = parseSubtaskResult(
"Task polling timed out after 15 minutes. This may indicate the background task is stuck. Status: RUNNING",
);
expect(parsed.status).toBe("failed");
expect(parsed.error).toContain("polling timed out");
});
it("recognises polling-timed-out with different durations", () => {
// `task_tool` emits `Task polling timed out after {N} minutes` where N
// varies with the configured subagent timeout. Guard against the regex
// accidentally being pinned to a specific number.
for (const n of [1, 5, 60]) {
const parsed = parseSubtaskResult(
`Task polling timed out after ${n} minutes. Status: RUNNING`,
);
expect(parsed.status).toBe("failed");
}
});
it("trims whitespace around cancelled and polling-timed-out prefixes", () => {
// Streaming chunks sometimes arrive with leading/trailing newlines.
expect(parseSubtaskResult(" Task cancelled by user. \n").status).toBe(
"failed",
);
expect(
parseSubtaskResult("\n\nTask polling timed out after 3 minutes").status,
).toBe("failed");
});
it("recognises task_tool pre-execution Error: returns via the wrapper", () => {
// `task_tool.py` returns three `Error:` strings for unknown subagent
// type, host-bash disabled, and "task disappeared". They share the
// ERROR_WRAPPER_PATTERN, not a dedicated prefix, so this guards
// against a refactor splitting them off.
for (const text of [
"Error: Unknown subagent type 'foo'. Available: bash, general-purpose",
"Error: Host bash subagent is disabled by configuration",
"Error: Task 1234 disappeared from background tasks",
]) {
expect(parseSubtaskResult(text).status).toBe("failed");
}
});
it("treats middleware-wrapped tool errors as terminal failures", () => {
// bytedance/deer-flow issue #3107 BUG-007: the parent-visible ToolMessage
// produced by ToolErrorHandlingMiddleware never matches the three legacy
// prefixes, so subtask cards stay stuck on "in_progress".
const parsed = parseSubtaskResult(
"Error: Tool 'task' failed with TypeError: 'AsyncCallbackManager' object is not iterable. Continue with available context, or choose an alternative tool.",
);
expect(parsed.status).toBe("failed");
expect(parsed.error).toContain("AsyncCallbackManager");
});
it("treats any other Error: prefix as a terminal failure", () => {
const parsed = parseSubtaskResult("Error: subagent worker pool exhausted");
expect(parsed.status).toBe("failed");
});
it("keeps unrecognised non-error output as in_progress", () => {
// Streaming partial chunks should not flip the card to terminal early.
const parsed = parseSubtaskResult("Investigating ...");
expect(parsed.status).toBe("in_progress");
expect(parsed.error).toBeUndefined();
expect(parsed.result).toBeUndefined();
});
it("trims surrounding whitespace before matching prefixes", () => {
const parsed = parseSubtaskResult(" Task Succeeded. Result: ok ");
expect(parsed.status).toBe("completed");
expect(parsed.result).toBe("ok");
});
});
@@ -0,0 +1,317 @@
import type { Message } from "@langchain/langgraph-sdk";
import { describe, expect, it } from "vitest";
import {
formatThreadAsJSON,
formatThreadAsMarkdown,
} from "@/core/threads/export";
import type { AgentThread } from "@/core/threads/types";
// Bytedance/deer-flow issue #3107 BUG-006: the chat export path bypasses the
// UI-level hidden-message filter and emits reasoning content, tool calls, and
// any other "internal" payload as if it were part of the user transcript.
function makeThread(): AgentThread {
return {
thread_id: "thread-1",
created_at: "2026-05-21T00:00:00Z",
updated_at: "2026-05-21T00:00:00Z",
metadata: { title: "Demo thread" },
status: "idle",
values: { messages: [] },
} as unknown as AgentThread;
}
function human(content: string, extra: Partial<Message> = {}): Message {
return {
id: `h-${content}`,
type: "human",
content,
...extra,
} as Message;
}
function ai(
content: string,
extra: Partial<Message> & { tool_calls?: unknown } = {},
): Message {
return {
id: `a-${content}`,
type: "ai",
content,
...extra,
} as Message;
}
function toolMsg(content: string): Message {
return {
id: `t-${content}`,
type: "tool",
content,
name: "task",
tool_call_id: "call-1",
} as unknown as Message;
}
describe("formatThreadAsMarkdown", () => {
it("includes plain user and assistant text", () => {
const md = formatThreadAsMarkdown(makeThread(), [
human("hello"),
ai("hi there"),
]);
expect(md).toContain("hello");
expect(md).toContain("hi there");
});
it("drops messages marked hide_from_ui", () => {
const hidden = human("internal system reminder", {
additional_kwargs: { hide_from_ui: true },
} as Partial<Message>);
const md = formatThreadAsMarkdown(makeThread(), [
hidden,
ai("public answer"),
]);
expect(md).not.toContain("internal system reminder");
expect(md).toContain("public answer");
});
it("does not emit reasoning_content by default", () => {
const message = ai("final answer", {
additional_kwargs: {
reasoning_content: "secret chain of thought",
},
} as Partial<Message>);
const md = formatThreadAsMarkdown(makeThread(), [message]);
expect(md).not.toContain("secret chain of thought");
expect(md).not.toContain("Thinking");
});
it("does not emit tool calls by default", () => {
const message = ai("ok", {
tool_calls: [{ id: "1", name: "task", args: { description: "do work" } }],
} as Partial<Message>);
const md = formatThreadAsMarkdown(makeThread(), [message]);
expect(md).not.toContain("**Tool:**");
expect(md).not.toContain("`task`");
});
it("drops tool result messages", () => {
const md = formatThreadAsMarkdown(makeThread(), [
ai("delegating"),
toolMsg("Task Succeeded. Result: confidential"),
]);
expect(md).not.toContain("confidential");
});
});
describe("formatThreadAsMarkdown opt-in flags", () => {
it("emits reasoning when includeReasoning is true", () => {
const message = ai("final answer", {
additional_kwargs: {
reasoning_content: "step-by-step chain of thought",
},
} as Partial<Message>);
const md = formatThreadAsMarkdown(makeThread(), [message], {
includeReasoning: true,
});
expect(md).toContain("step-by-step chain of thought");
expect(md).toContain("Thinking");
});
it("emits tool call rows when includeToolCalls is true", () => {
const message = ai("ok", {
tool_calls: [{ id: "1", name: "task", args: { description: "do work" } }],
} as Partial<Message>);
const md = formatThreadAsMarkdown(makeThread(), [message], {
includeToolCalls: true,
});
expect(md).toContain("**Tool:**");
expect(md).toContain("`task`");
});
it("keeps hidden messages when includeHidden is true", () => {
const hidden = human("internal reminder", {
additional_kwargs: { hide_from_ui: true },
} as Partial<Message>);
const md = formatThreadAsMarkdown(makeThread(), [hidden], {
includeHidden: true,
});
expect(md).toContain("internal reminder");
});
});
describe("formatThreadAsJSON opt-in flags", () => {
it("emits tool_calls field when includeToolCalls is true", () => {
const message = ai("ok", {
tool_calls: [{ id: "1", name: "task", args: { description: "x" } }],
} as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [message], {
includeToolCalls: true,
});
expect(raw).toContain("tool_calls");
expect(raw).toContain('"task"');
});
it("keeps tool messages when includeToolMessages is true", () => {
const raw = formatThreadAsJSON(
makeThread(),
[toolMsg("Task Succeeded. Result: keep me")],
{ includeToolMessages: true },
);
const parsed = JSON.parse(raw) as { messages: { type: string }[] };
expect(parsed.messages.some((m) => m.type === "tool")).toBe(true);
expect(raw).toContain("keep me");
});
});
describe("formatThreadAsJSON", () => {
it("strips hidden messages, tool messages, reasoning, and tool calls", () => {
const messages = [
human("hello"),
human("secret reminder", {
additional_kwargs: { hide_from_ui: true },
} as Partial<Message>),
ai("answer", {
additional_kwargs: {
reasoning_content: "secret reasoning",
},
tool_calls: [{ id: "1", name: "task", args: {} }],
} as Partial<Message>),
toolMsg("internal trace"),
];
const raw = formatThreadAsJSON(makeThread(), messages);
const parsed = JSON.parse(raw) as {
messages: { type: string; tool_calls?: unknown[] }[];
};
expect(parsed.messages).toHaveLength(2);
expect(parsed.messages.every((m) => m.type !== "tool")).toBe(true);
expect(raw).not.toContain("secret reminder");
expect(raw).not.toContain("secret reasoning");
expect(raw).not.toContain("internal trace");
expect(raw).not.toContain("tool_calls");
});
it("strips inline <think>...</think> wrappers from content", () => {
// bytedance/deer-flow#3131 review: JSON export must run the same
// sanitiser the Markdown path uses so inline reasoning never leaks
// even when `includeReasoning` is left at its default false.
const message = ai("<think>internal monologue</think>visible answer", {
id: "ai-1",
} as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [message]);
expect(raw).not.toContain("internal monologue");
expect(raw).not.toContain("<think>");
expect(raw).toContain("visible answer");
});
it("strips content-array thinking blocks from content", () => {
const message = ai("placeholder", {
id: "ai-2",
content: [
{ type: "thinking", thinking: "hidden reasoning step" },
{ type: "text", text: "final visible text" },
],
} as unknown as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [message]);
expect(raw).not.toContain("hidden reasoning step");
expect(raw).toContain("final visible text");
});
it("strips <uploaded_files> markers from content", () => {
const message = human(
"real prompt\n<uploaded_files>\n/mnt/user-data/uploads/secret.pdf\n</uploaded_files>",
{ id: "h-clean" } as Partial<Message>,
);
const raw = formatThreadAsJSON(makeThread(), [message]);
expect(raw).not.toContain("<uploaded_files>");
expect(raw).not.toContain("secret.pdf");
expect(raw).toContain("real prompt");
});
it("drops AI messages that sanitise to empty content", () => {
// Pure-reasoning AI fragments (no visible text, no tool calls) should
// not survive as `{content: ""}` rows in the export.
const message = ai("<think>only thinking, no answer</think>", {
id: "ai-3",
} as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [message]);
const parsed = JSON.parse(raw) as { messages: unknown[] };
expect(parsed.messages).toHaveLength(0);
});
it("strips <system-reminder>/<memory>/<current_date> as defence in depth", () => {
// Primary protection is `isHiddenFromUIMessage` filtering the whole
// hidden HumanMessage. If a regression strips the `hide_from_ui` flag
// (or the marker leaks into an otherwise-visible message), the
// sanitiser must still scrub the payload before export.
const leaky = human("real user text", {
id: "leak-1",
content:
"<system-reminder>\n<memory>secret fact A</memory>\n<current_date>2026-01-01, Tuesday</current_date>\n</system-reminder>\nreal user text",
// Deliberately *not* setting hide_from_ui to model the regression
// case the defence-in-depth strip is guarding against.
} as unknown as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [leaky]);
expect(raw).not.toContain("<system-reminder>");
expect(raw).not.toContain("<memory>");
expect(raw).not.toContain("<current_date>");
expect(raw).not.toContain("secret fact A");
expect(raw).toContain("real user text");
});
it("sanitises tool message content when includeToolMessages is true", () => {
const message = {
id: "t-leak",
type: "tool",
content:
"Task Succeeded. Result: payload\n<uploaded_files>\n/mnt/user-data/uploads/secret.pdf\n</uploaded_files>",
name: "task",
tool_call_id: "call-leak",
} as unknown as Message;
const raw = formatThreadAsJSON(makeThread(), [message], {
includeToolMessages: true,
});
expect(raw).toContain("Task Succeeded");
expect(raw).not.toContain("<uploaded_files>");
expect(raw).not.toContain("secret.pdf");
});
it("preserves text and image_url parts in mixed content arrays", () => {
// `extractContentFromMessage` keeps `text` and `image_url` parts and
// drops `thinking` parts. The JSON export must agree with that
// contract.
const message = ai("placeholder", {
id: "ai-mixed",
content: [
{ type: "thinking", thinking: "internal reasoning" },
{ type: "text", text: "user-visible answer" },
{
type: "image_url",
image_url: { url: "https://example.invalid/cat.png" },
},
],
} as unknown as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [message]);
expect(raw).toContain("user-visible answer");
expect(raw).toContain("https://example.invalid/cat.png");
expect(raw).not.toContain("internal reasoning");
});
it("drops opted-in empty reasoning rather than emit reasoning: ''", () => {
// `extractReasoningContentFromMessage` can legitimately hand back ""
// for an AI message that has no reasoning content. The export must
// mirror the Markdown path's `!reasoning` `continue` and drop the row
// instead of leaking `{reasoning: ""}`.
const message = ai("", {
id: "ai-empty-reasoning",
additional_kwargs: { reasoning_content: "" },
} as Partial<Message>);
const raw = formatThreadAsJSON(makeThread(), [message], {
includeReasoning: true,
});
const parsed = JSON.parse(raw) as { messages: unknown[] };
expect(parsed.messages).toHaveLength(0);
});
});