mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-21 15:36:48 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c881d95898 | |||
| e93f658472 | |||
| 4cb2a22400 | |||
| 9c03a71a07 | |||
| 1c5c585741 |
@@ -184,6 +184,18 @@ Setup: Copy `config.example.yaml` to `config.yaml` in the **project root** direc
|
||||
|
||||
**Config Caching**: `get_app_config()` caches the parsed config, but automatically reloads it when the resolved config path changes or the file's mtime increases. This keeps Gateway and LangGraph reads aligned with `config.yaml` edits without requiring a manual process restart.
|
||||
|
||||
**Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. `AppConfig` is intentionally **not** cached on `app.state` — `lifespan()` keeps a local `startup_config` variable for one-shot bootstrap work (logging level, channels, `langgraph_runtime` engines) and passes it explicitly to `langgraph_runtime(app, startup_config)`. Infrastructure fields are **restart-required**:
|
||||
|
||||
| Field | Why a restart is required |
|
||||
|---|---|
|
||||
| `database.*` | `init_engine_from_config()` runs once during `langgraph_runtime()` startup; the SQLAlchemy engine holds the connection pool. |
|
||||
| `checkpointer.*` (including SQLite WAL/journal settings) | `make_checkpointer()` binds the persistent checkpointer once at startup. |
|
||||
| `run_events.*` | `make_run_event_store()` selects memory- vs. SQL-backed implementation at startup. |
|
||||
| `stream_bridge.*` | `make_stream_bridge()` constructs the bridge object once. |
|
||||
| `sandbox.use` | `get_sandbox_provider()` caches the provider singleton (`_default_sandbox_provider`); a new class path takes effect only on next process start. |
|
||||
| `log_level` | `apply_logging_level()` is called only in `app.py` startup; it mutates the root logger's level, and `get_app_config()` returning a fresh `AppConfig` does not retrigger it. |
|
||||
| `channels.*` IM platform credentials | `start_channel_service()` is invoked once during startup; live channels are not rebuilt on config change. |
|
||||
|
||||
Configuration priority:
|
||||
1. Explicit `config_path` argument
|
||||
2. `DEER_FLOW_CONFIG_PATH` environment variable
|
||||
|
||||
@@ -161,10 +161,16 @@ async def _migrate_orphaned_threads(store, admin_user_id: str) -> int:
|
||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
"""Application lifespan handler."""
|
||||
|
||||
# Load config and check necessary environment variables at startup
|
||||
# Load config and check necessary environment variables at startup.
|
||||
# `startup_config` is a local snapshot used only for one-shot bootstrap
|
||||
# work (logging level, langgraph_runtime engines, channels). Request-time
|
||||
# config resolution always routes through `get_app_config()` in
|
||||
# `app/gateway/deps.py::get_config()` so `config.yaml` edits become
|
||||
# visible without a process restart. We deliberately do NOT cache this
|
||||
# snapshot on `app.state` to keep that contract enforceable.
|
||||
try:
|
||||
app.state.config = get_app_config()
|
||||
apply_logging_level(app.state.config.log_level)
|
||||
startup_config = get_app_config()
|
||||
apply_logging_level(startup_config.log_level)
|
||||
logger.info("Configuration loaded successfully")
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to load configuration during gateway startup: {e}"
|
||||
@@ -174,7 +180,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
logger.info(f"Starting API Gateway on {config.host}:{config.port}")
|
||||
|
||||
# Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store)
|
||||
async with langgraph_runtime(app):
|
||||
async with langgraph_runtime(app, startup_config):
|
||||
logger.info("LangGraph runtime initialised")
|
||||
|
||||
# Check admin bootstrap state and migrate orphan threads after admin exists.
|
||||
@@ -185,7 +191,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
try:
|
||||
from app.channels.service import start_channel_service
|
||||
|
||||
channel_service = await start_channel_service(app.state.config)
|
||||
channel_service = await start_channel_service(startup_config)
|
||||
logger.info("Channel service started: %s", channel_service.get_status())
|
||||
except Exception:
|
||||
logger.exception("No IM channels configured or channel service failed to start")
|
||||
|
||||
+69
-17
@@ -3,11 +3,21 @@
|
||||
**Getters** (used by routers): raise 503 when a required dependency is
|
||||
missing, except ``get_store`` which returns ``None``.
|
||||
|
||||
``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the
|
||||
run path resolve it through :func:`deerflow.config.app_config.get_app_config`,
|
||||
which performs mtime-based hot reload, so edits to ``config.yaml`` take
|
||||
effect on the next request without a process restart. The engines created in
|
||||
:func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store,
|
||||
run-event store) accept a ``startup_config`` snapshot — they are
|
||||
restart-required by design and stay bound to that snapshot to keep the live
|
||||
process consistent with itself.
|
||||
|
||||
Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import AsyncGenerator, Callable
|
||||
from contextlib import AsyncExitStack, asynccontextmanager
|
||||
from typing import TYPE_CHECKING, TypeVar, cast
|
||||
@@ -15,12 +25,14 @@ from typing import TYPE_CHECKING, TypeVar, cast
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from langgraph.types import Checkpointer
|
||||
|
||||
from deerflow.config.app_config import AppConfig
|
||||
from deerflow.config.app_config import AppConfig, get_app_config
|
||||
from deerflow.persistence.feedback import FeedbackRepository
|
||||
from deerflow.runtime import RunContext, RunManager, StreamBridge
|
||||
from deerflow.runtime.events.store.base import RunEventStore
|
||||
from deerflow.runtime.runs.store.base import RunStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.gateway.auth.local_provider import LocalAuthProvider
|
||||
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
|
||||
@@ -30,21 +42,55 @@ if TYPE_CHECKING:
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def get_config(request: Request) -> AppConfig:
|
||||
"""Return the app-scoped ``AppConfig`` stored on ``app.state``."""
|
||||
config = getattr(request.app.state, "config", None)
|
||||
if config is None:
|
||||
raise HTTPException(status_code=503, detail="Configuration not available")
|
||||
return config
|
||||
def get_config() -> AppConfig:
|
||||
"""Return the freshest ``AppConfig`` for the current request.
|
||||
|
||||
Routes through :func:`deerflow.config.app_config.get_app_config`, which
|
||||
honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from
|
||||
disk when its mtime changes. ``AppConfig`` is not cached on ``app.state``
|
||||
at all — the only startup-time snapshot lives as a local
|
||||
``startup_config`` variable inside ``lifespan()`` and is passed
|
||||
explicitly into :func:`langgraph_runtime` for the engines that are
|
||||
restart-required by design. Routing every request through
|
||||
:func:`get_app_config` closes the bytedance/deer-flow issue #3107 BUG-001
|
||||
split-brain where the worker / lead-agent thread saw a stale startup
|
||||
snapshot.
|
||||
|
||||
Any failure to materialise the config (missing file, permission denied,
|
||||
YAML parse error, validation error) is reported as 503 — semantically
|
||||
"the gateway cannot serve requests without a usable configuration" — and
|
||||
logged with the original exception so operators have something to debug.
|
||||
"""
|
||||
try:
|
||||
return get_app_config()
|
||||
except Exception as exc: # noqa: BLE001 - request boundary: log and degrade gracefully
|
||||
logger.exception("Failed to load AppConfig at request time")
|
||||
raise HTTPException(status_code=503, detail="Configuration not available") from exc
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]:
|
||||
"""Bootstrap and tear down all LangGraph runtime singletons.
|
||||
|
||||
``startup_config`` is the ``AppConfig`` snapshot taken once during
|
||||
``lifespan()`` for one-shot infrastructure bootstrap. The engines and
|
||||
stores constructed here (stream bridge, persistence engine, checkpointer,
|
||||
store, run-event store) are restart-required by design — they hold live
|
||||
connections, file handles, or singleton providers — so they bind to this
|
||||
snapshot and survive across `config.yaml` edits. Request-time consumers
|
||||
must still go through :func:`get_config` for any field that should be
|
||||
hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary".
|
||||
|
||||
The matching ``run_events_config`` is frozen onto ``app.state`` so
|
||||
:func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the
|
||||
*startup-time* run-events configuration the underlying ``event_store``
|
||||
was built from — otherwise the runtime could end up combining a live
|
||||
new ``run_events_config`` with an event store still bound to the
|
||||
previous backend.
|
||||
|
||||
Usage in ``app.py``::
|
||||
|
||||
async with langgraph_runtime(app):
|
||||
async with langgraph_runtime(app, startup_config):
|
||||
yield
|
||||
"""
|
||||
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
|
||||
@@ -53,9 +99,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
from deerflow.runtime.events.store import make_run_event_store
|
||||
|
||||
async with AsyncExitStack() as stack:
|
||||
config = getattr(app.state, "config", None)
|
||||
if config is None:
|
||||
raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized")
|
||||
config = startup_config
|
||||
|
||||
app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config))
|
||||
|
||||
@@ -84,8 +128,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
|
||||
app.state.thread_store = make_thread_store(sf, app.state.store)
|
||||
|
||||
# Run event store (has its own factory with config-driven backend selection)
|
||||
# Run event store. The store and the matching ``run_events_config`` are
|
||||
# both frozen at startup so ``get_run_context`` does not combine a
|
||||
# freshly-reloaded ``AppConfig.run_events`` with a store still bound to
|
||||
# the previous backend.
|
||||
run_events_config = getattr(config, "run_events", None)
|
||||
app.state.run_events_config = run_events_config
|
||||
app.state.run_event_store = make_run_event_store(run_events_config)
|
||||
|
||||
# RunManager with store backing for persistence
|
||||
@@ -139,16 +187,20 @@ def get_thread_store(request: Request) -> ThreadMetaStore:
|
||||
def get_run_context(request: Request) -> RunContext:
|
||||
"""Build a :class:`RunContext` from ``app.state`` singletons.
|
||||
|
||||
Returns a *base* context with infrastructure dependencies.
|
||||
Returns a *base* context with infrastructure dependencies. The
|
||||
``app_config`` field is resolved live so per-run fields (e.g.
|
||||
``models[*].max_tokens``) follow ``config.yaml`` edits; the
|
||||
``event_store`` / ``run_events_config`` pair stays frozen to the snapshot
|
||||
captured in :func:`langgraph_runtime` so callers never see a store bound
|
||||
to one backend paired with a config pointing at another.
|
||||
"""
|
||||
config = get_config(request)
|
||||
return RunContext(
|
||||
checkpointer=get_checkpointer(request),
|
||||
store=get_store(request),
|
||||
event_store=get_run_event_store(request),
|
||||
run_events_config=getattr(config, "run_events", None),
|
||||
run_events_config=getattr(request.app.state, "run_events_config", None),
|
||||
thread_store=get_thread_store(request),
|
||||
app_config=config,
|
||||
app_config=get_config(),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -15,7 +15,8 @@ from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from fastapi import HTTPException, Request
|
||||
from langchain_core.messages import HumanMessage
|
||||
from langchain_core.messages import BaseMessage
|
||||
from langchain_core.messages.utils import convert_to_messages
|
||||
|
||||
from app.gateway.deps import get_run_context, get_run_manager, get_stream_bridge
|
||||
from app.gateway.utils import sanitize_log_param
|
||||
@@ -76,21 +77,35 @@ def normalize_stream_modes(raw: list[str] | str | None) -> list[str]:
|
||||
|
||||
|
||||
def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
|
||||
"""Convert LangGraph Platform input format to LangChain state dict."""
|
||||
"""Convert LangGraph Platform input format to LangChain state dict.
|
||||
|
||||
Delegates dict→message coercion to ``langchain_core.messages.utils.convert_to_messages``
|
||||
so that ``additional_kwargs`` (e.g. uploaded-file metadata — gh #3132), ``id``,
|
||||
``name``, and non-human roles (ai/system/tool) survive unchanged. An earlier
|
||||
hand-rolled version only forwarded ``content`` and collapsed every role to
|
||||
``HumanMessage``, which silently stripped frontend-supplied attachments.
|
||||
|
||||
Malformed message dicts (missing ``role``/``type``/``content``, unsupported
|
||||
role, etc.) raise ``HTTPException(400)`` with the offending index, instead
|
||||
of bubbling up as a 500. The gateway is a system boundary, so per-entry
|
||||
validation errors are the right shape for clients to retry against.
|
||||
"""
|
||||
if raw_input is None:
|
||||
return {}
|
||||
messages = raw_input.get("messages")
|
||||
if messages and isinstance(messages, list):
|
||||
converted = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
role = msg.get("role", msg.get("type", "user"))
|
||||
content = msg.get("content", "")
|
||||
if role in ("user", "human"):
|
||||
converted.append(HumanMessage(content=content))
|
||||
else:
|
||||
# TODO: handle other message types (system, ai, tool)
|
||||
converted.append(HumanMessage(content=content))
|
||||
converted: list[Any] = []
|
||||
for index, msg in enumerate(messages):
|
||||
if isinstance(msg, BaseMessage):
|
||||
converted.append(msg)
|
||||
elif isinstance(msg, dict):
|
||||
try:
|
||||
converted.extend(convert_to_messages([msg]))
|
||||
except (ValueError, TypeError, NotImplementedError) as exc:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Invalid message at input.messages[{index}]: {exc}",
|
||||
) from exc
|
||||
else:
|
||||
converted.append(msg)
|
||||
return {**raw_input, "messages": converted}
|
||||
|
||||
@@ -134,9 +134,25 @@ def reset_mcp_tools_cache() -> None:
|
||||
"""Reset the MCP tools cache.
|
||||
|
||||
This is useful for testing or when you want to reload MCP tools.
|
||||
Also closes all persistent MCP sessions so they are recreated on
|
||||
the next tool load.
|
||||
"""
|
||||
global _mcp_tools_cache, _cache_initialized, _config_mtime
|
||||
_mcp_tools_cache = None
|
||||
_cache_initialized = False
|
||||
_config_mtime = None
|
||||
|
||||
# Close persistent sessions – they will be recreated by the next
|
||||
# get_mcp_tools() call with the (possibly updated) connection config.
|
||||
try:
|
||||
from deerflow.mcp.session_pool import get_session_pool
|
||||
|
||||
pool = get_session_pool()
|
||||
pool.close_all_sync()
|
||||
except Exception:
|
||||
logger.debug("Could not close MCP session pool on cache reset", exc_info=True)
|
||||
|
||||
from deerflow.mcp.session_pool import reset_session_pool
|
||||
|
||||
reset_session_pool()
|
||||
logger.info("MCP tools cache reset")
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
"""Persistent MCP session pool for stateful tool calls.
|
||||
|
||||
When MCP tools are loaded via langchain-mcp-adapters with ``session=None``,
|
||||
each tool call creates a new MCP session. For stateful servers like Playwright,
|
||||
this means browser state (opened pages, filled forms) is lost between calls.
|
||||
|
||||
This module provides a session pool that maintains persistent MCP sessions,
|
||||
scoped by ``(server_name, scope_key)`` — typically scope_key is the thread_id —
|
||||
so that consecutive tool calls share the same session and server-side state.
|
||||
Sessions are evicted in LRU order when the pool reaches capacity.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
from collections import OrderedDict
|
||||
from typing import Any
|
||||
|
||||
from mcp import ClientSession
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MCPSessionPool:
|
||||
"""Manages persistent MCP sessions scoped by ``(server_name, scope_key)``."""
|
||||
|
||||
MAX_SESSIONS = 256
|
||||
SESSION_CLOSE_TIMEOUT = 5.0 # seconds to wait when closing a session via run_coroutine_threadsafe
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._entries: OrderedDict[
|
||||
tuple[str, str],
|
||||
tuple[ClientSession, asyncio.AbstractEventLoop],
|
||||
] = OrderedDict()
|
||||
self._context_managers: dict[tuple[str, str], Any] = {}
|
||||
# threading.Lock is not bound to any event loop, so it is safe to
|
||||
# acquire from both async paths and sync/worker-thread paths.
|
||||
self._lock = threading.Lock()
|
||||
|
||||
async def get_session(
|
||||
self,
|
||||
server_name: str,
|
||||
scope_key: str,
|
||||
connection: dict[str, Any],
|
||||
) -> ClientSession:
|
||||
"""Get or create a persistent MCP session.
|
||||
|
||||
If an existing session was created in a different event loop (e.g.
|
||||
the sync-wrapper path), it is closed and replaced with a fresh one
|
||||
in the current loop.
|
||||
|
||||
Args:
|
||||
server_name: MCP server name.
|
||||
scope_key: Isolation key (typically thread_id).
|
||||
connection: Connection configuration for ``create_session``.
|
||||
|
||||
Returns:
|
||||
An initialized ``ClientSession``.
|
||||
"""
|
||||
key = (server_name, scope_key)
|
||||
current_loop = asyncio.get_running_loop()
|
||||
|
||||
# Phase 1: inspect/mutate the registry under the thread lock (no awaits).
|
||||
cms_to_close: list[tuple[tuple[str, str], Any]] = []
|
||||
with self._lock:
|
||||
if key in self._entries:
|
||||
session, loop = self._entries[key]
|
||||
if loop is current_loop:
|
||||
self._entries.move_to_end(key)
|
||||
return session
|
||||
# Session belongs to a different event loop – evict it.
|
||||
cm = self._context_managers.pop(key, None)
|
||||
self._entries.pop(key)
|
||||
if cm is not None:
|
||||
cms_to_close.append((key, cm))
|
||||
|
||||
# Evict LRU entries when at capacity.
|
||||
while len(self._entries) >= self.MAX_SESSIONS:
|
||||
oldest_key = next(iter(self._entries))
|
||||
cm = self._context_managers.pop(oldest_key, None)
|
||||
self._entries.pop(oldest_key)
|
||||
if cm is not None:
|
||||
cms_to_close.append((oldest_key, cm))
|
||||
|
||||
# Phase 2: async cleanup outside the lock so we never await while holding it.
|
||||
for close_key, cm in cms_to_close:
|
||||
try:
|
||||
await cm.__aexit__(None, None, None)
|
||||
except Exception:
|
||||
logger.warning("Error closing MCP session %s", close_key, exc_info=True)
|
||||
|
||||
from langchain_mcp_adapters.sessions import create_session
|
||||
|
||||
cm = create_session(connection)
|
||||
session = await cm.__aenter__()
|
||||
await session.initialize()
|
||||
|
||||
# Phase 3: register the new session under the lock.
|
||||
with self._lock:
|
||||
self._entries[key] = (session, current_loop)
|
||||
self._context_managers[key] = cm
|
||||
logger.info("Created persistent MCP session for %s/%s", server_name, scope_key)
|
||||
return session
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Cleanup helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _close_cm(self, key: tuple[str, str], cm: Any) -> None:
|
||||
"""Close a single context manager (must be called WITHOUT the lock)."""
|
||||
try:
|
||||
await cm.__aexit__(None, None, None)
|
||||
except Exception:
|
||||
logger.warning("Error closing MCP session %s", key, exc_info=True)
|
||||
|
||||
async def close_scope(self, scope_key: str) -> None:
|
||||
"""Close all sessions for a given scope (e.g. thread_id)."""
|
||||
with self._lock:
|
||||
keys = [k for k in self._entries if k[1] == scope_key]
|
||||
cms = [(k, self._context_managers.pop(k, None)) for k in keys]
|
||||
for k in keys:
|
||||
self._entries.pop(k, None)
|
||||
for key, cm in cms:
|
||||
if cm is not None:
|
||||
await self._close_cm(key, cm)
|
||||
|
||||
async def close_server(self, server_name: str) -> None:
|
||||
"""Close all sessions for a given server."""
|
||||
with self._lock:
|
||||
keys = [k for k in self._entries if k[0] == server_name]
|
||||
cms = [(k, self._context_managers.pop(k, None)) for k in keys]
|
||||
for k in keys:
|
||||
self._entries.pop(k, None)
|
||||
for key, cm in cms:
|
||||
if cm is not None:
|
||||
await self._close_cm(key, cm)
|
||||
|
||||
async def close_all(self) -> None:
|
||||
"""Close every managed session."""
|
||||
with self._lock:
|
||||
cms = list(self._context_managers.items())
|
||||
self._context_managers.clear()
|
||||
self._entries.clear()
|
||||
for key, cm in cms:
|
||||
await self._close_cm(key, cm)
|
||||
|
||||
def close_all_sync(self) -> None:
|
||||
"""Close all sessions using their owning event loops (synchronous).
|
||||
|
||||
Each session is closed on the loop it was created in, avoiding
|
||||
cross-loop resource leaks. Safe to call from any thread without an
|
||||
active event loop.
|
||||
"""
|
||||
with self._lock:
|
||||
entries = list(self._entries.items())
|
||||
cms = dict(self._context_managers)
|
||||
self._entries.clear()
|
||||
self._context_managers.clear()
|
||||
|
||||
for key, (_, loop) in entries:
|
||||
cm = cms.get(key)
|
||||
if cm is None or loop.is_closed():
|
||||
continue
|
||||
try:
|
||||
if loop.is_running():
|
||||
# Schedule on the owning loop from this (different) thread.
|
||||
future = asyncio.run_coroutine_threadsafe(cm.__aexit__(None, None, None), loop)
|
||||
future.result(timeout=self.SESSION_CLOSE_TIMEOUT)
|
||||
else:
|
||||
loop.run_until_complete(cm.__aexit__(None, None, None))
|
||||
except Exception:
|
||||
logger.debug("Error closing MCP session %s during sync close", key, exc_info=True)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Module-level singleton
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
_pool: MCPSessionPool | None = None
|
||||
_pool_lock = threading.Lock()
|
||||
|
||||
|
||||
def get_session_pool() -> MCPSessionPool:
|
||||
"""Return the global session-pool singleton."""
|
||||
global _pool
|
||||
if _pool is None:
|
||||
with _pool_lock:
|
||||
if _pool is None:
|
||||
_pool = MCPSessionPool()
|
||||
return _pool
|
||||
|
||||
|
||||
def reset_session_pool() -> None:
|
||||
"""Reset the singleton (for tests)."""
|
||||
global _pool
|
||||
_pool = None
|
||||
@@ -1,21 +1,181 @@
|
||||
"""Load MCP tools using langchain-mcp-adapters."""
|
||||
"""Load MCP tools using langchain-mcp-adapters with persistent sessions."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import BaseTool
|
||||
from langchain_core.tools import BaseTool, StructuredTool
|
||||
from langgraph.config import get_config
|
||||
|
||||
from deerflow.config.extensions_config import ExtensionsConfig
|
||||
from deerflow.mcp.client import build_servers_config
|
||||
from deerflow.mcp.oauth import build_oauth_tool_interceptor, get_initial_oauth_headers
|
||||
from deerflow.mcp.session_pool import get_session_pool
|
||||
from deerflow.reflection import resolve_variable
|
||||
from deerflow.tools.sync import make_sync_tool_wrapper
|
||||
from deerflow.tools.types import Runtime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _extract_thread_id(runtime: Runtime | None) -> str:
|
||||
"""Extract thread_id from the injected tool runtime or LangGraph config."""
|
||||
if runtime is not None:
|
||||
tid = runtime.context.get("thread_id") if runtime.context else None
|
||||
if tid is not None:
|
||||
return str(tid)
|
||||
config = runtime.config or {}
|
||||
tid = config.get("configurable", {}).get("thread_id")
|
||||
if tid is not None:
|
||||
return str(tid)
|
||||
|
||||
try:
|
||||
tid = get_config().get("configurable", {}).get("thread_id")
|
||||
return str(tid) if tid is not None else "default"
|
||||
except RuntimeError:
|
||||
return "default"
|
||||
|
||||
|
||||
def _convert_call_tool_result(call_tool_result: Any) -> Any:
|
||||
"""Convert an MCP CallToolResult to the LangChain ``content_and_artifact`` format.
|
||||
|
||||
Implements the same conversion logic as the adapter without relying on
|
||||
the private ``langchain_mcp_adapters.tools._convert_call_tool_result`` symbol.
|
||||
"""
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langchain_core.messages.content import create_file_block, create_image_block, create_text_block
|
||||
from langchain_core.tools import ToolException
|
||||
from mcp.types import EmbeddedResource, ImageContent, ResourceLink, TextContent, TextResourceContents
|
||||
|
||||
# Pass ToolMessage through directly (interceptor short-circuit).
|
||||
if isinstance(call_tool_result, ToolMessage):
|
||||
return call_tool_result, None
|
||||
|
||||
# Pass LangGraph Command through directly when langgraph is installed.
|
||||
try:
|
||||
from langgraph.types import Command
|
||||
|
||||
if isinstance(call_tool_result, Command):
|
||||
return call_tool_result, None
|
||||
except ImportError:
|
||||
# langgraph is optional; if unavailable, continue with standard MCP content conversion.
|
||||
pass
|
||||
|
||||
# Convert MCP content blocks to LangChain content blocks.
|
||||
lc_content = []
|
||||
for item in call_tool_result.content:
|
||||
if isinstance(item, TextContent):
|
||||
lc_content.append(create_text_block(text=item.text))
|
||||
elif isinstance(item, ImageContent):
|
||||
lc_content.append(create_image_block(base64=item.data, mime_type=item.mimeType))
|
||||
elif isinstance(item, ResourceLink):
|
||||
mime = item.mimeType or None
|
||||
if mime and mime.startswith("image/"):
|
||||
lc_content.append(create_image_block(url=str(item.uri), mime_type=mime))
|
||||
else:
|
||||
lc_content.append(create_file_block(url=str(item.uri), mime_type=mime))
|
||||
elif isinstance(item, EmbeddedResource):
|
||||
from mcp.types import BlobResourceContents
|
||||
|
||||
res = item.resource
|
||||
if isinstance(res, TextResourceContents):
|
||||
lc_content.append(create_text_block(text=res.text))
|
||||
elif isinstance(res, BlobResourceContents):
|
||||
mime = res.mimeType or None
|
||||
if mime and mime.startswith("image/"):
|
||||
lc_content.append(create_image_block(base64=res.blob, mime_type=mime))
|
||||
else:
|
||||
lc_content.append(create_file_block(base64=res.blob, mime_type=mime))
|
||||
else:
|
||||
lc_content.append(create_text_block(text=str(res)))
|
||||
else:
|
||||
lc_content.append(create_text_block(text=str(item)))
|
||||
|
||||
if call_tool_result.isError:
|
||||
error_parts = [item["text"] for item in lc_content if isinstance(item, dict) and item.get("type") == "text"]
|
||||
raise ToolException("\n".join(error_parts) if error_parts else str(lc_content))
|
||||
|
||||
artifact = None
|
||||
if call_tool_result.structuredContent is not None:
|
||||
artifact = {"structured_content": call_tool_result.structuredContent}
|
||||
|
||||
return lc_content, artifact
|
||||
|
||||
|
||||
def _make_session_pool_tool(
|
||||
tool: BaseTool,
|
||||
server_name: str,
|
||||
connection: dict[str, Any],
|
||||
tool_interceptors: list[Any] | None = None,
|
||||
) -> BaseTool:
|
||||
"""Wrap an MCP tool so it reuses a persistent session from the pool.
|
||||
|
||||
Replaces the per-call session creation with pool-managed sessions scoped
|
||||
by ``(server_name, thread_id)``. This ensures stateful MCP servers (e.g.
|
||||
Playwright) keep their state across tool calls within the same thread.
|
||||
|
||||
The configured ``tool_interceptors`` (OAuth, custom) are preserved and
|
||||
applied on every call before invoking the pooled session.
|
||||
"""
|
||||
# Strip the server-name prefix to recover the original MCP tool name.
|
||||
original_name = tool.name
|
||||
prefix = f"{server_name}_"
|
||||
if original_name.startswith(prefix):
|
||||
original_name = original_name[len(prefix) :]
|
||||
|
||||
pool = get_session_pool()
|
||||
|
||||
async def call_with_persistent_session(
|
||||
runtime: Runtime | None = None,
|
||||
**arguments: Any,
|
||||
) -> Any:
|
||||
thread_id = _extract_thread_id(runtime)
|
||||
session = await pool.get_session(server_name, thread_id, connection)
|
||||
|
||||
if tool_interceptors:
|
||||
from langchain_mcp_adapters.interceptors import MCPToolCallRequest
|
||||
|
||||
async def base_handler(request: MCPToolCallRequest) -> Any:
|
||||
return await session.call_tool(request.name, request.args)
|
||||
|
||||
handler = base_handler
|
||||
for interceptor in reversed(tool_interceptors):
|
||||
outer = handler
|
||||
|
||||
async def wrapped(req: Any, _i: Any = interceptor, _h: Any = outer) -> Any:
|
||||
return await _i(req, _h)
|
||||
|
||||
handler = wrapped
|
||||
|
||||
request = MCPToolCallRequest(
|
||||
name=original_name,
|
||||
args=arguments,
|
||||
server_name=server_name,
|
||||
runtime=runtime,
|
||||
)
|
||||
call_tool_result = await handler(request)
|
||||
else:
|
||||
call_tool_result = await session.call_tool(original_name, arguments)
|
||||
|
||||
return _convert_call_tool_result(call_tool_result)
|
||||
|
||||
return StructuredTool(
|
||||
name=tool.name,
|
||||
description=tool.description,
|
||||
args_schema=tool.args_schema,
|
||||
coroutine=call_with_persistent_session,
|
||||
response_format="content_and_artifact",
|
||||
metadata=tool.metadata,
|
||||
)
|
||||
|
||||
|
||||
async def get_mcp_tools() -> list[BaseTool]:
|
||||
"""Get all tools from enabled MCP servers.
|
||||
|
||||
Tools are wrapped with persistent-session logic so that consecutive
|
||||
calls within the same thread reuse the same MCP session.
|
||||
|
||||
Returns:
|
||||
List of LangChain tools from all enabled MCP servers.
|
||||
"""
|
||||
@@ -50,7 +210,7 @@ async def get_mcp_tools() -> list[BaseTool]:
|
||||
existing_headers["Authorization"] = auth_header
|
||||
servers_config[server_name]["headers"] = existing_headers
|
||||
|
||||
tool_interceptors = []
|
||||
tool_interceptors: list[Any] = []
|
||||
oauth_interceptor = build_oauth_tool_interceptor(extensions_config)
|
||||
if oauth_interceptor is not None:
|
||||
tool_interceptors.append(oauth_interceptor)
|
||||
@@ -74,20 +234,42 @@ async def get_mcp_tools() -> list[BaseTool]:
|
||||
elif interceptor is not None:
|
||||
logger.warning(f"Builder {interceptor_path} returned non-callable {type(interceptor).__name__}; skipping")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load MCP interceptor {interceptor_path}: {e}", exc_info=True)
|
||||
logger.warning(
|
||||
f"Failed to load MCP interceptor {interceptor_path}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
client = MultiServerMCPClient(servers_config, tool_interceptors=tool_interceptors, tool_name_prefix=True)
|
||||
client = MultiServerMCPClient(
|
||||
servers_config,
|
||||
tool_interceptors=tool_interceptors,
|
||||
tool_name_prefix=True,
|
||||
)
|
||||
|
||||
# Get all tools from all servers
|
||||
# Get all tools from all servers (discovers tool definitions via
|
||||
# temporary sessions – the persistent-session wrapping is applied below).
|
||||
tools = await client.get_tools()
|
||||
logger.info(f"Successfully loaded {len(tools)} tool(s) from MCP servers")
|
||||
|
||||
# Patch tools to support sync invocation, as deerflow client streams synchronously
|
||||
# Wrap each tool with persistent-session logic.
|
||||
wrapped_tools: list[BaseTool] = []
|
||||
for tool in tools:
|
||||
tool_server: str | None = None
|
||||
for name in servers_config:
|
||||
if tool.name.startswith(f"{name}_"):
|
||||
tool_server = name
|
||||
break
|
||||
|
||||
if tool_server is not None:
|
||||
wrapped_tools.append(_make_session_pool_tool(tool, tool_server, servers_config[tool_server], tool_interceptors))
|
||||
else:
|
||||
wrapped_tools.append(tool)
|
||||
|
||||
# Patch tools to support sync invocation, as deerflow client streams synchronously
|
||||
for tool in wrapped_tools:
|
||||
if getattr(tool, "func", None) is None and getattr(tool, "coroutine", None) is not None:
|
||||
tool.func = make_sync_tool_wrapper(tool.coroutine, tool.name)
|
||||
|
||||
return tools
|
||||
return wrapped_tools
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load MCP tools: {e}", exc_info=True)
|
||||
|
||||
@@ -42,6 +42,7 @@ _DEFAULT_GLOB_MAX_RESULTS = 200
|
||||
_MAX_GLOB_MAX_RESULTS = 1000
|
||||
_DEFAULT_GREP_MAX_RESULTS = 100
|
||||
_MAX_GREP_MAX_RESULTS = 500
|
||||
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS = 2000
|
||||
_LOCAL_BASH_CWD_COMMANDS = {"cd", "pushd"}
|
||||
_LOCAL_BASH_COMMAND_WRAPPERS = {"command", "builtin"}
|
||||
_LOCAL_BASH_COMMAND_PREFIX_KEYWORDS = {"!", "{", "case", "do", "elif", "else", "for", "if", "select", "then", "time", "until", "while"}
|
||||
@@ -435,6 +436,42 @@ def _sanitize_error(error: Exception, runtime: Runtime | None = None) -> str:
|
||||
return msg
|
||||
|
||||
|
||||
def _truncate_write_file_error_detail(detail: str, max_chars: int) -> str:
|
||||
"""Middle-truncate write_file error details, preserving the head and tail."""
|
||||
if max_chars == 0:
|
||||
return detail
|
||||
if len(detail) <= max_chars:
|
||||
return detail
|
||||
total = len(detail)
|
||||
marker_max_len = len(f"\n... [write_file error truncated: {total} chars skipped] ...\n")
|
||||
kept = max(0, max_chars - marker_max_len)
|
||||
if kept == 0:
|
||||
return detail[:max_chars]
|
||||
head_len = kept // 2
|
||||
tail_len = kept - head_len
|
||||
skipped = total - kept
|
||||
marker = f"\n... [write_file error truncated: {skipped} chars skipped] ...\n"
|
||||
return f"{detail[:head_len]}{marker}{detail[-tail_len:] if tail_len > 0 else ''}"
|
||||
|
||||
|
||||
def _format_write_file_error(
|
||||
requested_path: str,
|
||||
error: Exception,
|
||||
runtime: Runtime | None = None,
|
||||
*,
|
||||
max_chars: int = _DEFAULT_WRITE_FILE_ERROR_MAX_CHARS,
|
||||
) -> str:
|
||||
"""Return a bounded, sanitized error string for write_file failures."""
|
||||
header = f"Error: Failed to write file '{requested_path}'"
|
||||
detail = _sanitize_error(error, runtime)
|
||||
if max_chars == 0:
|
||||
return f"{header}: {detail}"
|
||||
detail_budget = max_chars - len(header) - 2
|
||||
if detail_budget <= 0:
|
||||
return _truncate_write_file_error_detail(f"{header}: {detail}", max_chars)
|
||||
return f"{header}: {_truncate_write_file_error_detail(detail, detail_budget)}"
|
||||
|
||||
|
||||
def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str:
|
||||
"""Replace virtual /mnt/user-data paths with actual thread data paths.
|
||||
|
||||
@@ -1651,9 +1688,9 @@ def write_file_tool(
|
||||
append: Whether to append content to the end of the file instead of overwriting it. Defaults to false.
|
||||
"""
|
||||
try:
|
||||
requested_path = path
|
||||
sandbox = ensure_sandbox_initialized(runtime)
|
||||
ensure_thread_directories_exist(runtime)
|
||||
requested_path = path
|
||||
if is_local_sandbox(runtime):
|
||||
thread_data = get_thread_data(runtime)
|
||||
validate_local_tool_path(path, thread_data)
|
||||
@@ -1664,15 +1701,21 @@ def write_file_tool(
|
||||
sandbox.write_file(path, content, append)
|
||||
return "OK"
|
||||
except SandboxError as e:
|
||||
return f"Error: {e}"
|
||||
return _format_write_file_error(requested_path, e, runtime)
|
||||
except PermissionError:
|
||||
return f"Error: Permission denied writing to file: {requested_path}"
|
||||
return _truncate_write_file_error_detail(
|
||||
f"Error: Permission denied writing to file: {requested_path}",
|
||||
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS,
|
||||
)
|
||||
except IsADirectoryError:
|
||||
return f"Error: Path is a directory, not a file: {requested_path}"
|
||||
return _truncate_write_file_error_detail(
|
||||
f"Error: Path is a directory, not a file: {requested_path}",
|
||||
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS,
|
||||
)
|
||||
except OSError as e:
|
||||
return f"Error: Failed to write file '{requested_path}': {_sanitize_error(e, runtime)}"
|
||||
return _format_write_file_error(requested_path, e, runtime)
|
||||
except Exception as e:
|
||||
return f"Error: Unexpected error writing file: {_sanitize_error(e, runtime)}"
|
||||
return _format_write_file_error(requested_path, e, runtime)
|
||||
|
||||
|
||||
async def _write_file_tool_async(
|
||||
|
||||
@@ -7,6 +7,7 @@ from dataclasses import replace
|
||||
from typing import TYPE_CHECKING, Annotated, Any, cast
|
||||
|
||||
from langchain.tools import InjectedToolCallId, tool
|
||||
from langchain_core.callbacks import BaseCallbackManager
|
||||
from langgraph.config import get_stream_writer
|
||||
|
||||
from deerflow.config import get_app_config
|
||||
@@ -99,15 +100,31 @@ def _schedule_deferred_subagent_cleanup(task_id: str, trace_id: str, max_polls:
|
||||
|
||||
|
||||
def _find_usage_recorder(runtime: Any) -> Any | None:
|
||||
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config."""
|
||||
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config.
|
||||
|
||||
LangChain may pass ``config["callbacks"]`` in three different shapes:
|
||||
|
||||
- ``None`` (no callbacks registered): no recorder.
|
||||
- A plain ``list[BaseCallbackHandler]``: iterate it directly.
|
||||
- A ``BaseCallbackManager`` instance (e.g. ``AsyncCallbackManager`` on async
|
||||
tool runs): managers are not iterable, so we unwrap ``.handlers`` first.
|
||||
|
||||
Any other shape (e.g. a single handler object accidentally passed without a
|
||||
list wrapper) cannot be iterated safely; treat it as "no recorder" rather
|
||||
than raise.
|
||||
"""
|
||||
if runtime is None:
|
||||
return None
|
||||
config = getattr(runtime, "config", None)
|
||||
if not isinstance(config, dict):
|
||||
return None
|
||||
callbacks = config.get("callbacks", [])
|
||||
callbacks = config.get("callbacks")
|
||||
if isinstance(callbacks, BaseCallbackManager):
|
||||
callbacks = callbacks.handlers
|
||||
if not callbacks:
|
||||
return None
|
||||
if not isinstance(callbacks, list):
|
||||
return None
|
||||
for cb in callbacks:
|
||||
if hasattr(cb, "record_external_llm_usage_records"):
|
||||
return cb
|
||||
|
||||
@@ -0,0 +1,189 @@
|
||||
"""Regression tests for gateway config freshness on the request hot path.
|
||||
|
||||
Bytedance/deer-flow issue #3107 BUG-001: the worker and lead-agent path
|
||||
captured ``app.state.config`` at gateway startup. ``config.yaml`` edits during
|
||||
runtime were therefore ignored — ``get_app_config()``'s mtime-based reload
|
||||
existed but was bypassed because the snapshot object was passed through
|
||||
explicitly.
|
||||
|
||||
These tests pin the desired behaviour: a request-time ``get_config`` call must
|
||||
observe the most recent on-disk ``config.yaml`` (mtime reload), and the
|
||||
runtime ``ContextVar`` override must keep working for per-request injection.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.gateway import deps as gateway_deps
|
||||
from app.gateway.deps import get_config
|
||||
from deerflow.config.app_config import (
|
||||
AppConfig,
|
||||
pop_current_app_config,
|
||||
push_current_app_config,
|
||||
reset_app_config,
|
||||
set_app_config,
|
||||
)
|
||||
from deerflow.config.sandbox_config import SandboxConfig
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolate_app_config_singleton():
|
||||
"""Ensure each test starts with a clean module-level cache."""
|
||||
reset_app_config()
|
||||
yield
|
||||
reset_app_config()
|
||||
|
||||
|
||||
def _write_config_yaml(path: Path, *, log_level: str) -> None:
|
||||
path.write_text(
|
||||
f"""
|
||||
sandbox:
|
||||
use: deerflow.sandbox.local.provider:LocalSandboxProvider
|
||||
log_level: {log_level}
|
||||
""".strip()
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
|
||||
def _build_app() -> FastAPI:
|
||||
app = FastAPI()
|
||||
|
||||
@app.get("/probe")
|
||||
def probe(cfg: AppConfig = Depends(get_config)):
|
||||
return {"log_level": cfg.log_level}
|
||||
|
||||
return app
|
||||
|
||||
|
||||
def test_get_config_reflects_file_mtime_reload(tmp_path, monkeypatch):
|
||||
"""Editing config.yaml at runtime must be visible to /probe without restart.
|
||||
|
||||
This is the literal repro for the issue: the gateway must not freeze the
|
||||
config to whatever was on disk when the process started.
|
||||
"""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
_write_config_yaml(config_file, log_level="info")
|
||||
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file))
|
||||
|
||||
app = _build_app()
|
||||
client = TestClient(app)
|
||||
assert client.get("/probe").json() == {"log_level": "info"}
|
||||
|
||||
# Edit the file and bump its mtime — simulating a maintainer changing
|
||||
# max_tokens / model settings in production while the gateway is live.
|
||||
_write_config_yaml(config_file, log_level="debug")
|
||||
future_mtime = config_file.stat().st_mtime + 5
|
||||
os.utime(config_file, (future_mtime, future_mtime))
|
||||
|
||||
assert client.get("/probe").json() == {"log_level": "debug"}
|
||||
|
||||
|
||||
def test_get_config_respects_runtime_context_override(tmp_path, monkeypatch):
|
||||
"""Per-request ``push_current_app_config`` injection must still win."""
|
||||
config_file = tmp_path / "config.yaml"
|
||||
_write_config_yaml(config_file, log_level="info")
|
||||
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file))
|
||||
|
||||
override = AppConfig(sandbox=SandboxConfig(use="test"), log_level="trace")
|
||||
push_current_app_config(override)
|
||||
try:
|
||||
app = _build_app()
|
||||
client = TestClient(app)
|
||||
assert client.get("/probe").json() == {"log_level": "trace"}
|
||||
finally:
|
||||
pop_current_app_config()
|
||||
|
||||
|
||||
def test_get_config_respects_test_set_app_config():
|
||||
"""``set_app_config`` (used by upload/skills router tests) keeps working."""
|
||||
injected = AppConfig(sandbox=SandboxConfig(use="test"), log_level="warning")
|
||||
set_app_config(injected)
|
||||
|
||||
app = _build_app()
|
||||
client = TestClient(app)
|
||||
assert client.get("/probe").json() == {"log_level": "warning"}
|
||||
|
||||
|
||||
def test_run_context_app_config_reflects_yaml_edit(tmp_path, monkeypatch):
|
||||
"""``RunContext.app_config`` must follow live `config.yaml` edits.
|
||||
|
||||
BUG-001 review feedback: the run-context that feeds worker / lead-agent
|
||||
factories must observe the same mtime reload that `get_config()` does;
|
||||
otherwise stale config slips back in through the run path even after the
|
||||
request dependency is fixed.
|
||||
"""
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from app.gateway.deps import get_run_context
|
||||
|
||||
config_file = tmp_path / "config.yaml"
|
||||
_write_config_yaml(config_file, log_level="info")
|
||||
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file))
|
||||
|
||||
app = FastAPI()
|
||||
# Sentinel values for the rest of the RunContext wiring — we only care
|
||||
# about ``ctx.app_config`` for this assertion.
|
||||
app.state.checkpointer = MagicMock()
|
||||
app.state.store = MagicMock()
|
||||
app.state.run_event_store = MagicMock()
|
||||
app.state.run_events_config = {"frozen": "startup"}
|
||||
app.state.thread_store = MagicMock()
|
||||
|
||||
@app.get("/run-ctx-log-level")
|
||||
def probe(ctx=Depends(get_run_context)):
|
||||
return {
|
||||
"log_level": ctx.app_config.log_level,
|
||||
"run_events_config": ctx.run_events_config,
|
||||
}
|
||||
|
||||
client = TestClient(app)
|
||||
first = client.get("/run-ctx-log-level").json()
|
||||
assert first == {"log_level": "info", "run_events_config": {"frozen": "startup"}}
|
||||
|
||||
_write_config_yaml(config_file, log_level="debug")
|
||||
future_mtime = config_file.stat().st_mtime + 5
|
||||
os.utime(config_file, (future_mtime, future_mtime))
|
||||
|
||||
second = client.get("/run-ctx-log-level").json()
|
||||
# app_config follows the edit; run_events_config stays frozen to the
|
||||
# startup snapshot we wrote onto app.state above.
|
||||
assert second == {"log_level": "debug", "run_events_config": {"frozen": "startup"}}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"exception",
|
||||
[
|
||||
FileNotFoundError("config.yaml not found"),
|
||||
PermissionError("config.yaml not readable"),
|
||||
ValueError("invalid config"),
|
||||
RuntimeError("yaml parse error"),
|
||||
],
|
||||
)
|
||||
def test_get_config_returns_503_on_any_load_failure(monkeypatch, exception):
|
||||
"""Any failure to materialise the config must surface as 503, not 500.
|
||||
|
||||
Bytedance/deer-flow issue #3107 BUG-001 review: the original snapshot
|
||||
contract returned 503 when ``app.state.config is None``. The first cut of
|
||||
this fix only mapped ``FileNotFoundError`` to 503, which left
|
||||
``PermissionError`` / ``yaml.YAMLError`` / ``ValidationError`` etc. bubbling
|
||||
up as 500. Catch every load failure at the request boundary.
|
||||
"""
|
||||
|
||||
def _broken_get_app_config():
|
||||
raise exception
|
||||
|
||||
monkeypatch.setattr(gateway_deps, "get_app_config", _broken_get_app_config)
|
||||
|
||||
app = _build_app()
|
||||
client = TestClient(app, raise_server_exceptions=False)
|
||||
response = client.get("/probe")
|
||||
|
||||
assert response.status_code == 503
|
||||
assert response.json() == {"detail": "Configuration not available"}
|
||||
@@ -1,41 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.gateway.deps import get_config
|
||||
from deerflow.config.app_config import AppConfig
|
||||
from deerflow.config.sandbox_config import SandboxConfig
|
||||
|
||||
|
||||
def test_get_config_returns_app_state_config():
|
||||
"""get_config should return the exact AppConfig stored on app.state."""
|
||||
app = FastAPI()
|
||||
config = AppConfig(sandbox=SandboxConfig(use="test"))
|
||||
app.state.config = config
|
||||
|
||||
@app.get("/probe")
|
||||
def probe(cfg: AppConfig = Depends(get_config)):
|
||||
return {"same_identity": cfg is config, "log_level": cfg.log_level}
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/probe")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"same_identity": True, "log_level": "info"}
|
||||
|
||||
|
||||
def test_get_config_reads_updated_app_state():
|
||||
"""Swapping app.state.config should be visible to the dependency."""
|
||||
app = FastAPI()
|
||||
app.state.config = AppConfig(sandbox=SandboxConfig(use="test"), log_level="info")
|
||||
|
||||
@app.get("/log-level")
|
||||
def log_level(cfg: AppConfig = Depends(get_config)):
|
||||
return {"level": cfg.log_level}
|
||||
|
||||
client = TestClient(app)
|
||||
assert client.get("/log-level").json() == {"level": "info"}
|
||||
|
||||
app.state.config = app.state.config.model_copy(update={"log_level": "debug"})
|
||||
assert client.get("/log-level").json() == {"level": "debug"}
|
||||
@@ -17,7 +17,7 @@ from fastapi import FastAPI
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def _noop_langgraph_runtime(_app):
|
||||
async def _noop_langgraph_runtime(_app, _startup_config):
|
||||
yield
|
||||
|
||||
|
||||
|
||||
@@ -81,6 +81,94 @@ def test_normalize_input_passthrough():
|
||||
assert result == {"custom_key": "value"}
|
||||
|
||||
|
||||
def test_normalize_input_preserves_additional_kwargs_and_id():
|
||||
"""Regression: gh #3132 — frontend ships uploaded-file metadata in
|
||||
additional_kwargs.files (and a client-side message id). The gateway must
|
||||
not strip them before the graph runs, otherwise UploadsMiddleware reports
|
||||
"(empty)" for new uploads and the frontend message loses its file chip.
|
||||
"""
|
||||
from langchain_core.messages import HumanMessage
|
||||
|
||||
from app.gateway.services import normalize_input
|
||||
|
||||
files = [{"filename": "a.csv", "size": 100, "path": "/mnt/user-data/uploads/a.csv", "status": "uploaded"}]
|
||||
result = normalize_input(
|
||||
{
|
||||
"messages": [
|
||||
{
|
||||
"type": "human",
|
||||
"id": "client-msg-1",
|
||||
"name": "user-input",
|
||||
"content": [{"type": "text", "text": "clean it"}],
|
||||
"additional_kwargs": {"files": files, "custom": "keep-me"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
assert len(result["messages"]) == 1
|
||||
msg = result["messages"][0]
|
||||
assert isinstance(msg, HumanMessage)
|
||||
assert msg.id == "client-msg-1"
|
||||
assert msg.name == "user-input"
|
||||
assert msg.content == [{"type": "text", "text": "clean it"}]
|
||||
assert msg.additional_kwargs == {"files": files, "custom": "keep-me"}
|
||||
|
||||
|
||||
def test_normalize_input_passes_through_basemessage_instances():
|
||||
from langchain_core.messages import HumanMessage
|
||||
|
||||
from app.gateway.services import normalize_input
|
||||
|
||||
msg = HumanMessage(content="hello", id="m-1", additional_kwargs={"files": [{"filename": "x"}]})
|
||||
result = normalize_input({"messages": [msg]})
|
||||
assert result["messages"][0] is msg
|
||||
|
||||
|
||||
def test_normalize_input_rejects_malformed_message_with_400():
|
||||
"""Boundary validation: ``convert_to_messages`` raises ``ValueError`` when a
|
||||
message dict is missing ``role``/``type``/``content``. ``normalize_input``
|
||||
runs inside the gateway HTTP boundary, so a malformed payload should surface
|
||||
as a 400 referencing the offending entry — not bubble up as a 500.
|
||||
|
||||
Raised after the Copilot review on PR #3136.
|
||||
"""
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
from app.gateway.services import normalize_input
|
||||
|
||||
with pytest.raises(HTTPException) as excinfo:
|
||||
normalize_input({"messages": [{"role": "human", "content": "ok"}, {"oops": "no role here"}]})
|
||||
assert excinfo.value.status_code == 400
|
||||
assert "input.messages[1]" in excinfo.value.detail
|
||||
|
||||
|
||||
def test_normalize_input_handles_non_human_roles():
|
||||
"""The previous implementation collapsed every role to HumanMessage with a
|
||||
`# TODO: handle other message types` comment. Resuming a thread with prior
|
||||
AI/tool messages would silently rewrite them as human turns — corrupting
|
||||
the conversation. Use langchain's standard conversion so ai/system/tool
|
||||
roles round-trip correctly.
|
||||
"""
|
||||
from langchain_core.messages import AIMessage, SystemMessage, ToolMessage
|
||||
|
||||
from app.gateway.services import normalize_input
|
||||
|
||||
result = normalize_input(
|
||||
{
|
||||
"messages": [
|
||||
{"role": "system", "content": "sys"},
|
||||
{"role": "ai", "content": "hi", "id": "ai-1"},
|
||||
{"role": "tool", "content": "result", "tool_call_id": "call-1"},
|
||||
]
|
||||
}
|
||||
)
|
||||
types = [type(m) for m in result["messages"]]
|
||||
assert types == [SystemMessage, AIMessage, ToolMessage]
|
||||
assert result["messages"][1].id == "ai-1"
|
||||
assert result["messages"][2].tool_call_id == "call-1"
|
||||
|
||||
|
||||
def test_build_run_config_basic():
|
||||
from app.gateway.services import build_run_config
|
||||
|
||||
|
||||
@@ -0,0 +1,409 @@
|
||||
"""Tests for the MCP persistent-session pool."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from deerflow.mcp.session_pool import MCPSessionPool, get_session_pool, reset_session_pool
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_pool():
|
||||
reset_session_pool()
|
||||
yield
|
||||
reset_session_pool()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPSessionPool unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_session_creates_new():
|
||||
"""First call for a key creates a new session."""
|
||||
pool = MCPSessionPool()
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
|
||||
session = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
assert session is mock_session
|
||||
mock_session.initialize.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_session_reuses_existing():
|
||||
"""Second call for the same key returns the cached session."""
|
||||
pool = MCPSessionPool()
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
|
||||
s1 = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
|
||||
s2 = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
assert s1 is s2
|
||||
# Only one session should have been created.
|
||||
assert mock_cm.__aenter__.await_count == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_different_scope_creates_different_session():
|
||||
"""Different scope keys get different sessions."""
|
||||
pool = MCPSessionPool()
|
||||
|
||||
sessions = [AsyncMock(), AsyncMock()]
|
||||
idx = 0
|
||||
|
||||
class CmFactory:
|
||||
def __init__(self):
|
||||
self.enter_count = 0
|
||||
|
||||
async def __aenter__(self):
|
||||
nonlocal idx
|
||||
s = sessions[idx]
|
||||
idx += 1
|
||||
self.enter_count += 1
|
||||
return s
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
return False
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=lambda *a, **kw: CmFactory()):
|
||||
s1 = await pool.get_session("server", "thread-1", {"transport": "stdio", "command": "x", "args": []})
|
||||
s2 = await pool.get_session("server", "thread-2", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
assert s1 is not s2
|
||||
assert s1 is sessions[0]
|
||||
assert s2 is sessions[1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lru_eviction():
|
||||
"""Oldest entries are evicted when the pool is full."""
|
||||
pool = MCPSessionPool()
|
||||
pool.MAX_SESSIONS = 2
|
||||
|
||||
class CmFactory:
|
||||
def __init__(self):
|
||||
self.closed = False
|
||||
|
||||
async def __aenter__(self):
|
||||
return AsyncMock()
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
self.closed = True
|
||||
return False
|
||||
|
||||
cms: list[CmFactory] = []
|
||||
|
||||
def make_cm(*a, **kw):
|
||||
cm = CmFactory()
|
||||
cms.append(cm)
|
||||
return cm
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=make_cm):
|
||||
await pool.get_session("s", "t1", {"transport": "stdio", "command": "x", "args": []})
|
||||
await pool.get_session("s", "t2", {"transport": "stdio", "command": "x", "args": []})
|
||||
# Pool is full (2). Adding t3 should evict t1.
|
||||
await pool.get_session("s", "t3", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
assert cms[0].closed is True
|
||||
assert cms[1].closed is False
|
||||
assert cms[2].closed is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_close_scope():
|
||||
"""close_scope shuts down sessions for a specific scope key."""
|
||||
pool = MCPSessionPool()
|
||||
|
||||
class CmFactory:
|
||||
def __init__(self):
|
||||
self.closed = False
|
||||
|
||||
async def __aenter__(self):
|
||||
return AsyncMock()
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
self.closed = True
|
||||
return False
|
||||
|
||||
cms: list[CmFactory] = []
|
||||
|
||||
def make_cm(*a, **kw):
|
||||
cm = CmFactory()
|
||||
cms.append(cm)
|
||||
return cm
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=make_cm):
|
||||
await pool.get_session("s", "t1", {"transport": "stdio", "command": "x", "args": []})
|
||||
await pool.get_session("s", "t2", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
await pool.close_scope("t1")
|
||||
|
||||
assert cms[0].closed is True
|
||||
assert cms[1].closed is False
|
||||
|
||||
# t2 session still exists.
|
||||
assert ("s", "t2") in pool._entries
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_close_all():
|
||||
"""close_all shuts down every session."""
|
||||
pool = MCPSessionPool()
|
||||
|
||||
class CmFactory:
|
||||
def __init__(self):
|
||||
self.closed = False
|
||||
|
||||
async def __aenter__(self):
|
||||
return AsyncMock()
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
self.closed = True
|
||||
return False
|
||||
|
||||
cms: list[CmFactory] = []
|
||||
|
||||
def make_cm(*a, **kw):
|
||||
cm = CmFactory()
|
||||
cms.append(cm)
|
||||
return cm
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", side_effect=make_cm):
|
||||
await pool.get_session("s1", "t1", {"transport": "stdio", "command": "x", "args": []})
|
||||
await pool.get_session("s2", "t2", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
await pool.close_all()
|
||||
|
||||
assert all(cm.closed for cm in cms)
|
||||
assert len(pool._entries) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Singleton helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_session_pool_singleton():
|
||||
"""get_session_pool returns the same instance."""
|
||||
p1 = get_session_pool()
|
||||
p2 = get_session_pool()
|
||||
assert p1 is p2
|
||||
|
||||
|
||||
def test_reset_session_pool():
|
||||
"""reset_session_pool clears the singleton."""
|
||||
p1 = get_session_pool()
|
||||
reset_session_pool()
|
||||
p2 = get_session_pool()
|
||||
assert p1 is not p2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: _make_session_pool_tool uses the pool
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_pool_tool_wrapping():
|
||||
"""The wrapper tool delegates to a pool-managed session."""
|
||||
# Build a dummy StructuredTool (as returned by langchain-mcp-adapters).
|
||||
from langchain_core.tools import StructuredTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from deerflow.mcp.tools import _make_session_pool_tool
|
||||
|
||||
class Args(BaseModel):
|
||||
url: str = Field(..., description="url")
|
||||
|
||||
original_tool = StructuredTool(
|
||||
name="playwright_navigate",
|
||||
description="Navigate browser",
|
||||
args_schema=Args,
|
||||
coroutine=AsyncMock(),
|
||||
response_format="content_and_artifact",
|
||||
)
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
connection = {"transport": "stdio", "command": "pw", "args": []}
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
|
||||
wrapped = _make_session_pool_tool(original_tool, "playwright", connection)
|
||||
|
||||
# Simulate a tool call with a runtime context containing thread_id.
|
||||
mock_runtime = MagicMock()
|
||||
mock_runtime.context = {"thread_id": "thread-42"}
|
||||
mock_runtime.config = {}
|
||||
|
||||
await wrapped.coroutine(runtime=mock_runtime, url="https://example.com")
|
||||
|
||||
mock_session.call_tool.assert_awaited_once_with("navigate", {"url": "https://example.com"})
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_pool_tool_extracts_thread_id():
|
||||
"""Thread ID is extracted from runtime.config when not in context."""
|
||||
from langchain_core.tools import StructuredTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from deerflow.mcp.tools import _make_session_pool_tool
|
||||
|
||||
class Args(BaseModel):
|
||||
x: int = Field(..., description="x")
|
||||
|
||||
original_tool = StructuredTool(
|
||||
name="server_tool",
|
||||
description="test",
|
||||
args_schema=Args,
|
||||
coroutine=AsyncMock(),
|
||||
response_format="content_and_artifact",
|
||||
)
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
|
||||
wrapped = _make_session_pool_tool(original_tool, "server", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
mock_runtime = MagicMock()
|
||||
mock_runtime.context = {}
|
||||
mock_runtime.config = {"configurable": {"thread_id": "from-config"}}
|
||||
|
||||
await wrapped.coroutine(runtime=mock_runtime, x=1)
|
||||
|
||||
# Verify the session was created with the correct scope key.
|
||||
pool = get_session_pool()
|
||||
assert ("server", "from-config") in pool._entries
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_pool_tool_default_scope():
|
||||
"""When no thread_id is available, 'default' is used as scope key."""
|
||||
from langchain_core.tools import StructuredTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from deerflow.mcp.tools import _make_session_pool_tool
|
||||
|
||||
class Args(BaseModel):
|
||||
x: int = Field(..., description="x")
|
||||
|
||||
original_tool = StructuredTool(
|
||||
name="server_tool",
|
||||
description="test",
|
||||
args_schema=Args,
|
||||
coroutine=AsyncMock(),
|
||||
response_format="content_and_artifact",
|
||||
)
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
|
||||
wrapped = _make_session_pool_tool(original_tool, "server", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
# No thread_id in runtime at all.
|
||||
await wrapped.coroutine(runtime=None, x=1)
|
||||
|
||||
pool = get_session_pool()
|
||||
assert ("server", "default") in pool._entries
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_pool_tool_get_config_fallback():
|
||||
"""When runtime is None, get_config() provides thread_id as fallback."""
|
||||
from langchain_core.tools import StructuredTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from deerflow.mcp.tools import _make_session_pool_tool
|
||||
|
||||
class Args(BaseModel):
|
||||
x: int = Field(..., description="x")
|
||||
|
||||
original_tool = StructuredTool(
|
||||
name="server_tool",
|
||||
description="test",
|
||||
args_schema=Args,
|
||||
coroutine=AsyncMock(),
|
||||
response_format="content_and_artifact",
|
||||
)
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
fake_config = {"configurable": {"thread_id": "from-langgraph-config"}}
|
||||
|
||||
with (
|
||||
patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm),
|
||||
patch("deerflow.mcp.tools.get_config", return_value=fake_config),
|
||||
):
|
||||
wrapped = _make_session_pool_tool(original_tool, "server", {"transport": "stdio", "command": "x", "args": []})
|
||||
|
||||
# runtime=None — get_config() fallback should provide thread_id
|
||||
await wrapped.coroutine(runtime=None, x=1)
|
||||
|
||||
pool = get_session_pool()
|
||||
assert ("server", "from-langgraph-config") in pool._entries
|
||||
|
||||
|
||||
def test_session_pool_tool_sync_wrapper_path_is_safe():
|
||||
"""Sync wrapper (tool.func) invocation doesn't crash on cross-loop access."""
|
||||
from langchain_core.tools import StructuredTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from deerflow.mcp.tools import _make_session_pool_tool
|
||||
from deerflow.tools.sync import make_sync_tool_wrapper
|
||||
|
||||
class Args(BaseModel):
|
||||
url: str = Field(..., description="url")
|
||||
|
||||
original_tool = StructuredTool(
|
||||
name="playwright_navigate",
|
||||
description="Navigate browser",
|
||||
args_schema=Args,
|
||||
coroutine=AsyncMock(),
|
||||
response_format="content_and_artifact",
|
||||
)
|
||||
|
||||
mock_session = AsyncMock()
|
||||
mock_session.call_tool = AsyncMock(return_value=MagicMock(content=[], isError=False, structuredContent=None))
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
connection = {"transport": "stdio", "command": "pw", "args": []}
|
||||
|
||||
with patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm):
|
||||
wrapped = _make_session_pool_tool(original_tool, "playwright", connection)
|
||||
# Attach the sync wrapper exactly as get_mcp_tools() does.
|
||||
wrapped.func = make_sync_tool_wrapper(wrapped.coroutine, wrapped.name)
|
||||
|
||||
# Call via the sync path (asyncio.run in a worker thread).
|
||||
# runtime is not supplied so _extract_thread_id falls back to "default".
|
||||
wrapped.func(url="https://example.com")
|
||||
|
||||
mock_session.call_tool.assert_called_once_with("navigate", {"url": "https://example.com"})
|
||||
@@ -5,6 +5,7 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from deerflow.sandbox.exceptions import SandboxError
|
||||
from deerflow.sandbox.tools import (
|
||||
VIRTUAL_PATH_PREFIX,
|
||||
_apply_cwd_prefix,
|
||||
@@ -1140,6 +1141,170 @@ def test_str_replace_and_append_on_same_path_should_preserve_both_updates(monkey
|
||||
assert sandbox.content == "ALPHA\ntail\n"
|
||||
|
||||
|
||||
def test_write_file_tool_bounds_large_oserror_and_masks_local_paths(monkeypatch) -> None:
|
||||
class FailingSandbox:
|
||||
id = "sandbox-write-large-oserror"
|
||||
|
||||
def write_file(self, path: str, content: str, append: bool = False) -> None:
|
||||
host_path = f"{_THREAD_DATA['workspace_path']}/nested/output.txt"
|
||||
raise OSError(f"write failed at {host_path}\n{'A' * 12000}\nremote tail marker")
|
||||
|
||||
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
|
||||
sandbox = FailingSandbox()
|
||||
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: True)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.get_thread_data", lambda runtime: _THREAD_DATA)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.validate_local_tool_path", lambda path, thread_data: None)
|
||||
monkeypatch.setattr(
|
||||
"deerflow.sandbox.tools._resolve_and_validate_user_data_path",
|
||||
lambda path, thread_data: f"{_THREAD_DATA['workspace_path']}/output.txt",
|
||||
)
|
||||
|
||||
result = write_file_tool.func(
|
||||
runtime=runtime,
|
||||
description="写入大文件失败",
|
||||
path="/mnt/user-data/workspace/output.txt",
|
||||
content="report body",
|
||||
)
|
||||
|
||||
assert len(result) <= 2000
|
||||
assert "Error: Failed to write file '/mnt/user-data/workspace/output.txt':" in result
|
||||
assert "/tmp/deer-flow/threads/t1/user-data/workspace" not in result
|
||||
assert "/mnt/user-data/workspace/nested/output.txt" in result
|
||||
assert "remote tail marker" in result
|
||||
assert "[write_file error truncated:" in result
|
||||
|
||||
|
||||
def test_write_file_tool_preserves_short_oserror_without_truncation(monkeypatch) -> None:
|
||||
class FailingSandbox:
|
||||
id = "sandbox-write-short-oserror"
|
||||
|
||||
def write_file(self, path: str, content: str, append: bool = False) -> None:
|
||||
raise OSError("disk quota exceeded")
|
||||
|
||||
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
|
||||
sandbox = FailingSandbox()
|
||||
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
|
||||
|
||||
result = write_file_tool.func(
|
||||
runtime=runtime,
|
||||
description="写入失败",
|
||||
path="/mnt/user-data/workspace/output.txt",
|
||||
content="tiny payload",
|
||||
)
|
||||
|
||||
assert result == "Error: Failed to write file '/mnt/user-data/workspace/output.txt': OSError: disk quota exceeded"
|
||||
assert "[write_file error truncated:" not in result
|
||||
|
||||
|
||||
def test_write_file_tool_bounds_large_sandbox_error(monkeypatch) -> None:
|
||||
class FailingSandbox:
|
||||
id = "sandbox-write-large-sandbox-error"
|
||||
|
||||
def write_file(self, path: str, content: str, append: bool = False) -> None:
|
||||
raise SandboxError(f"remote write rejected {'B' * 12000} final detail")
|
||||
|
||||
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
|
||||
sandbox = FailingSandbox()
|
||||
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
|
||||
|
||||
result = write_file_tool.func(
|
||||
runtime=runtime,
|
||||
description="远端写入失败",
|
||||
path="/mnt/user-data/workspace/output.txt",
|
||||
content="tiny payload",
|
||||
)
|
||||
|
||||
assert len(result) <= 2000
|
||||
assert "Error: Failed to write file '/mnt/user-data/workspace/output.txt':" in result
|
||||
assert "SandboxError: remote write rejected" in result
|
||||
assert "final detail" in result
|
||||
assert "[write_file error truncated:" in result
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("raised_error", "expected_fragment"),
|
||||
[
|
||||
pytest.param(
|
||||
PermissionError("permission denied"),
|
||||
"Error: Permission denied writing to file: /mnt/user-data/workspace/output.txt",
|
||||
id="permission",
|
||||
),
|
||||
pytest.param(
|
||||
IsADirectoryError("target is a directory"),
|
||||
"Error: Path is a directory, not a file: /mnt/user-data/workspace/output.txt",
|
||||
id="directory",
|
||||
),
|
||||
pytest.param(
|
||||
Exception("remote sandbox timeout"),
|
||||
"Exception: remote sandbox timeout",
|
||||
id="generic",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_write_file_tool_formats_all_other_failure_branches(
|
||||
monkeypatch,
|
||||
raised_error: Exception,
|
||||
expected_fragment: str,
|
||||
) -> None:
|
||||
class FailingSandbox:
|
||||
id = "sandbox-write-other-failure"
|
||||
|
||||
def write_file(self, path: str, content: str, append: bool = False) -> None:
|
||||
raise raised_error
|
||||
|
||||
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
|
||||
sandbox = FailingSandbox()
|
||||
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
|
||||
|
||||
result = write_file_tool.func(
|
||||
runtime=runtime,
|
||||
description="验证错误分支格式化",
|
||||
path="/mnt/user-data/workspace/output.txt",
|
||||
content="tiny payload",
|
||||
)
|
||||
|
||||
assert "/mnt/user-data/workspace/output.txt" in result
|
||||
assert expected_fragment in result
|
||||
assert "[write_file error truncated:" not in result
|
||||
|
||||
|
||||
def test_write_file_tool_handles_sandbox_init_failure(monkeypatch) -> None:
|
||||
"""Regression for #3133 review: SandboxError raised during sandbox
|
||||
initialization (before the local `requested_path` assignment) must still
|
||||
surface as a bounded tool error rather than an UnboundLocalError.
|
||||
"""
|
||||
|
||||
def raise_sandbox_error(runtime):
|
||||
raise SandboxError("sandbox missing")
|
||||
|
||||
runtime = SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={})
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", raise_sandbox_error)
|
||||
monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False)
|
||||
|
||||
result = write_file_tool.func(
|
||||
runtime=runtime,
|
||||
description="sandbox 初始化失败",
|
||||
path="/mnt/user-data/workspace/output.txt",
|
||||
content="tiny payload",
|
||||
)
|
||||
|
||||
assert "Error: Failed to write file '/mnt/user-data/workspace/output.txt':" in result
|
||||
assert "SandboxError: sandbox missing" in result
|
||||
assert "[write_file error truncated:" not in result
|
||||
|
||||
|
||||
def test_file_operation_lock_memory_cleanup() -> None:
|
||||
"""Verify that released locks are eventually cleaned up by WeakValueDictionary.
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ from types import SimpleNamespace
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.gateway.deps import get_config
|
||||
from app.gateway.routers import skills as skills_router
|
||||
from deerflow.skills.storage import get_or_new_skill_storage
|
||||
from deerflow.skills.types import Skill
|
||||
@@ -38,7 +39,8 @@ def _make_skill(name: str, *, enabled: bool) -> Skill:
|
||||
|
||||
def _make_test_app(config) -> FastAPI:
|
||||
app = FastAPI()
|
||||
app.state.config = config
|
||||
app.state.config = config # kept for any startup-style reads
|
||||
app.dependency_overrides[get_config] = lambda: config
|
||||
app.include_router(skills_router.router)
|
||||
return app
|
||||
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
"""Regression tests for _find_usage_recorder callback shape handling.
|
||||
|
||||
Bytedance issue #3107 BUG-002: When LangChain passes ``config["callbacks"]`` as
|
||||
an ``AsyncCallbackManager`` (instead of a plain list), the previous
|
||||
``for cb in callbacks`` loop raised ``TypeError: 'AsyncCallbackManager' object
|
||||
is not iterable``. ToolErrorHandlingMiddleware then converted the entire ``task``
|
||||
tool call into an error ToolMessage, losing the subagent result.
|
||||
"""
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
from langchain_core.callbacks import AsyncCallbackManager, CallbackManager
|
||||
|
||||
from deerflow.tools.builtins.task_tool import _find_usage_recorder
|
||||
|
||||
|
||||
class _RecorderHandler:
|
||||
def record_external_llm_usage_records(self, records):
|
||||
self.records = records
|
||||
|
||||
|
||||
class _OtherHandler:
|
||||
pass
|
||||
|
||||
|
||||
def _make_runtime(callbacks):
|
||||
return SimpleNamespace(config={"callbacks": callbacks})
|
||||
|
||||
|
||||
def test_find_usage_recorder_with_plain_list():
|
||||
recorder = _RecorderHandler()
|
||||
runtime = _make_runtime([_OtherHandler(), recorder])
|
||||
assert _find_usage_recorder(runtime) is recorder
|
||||
|
||||
|
||||
def test_find_usage_recorder_with_async_callback_manager():
|
||||
"""LangChain wraps callbacks in AsyncCallbackManager for async tool runs.
|
||||
|
||||
The old implementation raised TypeError here. The recorder lives on
|
||||
``manager.handlers``; we must look there too.
|
||||
"""
|
||||
recorder = _RecorderHandler()
|
||||
manager = AsyncCallbackManager(handlers=[_OtherHandler(), recorder])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is recorder
|
||||
|
||||
|
||||
def test_find_usage_recorder_with_sync_callback_manager():
|
||||
"""Sync flavor of the same wrapper used by some langchain code paths."""
|
||||
recorder = _RecorderHandler()
|
||||
manager = CallbackManager(handlers=[recorder])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is recorder
|
||||
|
||||
|
||||
def test_find_usage_recorder_returns_none_when_no_recorder():
|
||||
manager = AsyncCallbackManager(handlers=[_OtherHandler()])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
|
||||
|
||||
def test_find_usage_recorder_handles_empty_manager():
|
||||
manager = AsyncCallbackManager(handlers=[])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
|
||||
|
||||
def test_find_usage_recorder_returns_none_for_none_runtime():
|
||||
assert _find_usage_recorder(None) is None
|
||||
|
||||
|
||||
def test_find_usage_recorder_returns_none_when_callbacks_is_none():
|
||||
runtime = _make_runtime(None)
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
|
||||
|
||||
def test_find_usage_recorder_returns_none_for_single_handler_object():
|
||||
"""A single handler instance (not wrapped in a list or manager) should not crash.
|
||||
|
||||
LangChain's contract is that ``config["callbacks"]`` is a list-or-manager,
|
||||
but we treat any other shape defensively rather than letting a ``for`` loop
|
||||
blow up at runtime.
|
||||
"""
|
||||
runtime = _make_runtime(_RecorderHandler())
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
|
||||
|
||||
def test_find_usage_recorder_returns_none_when_config_not_dict():
|
||||
"""Defensive: a runtime without a dict-shaped config should not raise."""
|
||||
runtime = SimpleNamespace(config="not-a-dict")
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
@@ -11,6 +11,7 @@ from _router_auth_helpers import call_unwrapped, make_authed_test_app
|
||||
from fastapi import HTTPException, UploadFile
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.gateway.deps import get_config
|
||||
from app.gateway.routers import uploads
|
||||
|
||||
|
||||
@@ -631,6 +632,7 @@ def test_upload_limits_endpoint_requires_thread_access():
|
||||
cfg.uploads = {}
|
||||
app = make_authed_test_app(owner_check_passes=False)
|
||||
app.state.config = cfg
|
||||
app.dependency_overrides[get_config] = lambda: cfg
|
||||
app.include_router(uploads.router)
|
||||
|
||||
with TestClient(app) as client:
|
||||
|
||||
+12
-6
@@ -118,19 +118,25 @@ models:
|
||||
# For Docker deployments, use host.docker.internal instead of localhost:
|
||||
# base_url: http://host.docker.internal:11434
|
||||
|
||||
# Example: Anthropic Claude model
|
||||
# - name: claude-3-5-sonnet
|
||||
# display_name: Claude 3.5 Sonnet
|
||||
# Example: Anthropic Claude model (with extended thinking)
|
||||
# supports_thinking: true is required — without it, DeerFlow silently falls
|
||||
# back to non-thinking mode even when the UI thinking toggle is on.
|
||||
# budget_tokens is required by the Anthropic API when thinking.type=enabled
|
||||
# (no server default; min 1024; must be less than max_tokens).
|
||||
# - name: claude-sonnet-4
|
||||
# display_name: Claude Sonnet 4
|
||||
# use: langchain_anthropic:ChatAnthropic
|
||||
# model: claude-3-5-sonnet-20241022
|
||||
# model: claude-sonnet-4-20250514
|
||||
# api_key: $ANTHROPIC_API_KEY
|
||||
# default_request_timeout: 600.0
|
||||
# max_retries: 2
|
||||
# max_tokens: 8192
|
||||
# supports_vision: true # Enable vision support for view_image tool
|
||||
# max_tokens: 16000
|
||||
# supports_vision: true
|
||||
# supports_thinking: true
|
||||
# when_thinking_enabled:
|
||||
# thinking:
|
||||
# type: enabled
|
||||
# budget_tokens: 4096 # required; min 1024; must be < max_tokens
|
||||
# when_thinking_disabled:
|
||||
# thinking:
|
||||
# type: disabled
|
||||
|
||||
@@ -27,6 +27,7 @@ import {
|
||||
import { useRehypeSplitWordsIntoSpans } from "@/core/rehype";
|
||||
import type { Subtask } from "@/core/tasks";
|
||||
import { useUpdateSubtask } from "@/core/tasks/context";
|
||||
import { parseSubtaskResult } from "@/core/tasks/subtask-result";
|
||||
import type { AgentThreadState } from "@/core/threads";
|
||||
import { cn } from "@/lib/utils";
|
||||
|
||||
@@ -359,33 +360,10 @@ export function MessageList({
|
||||
} else if (message.type === "tool") {
|
||||
const taskId = message.tool_call_id;
|
||||
if (taskId) {
|
||||
const result = extractTextFromMessage(message);
|
||||
if (result.startsWith("Task Succeeded. Result:")) {
|
||||
updateSubtask({
|
||||
id: taskId,
|
||||
status: "completed",
|
||||
result: result
|
||||
.split("Task Succeeded. Result:")[1]
|
||||
?.trim(),
|
||||
});
|
||||
} else if (result.startsWith("Task failed.")) {
|
||||
updateSubtask({
|
||||
id: taskId,
|
||||
status: "failed",
|
||||
error: result.split("Task failed.")[1]?.trim(),
|
||||
});
|
||||
} else if (result.startsWith("Task timed out")) {
|
||||
updateSubtask({
|
||||
id: taskId,
|
||||
status: "failed",
|
||||
error: result,
|
||||
});
|
||||
} else {
|
||||
updateSubtask({
|
||||
id: taskId,
|
||||
status: "in_progress",
|
||||
});
|
||||
}
|
||||
const parsed = parseSubtaskResult(
|
||||
extractTextFromMessage(message),
|
||||
);
|
||||
updateSubtask({ id: taskId, ...parsed });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -397,6 +397,50 @@ export function stripUploadedFilesTag(content: string): string {
|
||||
.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tag names that backend middlewares wrap around internal payloads before
|
||||
* letting them ride along inside LangGraph message ``content``.
|
||||
*
|
||||
* These markers are *not* user copy — they come from:
|
||||
*
|
||||
* - ``UploadsMiddleware`` → ``<uploaded_files>``
|
||||
* - ``DynamicContextMiddleware`` → ``<system-reminder>`` (carrying
|
||||
* ``<memory>`` / ``<current_date>`` inside)
|
||||
* - ``TodoListMiddleware`` / ``LoopDetectionMiddleware`` style reminders
|
||||
* live in ``hide_from_ui`` HumanMessages, but their inner payload uses
|
||||
* the same tag vocabulary.
|
||||
*
|
||||
* The primary export filter is {@link isHiddenFromUIMessage}. This list is
|
||||
* the defence-in-depth strip for any message that — by middleware bug,
|
||||
* provider quirk, or merge-conflict regression — slips through without
|
||||
* its ``hide_from_ui`` flag set.
|
||||
*/
|
||||
export const INTERNAL_MARKER_TAGS = [
|
||||
"uploaded_files",
|
||||
"system-reminder",
|
||||
"memory",
|
||||
"current_date",
|
||||
] as const;
|
||||
|
||||
const INTERNAL_MARKER_RE = new RegExp(
|
||||
`<(${INTERNAL_MARKER_TAGS.join("|")})>[\\s\\S]*?</\\1>`,
|
||||
"g",
|
||||
);
|
||||
|
||||
/**
|
||||
* Strip every known backend-injected marker from message content.
|
||||
*
|
||||
* Intended for the chat export path where a marker leaking through is a
|
||||
* privacy regression. UI render paths should keep using
|
||||
* {@link stripUploadedFilesTag} — they receive ``hide_from_ui`` messages
|
||||
* via a separate filter and the narrower function avoids stripping content
|
||||
* a user might legitimately type into a meta-discussion (e.g. asking the
|
||||
* model about its own ``<memory>`` system).
|
||||
*/
|
||||
export function stripInternalMarkers(content: string): string {
|
||||
return content.replace(INTERNAL_MARKER_RE, "").trim();
|
||||
}
|
||||
|
||||
export function parseUploadedFiles(content: string): FileInMessage[] {
|
||||
// Match <uploaded_files>...</uploaded_files> tag
|
||||
const uploadedFilesRegex = /<uploaded_files>([\s\S]*?)<\/uploaded_files>/;
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
import type { Subtask } from "./types";
|
||||
|
||||
export type SubtaskStatus = Subtask["status"];
|
||||
|
||||
export interface SubtaskResultUpdate {
|
||||
status: SubtaskStatus;
|
||||
result?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prefix strings the backend `task` tool writes into its result `content`.
|
||||
*
|
||||
* These values are not user-facing copy — they are part of the
|
||||
* backend↔frontend contract defined in
|
||||
* `backend/packages/harness/deerflow/tools/builtins/task_tool.py` (returned
|
||||
* from the tool body) and in
|
||||
* `backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py`
|
||||
* (wrapper for tool exceptions). Any change here must be paired with the
|
||||
* matching backend change. Exported so a future structured-status migration
|
||||
* can reference the same values from one place.
|
||||
*
|
||||
* `task_tool.py` also emits three `Error:` strings for pre-execution failures
|
||||
* — unknown subagent type, host-bash disabled, and "task disappeared from
|
||||
* background tasks". They are handled by {@link ERROR_WRAPPER_PATTERN}
|
||||
* rather than dedicated prefixes because the wrapper already produces
|
||||
* exactly the right `terminal failed` shape.
|
||||
*/
|
||||
export const SUCCESS_PREFIX = "Task Succeeded. Result:";
|
||||
export const FAILURE_PREFIX = "Task failed.";
|
||||
export const TIMEOUT_PREFIX = "Task timed out";
|
||||
export const CANCELLED_PREFIX = "Task cancelled by user.";
|
||||
export const POLLING_TIMEOUT_PREFIX = "Task polling timed out";
|
||||
export const ERROR_WRAPPER_PATTERN = /^Error\b/i;
|
||||
|
||||
/**
|
||||
* Map a `task` tool result string to a {@link SubtaskStatus}.
|
||||
*
|
||||
* Bytedance/deer-flow issue #3107 BUG-007: parent-visible task tool errors do
|
||||
* not always start with one of the three legacy prefixes (e.g. when
|
||||
* `ToolErrorHandlingMiddleware` wraps an exception as
|
||||
* `Error: Tool 'task' failed ...`). Treat any leading `Error:` token as a
|
||||
* terminal failure so subtask cards stop being stuck on "in_progress".
|
||||
*
|
||||
* Returning `in_progress` is the **deliberate** fallback for content that
|
||||
* matches none of the known prefixes. LangChain only ever emits a
|
||||
* `ToolMessage` once the tool itself has returned (success or wrapped
|
||||
* exception), so an unknown shape means "the contract changed underneath us"
|
||||
* — surfacing it as still-running prompts the operator to investigate, where
|
||||
* eagerly marking it terminal-failed would mask the drift.
|
||||
*/
|
||||
export function parseSubtaskResult(text: string): SubtaskResultUpdate {
|
||||
const trimmed = text.trim();
|
||||
|
||||
if (trimmed.startsWith(SUCCESS_PREFIX)) {
|
||||
return {
|
||||
status: "completed",
|
||||
result: trimmed.slice(SUCCESS_PREFIX.length).trim(),
|
||||
};
|
||||
}
|
||||
|
||||
if (trimmed.startsWith(FAILURE_PREFIX)) {
|
||||
return {
|
||||
status: "failed",
|
||||
error: trimmed.slice(FAILURE_PREFIX.length).trim(),
|
||||
};
|
||||
}
|
||||
|
||||
if (trimmed.startsWith(TIMEOUT_PREFIX)) {
|
||||
return { status: "failed", error: trimmed };
|
||||
}
|
||||
|
||||
if (trimmed.startsWith(CANCELLED_PREFIX)) {
|
||||
return { status: "failed", error: trimmed };
|
||||
}
|
||||
|
||||
if (trimmed.startsWith(POLLING_TIMEOUT_PREFIX)) {
|
||||
return { status: "failed", error: trimmed };
|
||||
}
|
||||
|
||||
// ToolErrorHandlingMiddleware-style wrapper, or any other terminal error
|
||||
// signal the backend forwards to the lead agent.
|
||||
if (ERROR_WRAPPER_PATTERN.test(trimmed)) {
|
||||
return { status: "failed", error: trimmed };
|
||||
}
|
||||
|
||||
return { status: "in_progress" };
|
||||
}
|
||||
@@ -5,16 +5,53 @@ import {
|
||||
extractReasoningContentFromMessage,
|
||||
hasContent,
|
||||
hasToolCalls,
|
||||
stripUploadedFilesTag,
|
||||
isHiddenFromUIMessage,
|
||||
stripInternalMarkers,
|
||||
} from "../messages/utils";
|
||||
|
||||
import type { AgentThread } from "./types";
|
||||
import { titleOfThread } from "./utils";
|
||||
|
||||
/**
|
||||
* Optional debug switches for advanced exports.
|
||||
*
|
||||
* Bytedance/deer-flow issue #3107 BUG-006 explicitly prescribes that the
|
||||
* default export includes only the user-visible transcript and excludes
|
||||
* thinking/reasoning content, tool calls, tool results, hidden messages,
|
||||
* memory injection, and `<system-reminder>` payloads. These options let a
|
||||
* future "debug export" surface re-include any of those categories without
|
||||
* forking the formatter. They are not currently wired to any UI control —
|
||||
* callers that want them must construct the options object explicitly.
|
||||
*/
|
||||
export interface ExportOptions {
|
||||
includeReasoning?: boolean;
|
||||
includeToolCalls?: boolean;
|
||||
includeToolMessages?: boolean;
|
||||
includeHidden?: boolean;
|
||||
}
|
||||
|
||||
function visibleMessages(
|
||||
messages: Message[],
|
||||
options: ExportOptions,
|
||||
): Message[] {
|
||||
return messages.filter((message) => {
|
||||
if (!options.includeHidden && isHiddenFromUIMessage(message)) {
|
||||
return false;
|
||||
}
|
||||
if (!options.includeToolMessages && message.type === "tool") {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
function formatMessageContent(message: Message): string {
|
||||
const text = extractContentFromMessage(message);
|
||||
if (!text) return "";
|
||||
return stripUploadedFilesTag(text);
|
||||
// Defence-in-depth: even if a middleware-injected marker slipped through
|
||||
// the `hide_from_ui` filter, scrub every known internal tag before the
|
||||
// content lands in a user-visible export file.
|
||||
return stripInternalMarkers(text);
|
||||
}
|
||||
|
||||
function formatToolCalls(message: Message): string {
|
||||
@@ -26,6 +63,7 @@ function formatToolCalls(message: Message): string {
|
||||
export function formatThreadAsMarkdown(
|
||||
thread: AgentThread,
|
||||
messages: Message[],
|
||||
options: ExportOptions = {},
|
||||
): string {
|
||||
const title = titleOfThread(thread);
|
||||
const createdAt = thread.created_at
|
||||
@@ -41,16 +79,20 @@ export function formatThreadAsMarkdown(
|
||||
"",
|
||||
];
|
||||
|
||||
for (const message of messages) {
|
||||
for (const message of visibleMessages(messages, options)) {
|
||||
if (message.type === "human") {
|
||||
const content = formatMessageContent(message);
|
||||
if (content) {
|
||||
lines.push(`## 🧑 User`, "", content, "", "---", "");
|
||||
}
|
||||
} else if (message.type === "ai") {
|
||||
const reasoning = extractReasoningContentFromMessage(message);
|
||||
const reasoning = options.includeReasoning
|
||||
? extractReasoningContentFromMessage(message)
|
||||
: undefined;
|
||||
const content = formatMessageContent(message);
|
||||
const toolCalls = formatToolCalls(message);
|
||||
const toolCalls = options.includeToolCalls
|
||||
? formatToolCalls(message)
|
||||
: "";
|
||||
|
||||
if (!content && !toolCalls && !reasoning) continue;
|
||||
|
||||
@@ -83,23 +125,65 @@ export function formatThreadAsMarkdown(
|
||||
return lines.join("\n").trimEnd() + "\n";
|
||||
}
|
||||
|
||||
interface JSONExportMessage {
|
||||
type: Message["type"];
|
||||
id: string | undefined;
|
||||
content: string;
|
||||
reasoning?: string;
|
||||
tool_calls?: unknown;
|
||||
}
|
||||
|
||||
function buildJSONMessage(
|
||||
msg: Message,
|
||||
options: ExportOptions,
|
||||
): JSONExportMessage | null {
|
||||
// Run the same sanitiser the Markdown path uses so the JSON `content`
|
||||
// field never carries inline `<think>...</think>` wrappers, content-array
|
||||
// thinking blocks, `<uploaded_files>` markers, or other internal payloads.
|
||||
const content = formatMessageContent(msg);
|
||||
const reasoning =
|
||||
options.includeReasoning && msg.type === "ai"
|
||||
? (extractReasoningContentFromMessage(msg) ?? undefined)
|
||||
: undefined;
|
||||
const toolCalls =
|
||||
options.includeToolCalls &&
|
||||
msg.type === "ai" &&
|
||||
"tool_calls" in msg &&
|
||||
msg.tool_calls?.length
|
||||
? msg.tool_calls
|
||||
: undefined;
|
||||
|
||||
// Drop rows with no exportable payload (empty content + no opted-in
|
||||
// reasoning / tool_calls). Uses falsy semantics so `reasoning: ""` (the
|
||||
// empty string ``extractReasoningContentFromMessage`` can hand back) is
|
||||
// treated the same way Markdown's `!reasoning` continue does — otherwise
|
||||
// an opted-in but empty reasoning field would leak as `{reasoning: ""}`.
|
||||
if (!content && !reasoning && !toolCalls) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
type: msg.type,
|
||||
id: msg.id,
|
||||
content,
|
||||
...(reasoning !== undefined ? { reasoning } : {}),
|
||||
...(toolCalls !== undefined ? { tool_calls: toolCalls } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
export function formatThreadAsJSON(
|
||||
thread: AgentThread,
|
||||
messages: Message[],
|
||||
options: ExportOptions = {},
|
||||
): string {
|
||||
const exportData = {
|
||||
title: titleOfThread(thread),
|
||||
thread_id: thread.thread_id,
|
||||
created_at: thread.created_at,
|
||||
exported_at: new Date().toISOString(),
|
||||
messages: messages.map((msg) => ({
|
||||
type: msg.type,
|
||||
id: msg.id,
|
||||
content: typeof msg.content === "string" ? msg.content : msg.content,
|
||||
...(msg.type === "ai" && msg.tool_calls?.length
|
||||
? { tool_calls: msg.tool_calls }
|
||||
: {}),
|
||||
})),
|
||||
messages: visibleMessages(messages, options)
|
||||
.map((msg) => buildJSONMessage(msg, options))
|
||||
.filter((m): m is JSONExportMessage => m !== null),
|
||||
};
|
||||
return JSON.stringify(exportData, null, 2);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { parseSubtaskResult } from "@/core/tasks/subtask-result";
|
||||
|
||||
describe("parseSubtaskResult", () => {
|
||||
it("recognises the standard success prefix", () => {
|
||||
const parsed = parseSubtaskResult(
|
||||
"Task Succeeded. Result: investigated and produced a 3-page report",
|
||||
);
|
||||
expect(parsed.status).toBe("completed");
|
||||
expect(parsed.result).toBe("investigated and produced a 3-page report");
|
||||
});
|
||||
|
||||
it("recognises the standard failure prefix", () => {
|
||||
const parsed = parseSubtaskResult(
|
||||
"Task failed. underlying tool raised RuntimeError",
|
||||
);
|
||||
expect(parsed.status).toBe("failed");
|
||||
expect(parsed.error).toBe("underlying tool raised RuntimeError");
|
||||
});
|
||||
|
||||
it("recognises the standard timeout prefix", () => {
|
||||
const parsed = parseSubtaskResult("Task timed out after 900s");
|
||||
expect(parsed.status).toBe("failed");
|
||||
expect(parsed.error).toBe("Task timed out after 900s");
|
||||
});
|
||||
|
||||
it("recognises the cancelled-by-user prefix", () => {
|
||||
// bytedance/deer-flow#3131 review: this is one of the five terminal
|
||||
// strings task_tool.py actually emits — the previous cut treated it as
|
||||
// unrecognised content and pushed the card back to in_progress.
|
||||
const parsed = parseSubtaskResult("Task cancelled by user.");
|
||||
expect(parsed.status).toBe("failed");
|
||||
expect(parsed.error).toBe("Task cancelled by user.");
|
||||
});
|
||||
|
||||
it("recognises the polling-timed-out prefix", () => {
|
||||
// Emitted by task_tool when the background polling loop runs out of
|
||||
// budget waiting for the subagent to reach a terminal state.
|
||||
const parsed = parseSubtaskResult(
|
||||
"Task polling timed out after 15 minutes. This may indicate the background task is stuck. Status: RUNNING",
|
||||
);
|
||||
expect(parsed.status).toBe("failed");
|
||||
expect(parsed.error).toContain("polling timed out");
|
||||
});
|
||||
|
||||
it("recognises polling-timed-out with different durations", () => {
|
||||
// `task_tool` emits `Task polling timed out after {N} minutes` where N
|
||||
// varies with the configured subagent timeout. Guard against the regex
|
||||
// accidentally being pinned to a specific number.
|
||||
for (const n of [1, 5, 60]) {
|
||||
const parsed = parseSubtaskResult(
|
||||
`Task polling timed out after ${n} minutes. Status: RUNNING`,
|
||||
);
|
||||
expect(parsed.status).toBe("failed");
|
||||
}
|
||||
});
|
||||
|
||||
it("trims whitespace around cancelled and polling-timed-out prefixes", () => {
|
||||
// Streaming chunks sometimes arrive with leading/trailing newlines.
|
||||
expect(parseSubtaskResult(" Task cancelled by user. \n").status).toBe(
|
||||
"failed",
|
||||
);
|
||||
expect(
|
||||
parseSubtaskResult("\n\nTask polling timed out after 3 minutes").status,
|
||||
).toBe("failed");
|
||||
});
|
||||
|
||||
it("recognises task_tool pre-execution Error: returns via the wrapper", () => {
|
||||
// `task_tool.py` returns three `Error:` strings for unknown subagent
|
||||
// type, host-bash disabled, and "task disappeared". They share the
|
||||
// ERROR_WRAPPER_PATTERN, not a dedicated prefix, so this guards
|
||||
// against a refactor splitting them off.
|
||||
for (const text of [
|
||||
"Error: Unknown subagent type 'foo'. Available: bash, general-purpose",
|
||||
"Error: Host bash subagent is disabled by configuration",
|
||||
"Error: Task 1234 disappeared from background tasks",
|
||||
]) {
|
||||
expect(parseSubtaskResult(text).status).toBe("failed");
|
||||
}
|
||||
});
|
||||
|
||||
it("treats middleware-wrapped tool errors as terminal failures", () => {
|
||||
// bytedance/deer-flow issue #3107 BUG-007: the parent-visible ToolMessage
|
||||
// produced by ToolErrorHandlingMiddleware never matches the three legacy
|
||||
// prefixes, so subtask cards stay stuck on "in_progress".
|
||||
const parsed = parseSubtaskResult(
|
||||
"Error: Tool 'task' failed with TypeError: 'AsyncCallbackManager' object is not iterable. Continue with available context, or choose an alternative tool.",
|
||||
);
|
||||
expect(parsed.status).toBe("failed");
|
||||
expect(parsed.error).toContain("AsyncCallbackManager");
|
||||
});
|
||||
|
||||
it("treats any other Error: prefix as a terminal failure", () => {
|
||||
const parsed = parseSubtaskResult("Error: subagent worker pool exhausted");
|
||||
expect(parsed.status).toBe("failed");
|
||||
});
|
||||
|
||||
it("keeps unrecognised non-error output as in_progress", () => {
|
||||
// Streaming partial chunks should not flip the card to terminal early.
|
||||
const parsed = parseSubtaskResult("Investigating ...");
|
||||
expect(parsed.status).toBe("in_progress");
|
||||
expect(parsed.error).toBeUndefined();
|
||||
expect(parsed.result).toBeUndefined();
|
||||
});
|
||||
|
||||
it("trims surrounding whitespace before matching prefixes", () => {
|
||||
const parsed = parseSubtaskResult(" Task Succeeded. Result: ok ");
|
||||
expect(parsed.status).toBe("completed");
|
||||
expect(parsed.result).toBe("ok");
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,317 @@
|
||||
import type { Message } from "@langchain/langgraph-sdk";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import {
|
||||
formatThreadAsJSON,
|
||||
formatThreadAsMarkdown,
|
||||
} from "@/core/threads/export";
|
||||
import type { AgentThread } from "@/core/threads/types";
|
||||
|
||||
// Bytedance/deer-flow issue #3107 BUG-006: the chat export path bypasses the
|
||||
// UI-level hidden-message filter and emits reasoning content, tool calls, and
|
||||
// any other "internal" payload as if it were part of the user transcript.
|
||||
|
||||
function makeThread(): AgentThread {
|
||||
return {
|
||||
thread_id: "thread-1",
|
||||
created_at: "2026-05-21T00:00:00Z",
|
||||
updated_at: "2026-05-21T00:00:00Z",
|
||||
metadata: { title: "Demo thread" },
|
||||
status: "idle",
|
||||
values: { messages: [] },
|
||||
} as unknown as AgentThread;
|
||||
}
|
||||
|
||||
function human(content: string, extra: Partial<Message> = {}): Message {
|
||||
return {
|
||||
id: `h-${content}`,
|
||||
type: "human",
|
||||
content,
|
||||
...extra,
|
||||
} as Message;
|
||||
}
|
||||
|
||||
function ai(
|
||||
content: string,
|
||||
extra: Partial<Message> & { tool_calls?: unknown } = {},
|
||||
): Message {
|
||||
return {
|
||||
id: `a-${content}`,
|
||||
type: "ai",
|
||||
content,
|
||||
...extra,
|
||||
} as Message;
|
||||
}
|
||||
|
||||
function toolMsg(content: string): Message {
|
||||
return {
|
||||
id: `t-${content}`,
|
||||
type: "tool",
|
||||
content,
|
||||
name: "task",
|
||||
tool_call_id: "call-1",
|
||||
} as unknown as Message;
|
||||
}
|
||||
|
||||
describe("formatThreadAsMarkdown", () => {
|
||||
it("includes plain user and assistant text", () => {
|
||||
const md = formatThreadAsMarkdown(makeThread(), [
|
||||
human("hello"),
|
||||
ai("hi there"),
|
||||
]);
|
||||
expect(md).toContain("hello");
|
||||
expect(md).toContain("hi there");
|
||||
});
|
||||
|
||||
it("drops messages marked hide_from_ui", () => {
|
||||
const hidden = human("internal system reminder", {
|
||||
additional_kwargs: { hide_from_ui: true },
|
||||
} as Partial<Message>);
|
||||
const md = formatThreadAsMarkdown(makeThread(), [
|
||||
hidden,
|
||||
ai("public answer"),
|
||||
]);
|
||||
expect(md).not.toContain("internal system reminder");
|
||||
expect(md).toContain("public answer");
|
||||
});
|
||||
|
||||
it("does not emit reasoning_content by default", () => {
|
||||
const message = ai("final answer", {
|
||||
additional_kwargs: {
|
||||
reasoning_content: "secret chain of thought",
|
||||
},
|
||||
} as Partial<Message>);
|
||||
const md = formatThreadAsMarkdown(makeThread(), [message]);
|
||||
expect(md).not.toContain("secret chain of thought");
|
||||
expect(md).not.toContain("Thinking");
|
||||
});
|
||||
|
||||
it("does not emit tool calls by default", () => {
|
||||
const message = ai("ok", {
|
||||
tool_calls: [{ id: "1", name: "task", args: { description: "do work" } }],
|
||||
} as Partial<Message>);
|
||||
const md = formatThreadAsMarkdown(makeThread(), [message]);
|
||||
expect(md).not.toContain("**Tool:**");
|
||||
expect(md).not.toContain("`task`");
|
||||
});
|
||||
|
||||
it("drops tool result messages", () => {
|
||||
const md = formatThreadAsMarkdown(makeThread(), [
|
||||
ai("delegating"),
|
||||
toolMsg("Task Succeeded. Result: confidential"),
|
||||
]);
|
||||
expect(md).not.toContain("confidential");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatThreadAsMarkdown opt-in flags", () => {
|
||||
it("emits reasoning when includeReasoning is true", () => {
|
||||
const message = ai("final answer", {
|
||||
additional_kwargs: {
|
||||
reasoning_content: "step-by-step chain of thought",
|
||||
},
|
||||
} as Partial<Message>);
|
||||
const md = formatThreadAsMarkdown(makeThread(), [message], {
|
||||
includeReasoning: true,
|
||||
});
|
||||
expect(md).toContain("step-by-step chain of thought");
|
||||
expect(md).toContain("Thinking");
|
||||
});
|
||||
|
||||
it("emits tool call rows when includeToolCalls is true", () => {
|
||||
const message = ai("ok", {
|
||||
tool_calls: [{ id: "1", name: "task", args: { description: "do work" } }],
|
||||
} as Partial<Message>);
|
||||
const md = formatThreadAsMarkdown(makeThread(), [message], {
|
||||
includeToolCalls: true,
|
||||
});
|
||||
expect(md).toContain("**Tool:**");
|
||||
expect(md).toContain("`task`");
|
||||
});
|
||||
|
||||
it("keeps hidden messages when includeHidden is true", () => {
|
||||
const hidden = human("internal reminder", {
|
||||
additional_kwargs: { hide_from_ui: true },
|
||||
} as Partial<Message>);
|
||||
const md = formatThreadAsMarkdown(makeThread(), [hidden], {
|
||||
includeHidden: true,
|
||||
});
|
||||
expect(md).toContain("internal reminder");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatThreadAsJSON opt-in flags", () => {
|
||||
it("emits tool_calls field when includeToolCalls is true", () => {
|
||||
const message = ai("ok", {
|
||||
tool_calls: [{ id: "1", name: "task", args: { description: "x" } }],
|
||||
} as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message], {
|
||||
includeToolCalls: true,
|
||||
});
|
||||
expect(raw).toContain("tool_calls");
|
||||
expect(raw).toContain('"task"');
|
||||
});
|
||||
|
||||
it("keeps tool messages when includeToolMessages is true", () => {
|
||||
const raw = formatThreadAsJSON(
|
||||
makeThread(),
|
||||
[toolMsg("Task Succeeded. Result: keep me")],
|
||||
{ includeToolMessages: true },
|
||||
);
|
||||
const parsed = JSON.parse(raw) as { messages: { type: string }[] };
|
||||
expect(parsed.messages.some((m) => m.type === "tool")).toBe(true);
|
||||
expect(raw).toContain("keep me");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatThreadAsJSON", () => {
|
||||
it("strips hidden messages, tool messages, reasoning, and tool calls", () => {
|
||||
const messages = [
|
||||
human("hello"),
|
||||
human("secret reminder", {
|
||||
additional_kwargs: { hide_from_ui: true },
|
||||
} as Partial<Message>),
|
||||
ai("answer", {
|
||||
additional_kwargs: {
|
||||
reasoning_content: "secret reasoning",
|
||||
},
|
||||
tool_calls: [{ id: "1", name: "task", args: {} }],
|
||||
} as Partial<Message>),
|
||||
toolMsg("internal trace"),
|
||||
];
|
||||
const raw = formatThreadAsJSON(makeThread(), messages);
|
||||
const parsed = JSON.parse(raw) as {
|
||||
messages: { type: string; tool_calls?: unknown[] }[];
|
||||
};
|
||||
|
||||
expect(parsed.messages).toHaveLength(2);
|
||||
expect(parsed.messages.every((m) => m.type !== "tool")).toBe(true);
|
||||
expect(raw).not.toContain("secret reminder");
|
||||
expect(raw).not.toContain("secret reasoning");
|
||||
expect(raw).not.toContain("internal trace");
|
||||
expect(raw).not.toContain("tool_calls");
|
||||
});
|
||||
|
||||
it("strips inline <think>...</think> wrappers from content", () => {
|
||||
// bytedance/deer-flow#3131 review: JSON export must run the same
|
||||
// sanitiser the Markdown path uses so inline reasoning never leaks
|
||||
// even when `includeReasoning` is left at its default false.
|
||||
const message = ai("<think>internal monologue</think>visible answer", {
|
||||
id: "ai-1",
|
||||
} as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message]);
|
||||
expect(raw).not.toContain("internal monologue");
|
||||
expect(raw).not.toContain("<think>");
|
||||
expect(raw).toContain("visible answer");
|
||||
});
|
||||
|
||||
it("strips content-array thinking blocks from content", () => {
|
||||
const message = ai("placeholder", {
|
||||
id: "ai-2",
|
||||
content: [
|
||||
{ type: "thinking", thinking: "hidden reasoning step" },
|
||||
{ type: "text", text: "final visible text" },
|
||||
],
|
||||
} as unknown as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message]);
|
||||
expect(raw).not.toContain("hidden reasoning step");
|
||||
expect(raw).toContain("final visible text");
|
||||
});
|
||||
|
||||
it("strips <uploaded_files> markers from content", () => {
|
||||
const message = human(
|
||||
"real prompt\n<uploaded_files>\n/mnt/user-data/uploads/secret.pdf\n</uploaded_files>",
|
||||
{ id: "h-clean" } as Partial<Message>,
|
||||
);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message]);
|
||||
expect(raw).not.toContain("<uploaded_files>");
|
||||
expect(raw).not.toContain("secret.pdf");
|
||||
expect(raw).toContain("real prompt");
|
||||
});
|
||||
|
||||
it("drops AI messages that sanitise to empty content", () => {
|
||||
// Pure-reasoning AI fragments (no visible text, no tool calls) should
|
||||
// not survive as `{content: ""}` rows in the export.
|
||||
const message = ai("<think>only thinking, no answer</think>", {
|
||||
id: "ai-3",
|
||||
} as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message]);
|
||||
const parsed = JSON.parse(raw) as { messages: unknown[] };
|
||||
expect(parsed.messages).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("strips <system-reminder>/<memory>/<current_date> as defence in depth", () => {
|
||||
// Primary protection is `isHiddenFromUIMessage` filtering the whole
|
||||
// hidden HumanMessage. If a regression strips the `hide_from_ui` flag
|
||||
// (or the marker leaks into an otherwise-visible message), the
|
||||
// sanitiser must still scrub the payload before export.
|
||||
const leaky = human("real user text", {
|
||||
id: "leak-1",
|
||||
content:
|
||||
"<system-reminder>\n<memory>secret fact A</memory>\n<current_date>2026-01-01, Tuesday</current_date>\n</system-reminder>\nreal user text",
|
||||
// Deliberately *not* setting hide_from_ui to model the regression
|
||||
// case the defence-in-depth strip is guarding against.
|
||||
} as unknown as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [leaky]);
|
||||
expect(raw).not.toContain("<system-reminder>");
|
||||
expect(raw).not.toContain("<memory>");
|
||||
expect(raw).not.toContain("<current_date>");
|
||||
expect(raw).not.toContain("secret fact A");
|
||||
expect(raw).toContain("real user text");
|
||||
});
|
||||
|
||||
it("sanitises tool message content when includeToolMessages is true", () => {
|
||||
const message = {
|
||||
id: "t-leak",
|
||||
type: "tool",
|
||||
content:
|
||||
"Task Succeeded. Result: payload\n<uploaded_files>\n/mnt/user-data/uploads/secret.pdf\n</uploaded_files>",
|
||||
name: "task",
|
||||
tool_call_id: "call-leak",
|
||||
} as unknown as Message;
|
||||
|
||||
const raw = formatThreadAsJSON(makeThread(), [message], {
|
||||
includeToolMessages: true,
|
||||
});
|
||||
expect(raw).toContain("Task Succeeded");
|
||||
expect(raw).not.toContain("<uploaded_files>");
|
||||
expect(raw).not.toContain("secret.pdf");
|
||||
});
|
||||
|
||||
it("preserves text and image_url parts in mixed content arrays", () => {
|
||||
// `extractContentFromMessage` keeps `text` and `image_url` parts and
|
||||
// drops `thinking` parts. The JSON export must agree with that
|
||||
// contract.
|
||||
const message = ai("placeholder", {
|
||||
id: "ai-mixed",
|
||||
content: [
|
||||
{ type: "thinking", thinking: "internal reasoning" },
|
||||
{ type: "text", text: "user-visible answer" },
|
||||
{
|
||||
type: "image_url",
|
||||
image_url: { url: "https://example.invalid/cat.png" },
|
||||
},
|
||||
],
|
||||
} as unknown as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message]);
|
||||
expect(raw).toContain("user-visible answer");
|
||||
expect(raw).toContain("https://example.invalid/cat.png");
|
||||
expect(raw).not.toContain("internal reasoning");
|
||||
});
|
||||
|
||||
it("drops opted-in empty reasoning rather than emit reasoning: ''", () => {
|
||||
// `extractReasoningContentFromMessage` can legitimately hand back ""
|
||||
// for an AI message that has no reasoning content. The export must
|
||||
// mirror the Markdown path's `!reasoning` `continue` and drop the row
|
||||
// instead of leaking `{reasoning: ""}`.
|
||||
const message = ai("", {
|
||||
id: "ai-empty-reasoning",
|
||||
additional_kwargs: { reasoning_content: "" },
|
||||
} as Partial<Message>);
|
||||
const raw = formatThreadAsJSON(makeThread(), [message], {
|
||||
includeReasoning: true,
|
||||
});
|
||||
const parsed = JSON.parse(raw) as { messages: unknown[] };
|
||||
expect(parsed.messages).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user