diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 8c4711395..f04774050 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -184,6 +184,18 @@ Setup: Copy `config.example.yaml` to `config.yaml` in the **project root** direc **Config Caching**: `get_app_config()` caches the parsed config, but automatically reloads it when the resolved config path changes or the file's mtime increases. This keeps Gateway and LangGraph reads aligned with `config.yaml` edits without requiring a manual process restart. +**Config Hot-Reload Boundary**: Gateway dependencies route through `get_app_config()` on every request, so per-run fields like `models[*].max_tokens`, `summarization.*`, `title.*`, `memory.*`, `subagents.*`, `tools[*]`, and the agent system prompt pick up `config.yaml` edits on the next message. `AppConfig` is intentionally **not** cached on `app.state` — `lifespan()` keeps a local `startup_config` variable for one-shot bootstrap work (logging level, channels, `langgraph_runtime` engines) and passes it explicitly to `langgraph_runtime(app, startup_config)`. Infrastructure fields are **restart-required**: + +| Field | Why a restart is required | +|---|---| +| `database.*` | `init_engine_from_config()` runs once during `langgraph_runtime()` startup; the SQLAlchemy engine holds the connection pool. | +| `checkpointer.*` (including SQLite WAL/journal settings) | `make_checkpointer()` binds the persistent checkpointer once at startup. | +| `run_events.*` | `make_run_event_store()` selects memory- vs. SQL-backed implementation at startup. | +| `stream_bridge.*` | `make_stream_bridge()` constructs the bridge object once. | +| `sandbox.use` | `get_sandbox_provider()` caches the provider singleton (`_default_sandbox_provider`); a new class path takes effect only on next process start. 
| +| `log_level` | `apply_logging_level()` is called only in `app.py` startup; it mutates the root logger's level, and `get_app_config()` returning a fresh `AppConfig` does not retrigger it. | +| `channels.*` IM platform credentials | `start_channel_service()` is invoked once during startup; live channels are not rebuilt on config change. | + Configuration priority: 1. Explicit `config_path` argument 2. `DEER_FLOW_CONFIG_PATH` environment variable diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index 2c13f571c..8baecb363 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -161,10 +161,16 @@ async def _migrate_orphaned_threads(store, admin_user_id: str) -> int: async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Application lifespan handler.""" - # Load config and check necessary environment variables at startup + # Load config and check necessary environment variables at startup. + # `startup_config` is a local snapshot used only for one-shot bootstrap + # work (logging level, langgraph_runtime engines, channels). Request-time + # config resolution always routes through `get_app_config()` in + # `app/gateway/deps.py::get_config()` so `config.yaml` edits become + # visible without a process restart. We deliberately do NOT cache this + # snapshot on `app.state` to keep that contract enforceable. 
try: - app.state.config = get_app_config() - apply_logging_level(app.state.config.log_level) + startup_config = get_app_config() + apply_logging_level(startup_config.log_level) logger.info("Configuration loaded successfully") except Exception as e: error_msg = f"Failed to load configuration during gateway startup: {e}" @@ -174,7 +180,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: logger.info(f"Starting API Gateway on {config.host}:{config.port}") # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store) - async with langgraph_runtime(app): + async with langgraph_runtime(app, startup_config): logger.info("LangGraph runtime initialised") # Check admin bootstrap state and migrate orphan threads after admin exists. @@ -185,7 +191,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: try: from app.channels.service import start_channel_service - channel_service = await start_channel_service(app.state.config) + channel_service = await start_channel_service(startup_config) logger.info("Channel service started: %s", channel_service.get_status()) except Exception: logger.exception("No IM channels configured or channel service failed to start") diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index 96ea7c5ea..f045a2ee3 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -3,11 +3,21 @@ **Getters** (used by routers): raise 503 when a required dependency is missing, except ``get_store`` which returns ``None``. +``AppConfig`` is intentionally *not* cached on ``app.state``. Routers and the +run path resolve it through :func:`deerflow.config.app_config.get_app_config`, +which performs mtime-based hot reload, so edits to ``config.yaml`` take +effect on the next request without a process restart. 
The engines created in +:func:`langgraph_runtime` (stream bridge, persistence, checkpointer, store, +run-event store) accept a ``startup_config`` snapshot — they are +restart-required by design and stay bound to that snapshot to keep the live +process consistent with itself. + Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`. """ from __future__ import annotations +import logging from collections.abc import AsyncGenerator, Callable from contextlib import AsyncExitStack, asynccontextmanager from typing import TYPE_CHECKING, TypeVar, cast @@ -15,12 +25,14 @@ from typing import TYPE_CHECKING, TypeVar, cast from fastapi import FastAPI, HTTPException, Request from langgraph.types import Checkpointer -from deerflow.config.app_config import AppConfig +from deerflow.config.app_config import AppConfig, get_app_config from deerflow.persistence.feedback import FeedbackRepository from deerflow.runtime import RunContext, RunManager, StreamBridge from deerflow.runtime.events.store.base import RunEventStore from deerflow.runtime.runs.store.base import RunStore +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from app.gateway.auth.local_provider import LocalAuthProvider from app.gateway.auth.repositories.sqlite import SQLiteUserRepository @@ -30,21 +42,55 @@ if TYPE_CHECKING: T = TypeVar("T") -def get_config(request: Request) -> AppConfig: - """Return the app-scoped ``AppConfig`` stored on ``app.state``.""" - config = getattr(request.app.state, "config", None) - if config is None: - raise HTTPException(status_code=503, detail="Configuration not available") - return config +def get_config() -> AppConfig: + """Return the freshest ``AppConfig`` for the current request. + + Routes through :func:`deerflow.config.app_config.get_app_config`, which + honours runtime ``ContextVar`` overrides and reloads ``config.yaml`` from + disk when its mtime changes. 
``AppConfig`` is not cached on ``app.state`` + at all — the only startup-time snapshot lives as a local + ``startup_config`` variable inside ``lifespan()`` and is passed + explicitly into :func:`langgraph_runtime` for the engines that are + restart-required by design. Routing every request through + :func:`get_app_config` closes the bytedance/deer-flow issue #3107 BUG-001 + split-brain where the worker / lead-agent thread saw a stale startup + snapshot. + + Any failure to materialise the config (missing file, permission denied, + YAML parse error, validation error) is reported as 503 — semantically + "the gateway cannot serve requests without a usable configuration" — and + logged with the original exception so operators have something to debug. + """ + try: + return get_app_config() + except Exception as exc: # noqa: BLE001 - request boundary: log and degrade gracefully + logger.exception("Failed to load AppConfig at request time") + raise HTTPException(status_code=503, detail="Configuration not available") from exc @asynccontextmanager -async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: +async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGenerator[None, None]: """Bootstrap and tear down all LangGraph runtime singletons. + ``startup_config`` is the ``AppConfig`` snapshot taken once during + ``lifespan()`` for one-shot infrastructure bootstrap. The engines and + stores constructed here (stream bridge, persistence engine, checkpointer, + store, run-event store) are restart-required by design — they hold live + connections, file handles, or singleton providers — so they bind to this + snapshot and survive across `config.yaml` edits. Request-time consumers + must still go through :func:`get_config` for any field that should be + hot-reloadable. See ``backend/CLAUDE.md`` "Config Hot-Reload Boundary". 
+ + The matching ``run_events_config`` is frozen onto ``app.state`` so + :func:`get_run_context` pairs a freshly-loaded ``AppConfig`` with the + *startup-time* run-events configuration the underlying ``event_store`` + was built from — otherwise the runtime could end up combining a live + new ``run_events_config`` with an event store still bound to the + previous backend. + Usage in ``app.py``:: - async with langgraph_runtime(app): + async with langgraph_runtime(app, startup_config): yield """ from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config @@ -53,9 +99,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: from deerflow.runtime.events.store import make_run_event_store async with AsyncExitStack() as stack: - config = getattr(app.state, "config", None) - if config is None: - raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized") + config = startup_config app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config)) @@ -84,8 +128,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: app.state.thread_store = make_thread_store(sf, app.state.store) - # Run event store (has its own factory with config-driven backend selection) + # Run event store. The store and the matching ``run_events_config`` are + # both frozen at startup so ``get_run_context`` does not combine a + # freshly-reloaded ``AppConfig.run_events`` with a store still bound to + # the previous backend. run_events_config = getattr(config, "run_events", None) + app.state.run_events_config = run_events_config app.state.run_event_store = make_run_event_store(run_events_config) # RunManager with store backing for persistence @@ -139,16 +187,20 @@ def get_thread_store(request: Request) -> ThreadMetaStore: def get_run_context(request: Request) -> RunContext: """Build a :class:`RunContext` from ``app.state`` singletons. 
- Returns a *base* context with infrastructure dependencies. + Returns a *base* context with infrastructure dependencies. The + ``app_config`` field is resolved live so per-run fields (e.g. + ``models[*].max_tokens``) follow ``config.yaml`` edits; the + ``event_store`` / ``run_events_config`` pair stays frozen to the snapshot + captured in :func:`langgraph_runtime` so callers never see a store bound + to one backend paired with a config pointing at another. """ - config = get_config(request) return RunContext( checkpointer=get_checkpointer(request), store=get_store(request), event_store=get_run_event_store(request), - run_events_config=getattr(config, "run_events", None), + run_events_config=getattr(request.app.state, "run_events_config", None), thread_store=get_thread_store(request), - app_config=config, + app_config=get_config(), ) diff --git a/backend/packages/harness/deerflow/tools/builtins/task_tool.py b/backend/packages/harness/deerflow/tools/builtins/task_tool.py index a45bff787..dab1377c6 100644 --- a/backend/packages/harness/deerflow/tools/builtins/task_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/task_tool.py @@ -7,6 +7,7 @@ from dataclasses import replace from typing import TYPE_CHECKING, Annotated, Any, cast from langchain.tools import InjectedToolCallId, tool +from langchain_core.callbacks import BaseCallbackManager from langgraph.config import get_stream_writer from deerflow.config import get_app_config @@ -99,15 +100,31 @@ def _schedule_deferred_subagent_cleanup(task_id: str, trace_id: str, max_polls: def _find_usage_recorder(runtime: Any) -> Any | None: - """Find a callback handler with ``record_external_llm_usage_records`` in the runtime config.""" + """Find a callback handler with ``record_external_llm_usage_records`` in the runtime config. + + LangChain may pass ``config["callbacks"]`` in three different shapes: + + - ``None`` (no callbacks registered): no recorder. + - A plain ``list[BaseCallbackHandler]``: iterate it directly. 
+ - A ``BaseCallbackManager`` instance (e.g. ``AsyncCallbackManager`` on async + tool runs): managers are not iterable, so we unwrap ``.handlers`` first. + + Any other shape (e.g. a single handler object accidentally passed without a + list wrapper) cannot be iterated safely; treat it as "no recorder" rather + than raise. + """ if runtime is None: return None config = getattr(runtime, "config", None) if not isinstance(config, dict): return None - callbacks = config.get("callbacks", []) + callbacks = config.get("callbacks") + if isinstance(callbacks, BaseCallbackManager): + callbacks = callbacks.handlers if not callbacks: return None + if not isinstance(callbacks, list): + return None for cb in callbacks: if hasattr(cb, "record_external_llm_usage_records"): return cb diff --git a/backend/tests/test_gateway_config_freshness.py b/backend/tests/test_gateway_config_freshness.py new file mode 100644 index 000000000..8f38ab6cc --- /dev/null +++ b/backend/tests/test_gateway_config_freshness.py @@ -0,0 +1,189 @@ +"""Regression tests for gateway config freshness on the request hot path. + +Bytedance/deer-flow issue #3107 BUG-001: the worker and lead-agent path +captured ``app.state.config`` at gateway startup. ``config.yaml`` edits during +runtime were therefore ignored — ``get_app_config()``'s mtime-based reload +existed but was bypassed because the snapshot object was passed through +explicitly. + +These tests pin the desired behaviour: a request-time ``get_config`` call must +observe the most recent on-disk ``config.yaml`` (mtime reload), and the +runtime ``ContextVar`` override must keep working for per-request injection. 
+""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest +from fastapi import Depends, FastAPI +from fastapi.testclient import TestClient + +from app.gateway import deps as gateway_deps +from app.gateway.deps import get_config +from deerflow.config.app_config import ( + AppConfig, + pop_current_app_config, + push_current_app_config, + reset_app_config, + set_app_config, +) +from deerflow.config.sandbox_config import SandboxConfig + + +@pytest.fixture(autouse=True) +def _isolate_app_config_singleton(): + """Ensure each test starts with a clean module-level cache.""" + reset_app_config() + yield + reset_app_config() + + +def _write_config_yaml(path: Path, *, log_level: str) -> None: + path.write_text( + f""" +sandbox: + use: deerflow.sandbox.local.provider:LocalSandboxProvider +log_level: {log_level} +""".strip() + + "\n", + encoding="utf-8", + ) + + +def _build_app() -> FastAPI: + app = FastAPI() + + @app.get("/probe") + def probe(cfg: AppConfig = Depends(get_config)): + return {"log_level": cfg.log_level} + + return app + + +def test_get_config_reflects_file_mtime_reload(tmp_path, monkeypatch): + """Editing config.yaml at runtime must be visible to /probe without restart. + + This is the literal repro for the issue: the gateway must not freeze the + config to whatever was on disk when the process started. + """ + config_file = tmp_path / "config.yaml" + _write_config_yaml(config_file, log_level="info") + monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file)) + + app = _build_app() + client = TestClient(app) + assert client.get("/probe").json() == {"log_level": "info"} + + # Edit the file and bump its mtime — simulating a maintainer changing + # max_tokens / model settings in production while the gateway is live. 
+ _write_config_yaml(config_file, log_level="debug") + future_mtime = config_file.stat().st_mtime + 5 + os.utime(config_file, (future_mtime, future_mtime)) + + assert client.get("/probe").json() == {"log_level": "debug"} + + +def test_get_config_respects_runtime_context_override(tmp_path, monkeypatch): + """Per-request ``push_current_app_config`` injection must still win.""" + config_file = tmp_path / "config.yaml" + _write_config_yaml(config_file, log_level="info") + monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file)) + + override = AppConfig(sandbox=SandboxConfig(use="test"), log_level="trace") + push_current_app_config(override) + try: + app = _build_app() + client = TestClient(app) + assert client.get("/probe").json() == {"log_level": "trace"} + finally: + pop_current_app_config() + + +def test_get_config_respects_test_set_app_config(): + """``set_app_config`` (used by upload/skills router tests) keeps working.""" + injected = AppConfig(sandbox=SandboxConfig(use="test"), log_level="warning") + set_app_config(injected) + + app = _build_app() + client = TestClient(app) + assert client.get("/probe").json() == {"log_level": "warning"} + + +def test_run_context_app_config_reflects_yaml_edit(tmp_path, monkeypatch): + """``RunContext.app_config`` must follow live `config.yaml` edits. + + BUG-001 review feedback: the run-context that feeds worker / lead-agent + factories must observe the same mtime reload that `get_config()` does; + otherwise stale config slips back in through the run path even after the + request dependency is fixed. + """ + from unittest.mock import MagicMock + + from app.gateway.deps import get_run_context + + config_file = tmp_path / "config.yaml" + _write_config_yaml(config_file, log_level="info") + monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(config_file)) + + app = FastAPI() + # Sentinel values for the rest of the RunContext wiring — we only care + # about ``ctx.app_config`` for this assertion. 
+ app.state.checkpointer = MagicMock() + app.state.store = MagicMock() + app.state.run_event_store = MagicMock() + app.state.run_events_config = {"frozen": "startup"} + app.state.thread_store = MagicMock() + + @app.get("/run-ctx-log-level") + def probe(ctx=Depends(get_run_context)): + return { + "log_level": ctx.app_config.log_level, + "run_events_config": ctx.run_events_config, + } + + client = TestClient(app) + first = client.get("/run-ctx-log-level").json() + assert first == {"log_level": "info", "run_events_config": {"frozen": "startup"}} + + _write_config_yaml(config_file, log_level="debug") + future_mtime = config_file.stat().st_mtime + 5 + os.utime(config_file, (future_mtime, future_mtime)) + + second = client.get("/run-ctx-log-level").json() + # app_config follows the edit; run_events_config stays frozen to the + # startup snapshot we wrote onto app.state above. + assert second == {"log_level": "debug", "run_events_config": {"frozen": "startup"}} + + +@pytest.mark.parametrize( + "exception", + [ + FileNotFoundError("config.yaml not found"), + PermissionError("config.yaml not readable"), + ValueError("invalid config"), + RuntimeError("yaml parse error"), + ], +) +def test_get_config_returns_503_on_any_load_failure(monkeypatch, exception): + """Any failure to materialise the config must surface as 503, not 500. + + Bytedance/deer-flow issue #3107 BUG-001 review: the original snapshot + contract returned 503 when ``app.state.config is None``. The first cut of + this fix only mapped ``FileNotFoundError`` to 503, which left + ``PermissionError`` / ``yaml.YAMLError`` / ``ValidationError`` etc. bubbling + up as 500. Catch every load failure at the request boundary. 
+ """ + + def _broken_get_app_config(): + raise exception + + monkeypatch.setattr(gateway_deps, "get_app_config", _broken_get_app_config) + + app = _build_app() + client = TestClient(app, raise_server_exceptions=False) + response = client.get("/probe") + + assert response.status_code == 503 + assert response.json() == {"detail": "Configuration not available"} diff --git a/backend/tests/test_gateway_deps_config.py b/backend/tests/test_gateway_deps_config.py deleted file mode 100644 index 70f9124b6..000000000 --- a/backend/tests/test_gateway_deps_config.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import annotations - -from fastapi import Depends, FastAPI -from fastapi.testclient import TestClient - -from app.gateway.deps import get_config -from deerflow.config.app_config import AppConfig -from deerflow.config.sandbox_config import SandboxConfig - - -def test_get_config_returns_app_state_config(): - """get_config should return the exact AppConfig stored on app.state.""" - app = FastAPI() - config = AppConfig(sandbox=SandboxConfig(use="test")) - app.state.config = config - - @app.get("/probe") - def probe(cfg: AppConfig = Depends(get_config)): - return {"same_identity": cfg is config, "log_level": cfg.log_level} - - client = TestClient(app) - response = client.get("/probe") - - assert response.status_code == 200 - assert response.json() == {"same_identity": True, "log_level": "info"} - - -def test_get_config_reads_updated_app_state(): - """Swapping app.state.config should be visible to the dependency.""" - app = FastAPI() - app.state.config = AppConfig(sandbox=SandboxConfig(use="test"), log_level="info") - - @app.get("/log-level") - def log_level(cfg: AppConfig = Depends(get_config)): - return {"level": cfg.log_level} - - client = TestClient(app) - assert client.get("/log-level").json() == {"level": "info"} - - app.state.config = app.state.config.model_copy(update={"log_level": "debug"}) - assert client.get("/log-level").json() == {"level": "debug"} diff --git 
a/backend/tests/test_gateway_lifespan_shutdown.py b/backend/tests/test_gateway_lifespan_shutdown.py index 9319c6268..a694ab00a 100644 --- a/backend/tests/test_gateway_lifespan_shutdown.py +++ b/backend/tests/test_gateway_lifespan_shutdown.py @@ -17,7 +17,7 @@ from fastapi import FastAPI @asynccontextmanager -async def _noop_langgraph_runtime(_app): +async def _noop_langgraph_runtime(_app, _startup_config): yield diff --git a/backend/tests/test_skills_custom_router.py b/backend/tests/test_skills_custom_router.py index ed93e5510..e8a86d8ab 100644 --- a/backend/tests/test_skills_custom_router.py +++ b/backend/tests/test_skills_custom_router.py @@ -7,6 +7,7 @@ from types import SimpleNamespace from fastapi import FastAPI from fastapi.testclient import TestClient +from app.gateway.deps import get_config from app.gateway.routers import skills as skills_router from deerflow.skills.storage import get_or_new_skill_storage from deerflow.skills.types import Skill @@ -38,7 +39,8 @@ def _make_skill(name: str, *, enabled: bool) -> Skill: def _make_test_app(config) -> FastAPI: app = FastAPI() - app.state.config = config + app.state.config = config # kept for any startup-style reads + app.dependency_overrides[get_config] = lambda: config app.include_router(skills_router.router) return app diff --git a/backend/tests/test_task_tool_usage_recorder.py b/backend/tests/test_task_tool_usage_recorder.py new file mode 100644 index 000000000..d7b4ea3b5 --- /dev/null +++ b/backend/tests/test_task_tool_usage_recorder.py @@ -0,0 +1,91 @@ +"""Regression tests for _find_usage_recorder callback shape handling. + +Bytedance issue #3107 BUG-002: When LangChain passes ``config["callbacks"]`` as +an ``AsyncCallbackManager`` (instead of a plain list), the previous +``for cb in callbacks`` loop raised ``TypeError: 'AsyncCallbackManager' object +is not iterable``. ToolErrorHandlingMiddleware then converted the entire ``task`` +tool call into an error ToolMessage, losing the subagent result. 
+""" + +from types import SimpleNamespace + +from langchain_core.callbacks import AsyncCallbackManager, CallbackManager + +from deerflow.tools.builtins.task_tool import _find_usage_recorder + + +class _RecorderHandler: + def record_external_llm_usage_records(self, records): + self.records = records + + +class _OtherHandler: + pass + + +def _make_runtime(callbacks): + return SimpleNamespace(config={"callbacks": callbacks}) + + +def test_find_usage_recorder_with_plain_list(): + recorder = _RecorderHandler() + runtime = _make_runtime([_OtherHandler(), recorder]) + assert _find_usage_recorder(runtime) is recorder + + +def test_find_usage_recorder_with_async_callback_manager(): + """LangChain wraps callbacks in AsyncCallbackManager for async tool runs. + + The old implementation raised TypeError here. The recorder lives on + ``manager.handlers``; we must look there too. + """ + recorder = _RecorderHandler() + manager = AsyncCallbackManager(handlers=[_OtherHandler(), recorder]) + runtime = _make_runtime(manager) + assert _find_usage_recorder(runtime) is recorder + + +def test_find_usage_recorder_with_sync_callback_manager(): + """Sync flavor of the same wrapper used by some langchain code paths.""" + recorder = _RecorderHandler() + manager = CallbackManager(handlers=[recorder]) + runtime = _make_runtime(manager) + assert _find_usage_recorder(runtime) is recorder + + +def test_find_usage_recorder_returns_none_when_no_recorder(): + manager = AsyncCallbackManager(handlers=[_OtherHandler()]) + runtime = _make_runtime(manager) + assert _find_usage_recorder(runtime) is None + + +def test_find_usage_recorder_handles_empty_manager(): + manager = AsyncCallbackManager(handlers=[]) + runtime = _make_runtime(manager) + assert _find_usage_recorder(runtime) is None + + +def test_find_usage_recorder_returns_none_for_none_runtime(): + assert _find_usage_recorder(None) is None + + +def test_find_usage_recorder_returns_none_when_callbacks_is_none(): + runtime = _make_runtime(None) + 
assert _find_usage_recorder(runtime) is None + + +def test_find_usage_recorder_returns_none_for_single_handler_object(): + """A single handler instance (not wrapped in a list or manager) should not crash. + + LangChain's contract is that ``config["callbacks"]`` is a list-or-manager, + but we treat any other shape defensively rather than letting a ``for`` loop + blow up at runtime. + """ + runtime = _make_runtime(_RecorderHandler()) + assert _find_usage_recorder(runtime) is None + + +def test_find_usage_recorder_returns_none_when_config_not_dict(): + """Defensive: a runtime without a dict-shaped config should not raise.""" + runtime = SimpleNamespace(config="not-a-dict") + assert _find_usage_recorder(runtime) is None diff --git a/backend/tests/test_uploads_router.py b/backend/tests/test_uploads_router.py index 7846865b8..1bcdb2eb7 100644 --- a/backend/tests/test_uploads_router.py +++ b/backend/tests/test_uploads_router.py @@ -11,6 +11,7 @@ from _router_auth_helpers import call_unwrapped, make_authed_test_app from fastapi import HTTPException, UploadFile from fastapi.testclient import TestClient +from app.gateway.deps import get_config from app.gateway.routers import uploads @@ -631,6 +632,7 @@ def test_upload_limits_endpoint_requires_thread_access(): cfg.uploads = {} app = make_authed_test_app(owner_check_passes=False) app.state.config = cfg + app.dependency_overrides[get_config] = lambda: cfg app.include_router(uploads.router) with TestClient(app) as client: diff --git a/frontend/src/components/workspace/messages/message-list.tsx b/frontend/src/components/workspace/messages/message-list.tsx index ffbf3e3ad..74dca2af5 100644 --- a/frontend/src/components/workspace/messages/message-list.tsx +++ b/frontend/src/components/workspace/messages/message-list.tsx @@ -27,6 +27,7 @@ import { import { useRehypeSplitWordsIntoSpans } from "@/core/rehype"; import type { Subtask } from "@/core/tasks"; import { useUpdateSubtask } from "@/core/tasks/context"; +import { 
parseSubtaskResult } from "@/core/tasks/subtask-result"; import type { AgentThreadState } from "@/core/threads"; import { cn } from "@/lib/utils"; @@ -359,33 +360,10 @@ export function MessageList({ } else if (message.type === "tool") { const taskId = message.tool_call_id; if (taskId) { - const result = extractTextFromMessage(message); - if (result.startsWith("Task Succeeded. Result:")) { - updateSubtask({ - id: taskId, - status: "completed", - result: result - .split("Task Succeeded. Result:")[1] - ?.trim(), - }); - } else if (result.startsWith("Task failed.")) { - updateSubtask({ - id: taskId, - status: "failed", - error: result.split("Task failed.")[1]?.trim(), - }); - } else if (result.startsWith("Task timed out")) { - updateSubtask({ - id: taskId, - status: "failed", - error: result, - }); - } else { - updateSubtask({ - id: taskId, - status: "in_progress", - }); - } + const parsed = parseSubtaskResult( + extractTextFromMessage(message), + ); + updateSubtask({ id: taskId, ...parsed }); } } } diff --git a/frontend/src/core/messages/utils.ts b/frontend/src/core/messages/utils.ts index 22f985009..1c165fd8d 100644 --- a/frontend/src/core/messages/utils.ts +++ b/frontend/src/core/messages/utils.ts @@ -397,6 +397,50 @@ export function stripUploadedFilesTag(content: string): string { .trim(); } +/** + * Tag names that backend middlewares wrap around internal payloads before + * letting them ride along inside LangGraph message ``content``. + * + * These markers are *not* user copy — they come from: + * + * - ``UploadsMiddleware`` → ``<uploaded_files>`` + * - ``DynamicContextMiddleware`` → ``<system-reminder>`` (carrying + * ``<memory>`` / ``<current_date>`` inside) + * - ``TodoListMiddleware`` / ``LoopDetectionMiddleware`` style reminders + * live in ``hide_from_ui`` HumanMessages, but their inner payload uses + * the same tag vocabulary. + * + * The primary export filter is {@link isHiddenFromUIMessage}. 
This list is + * the defence-in-depth strip for any message that — by middleware bug, + * provider quirk, or merge-conflict regression — slips through without + * its ``hide_from_ui`` flag set. + */ +export const INTERNAL_MARKER_TAGS = [ + "uploaded_files", + "system-reminder", + "memory", + "current_date", +] as const; + +const INTERNAL_MARKER_RE = new RegExp( + `<(${INTERNAL_MARKER_TAGS.join("|")})>[\\s\\S]*?</\\1>`, + "g", +); + +/** + * Strip every known backend-injected marker from message content. + * + * Intended for the chat export path where a marker leaking through is a + * privacy regression. UI render paths should keep using + * {@link stripUploadedFilesTag} — they receive ``hide_from_ui`` messages + * via a separate filter and the narrower function avoids stripping content + * a user might legitimately type into a meta-discussion (e.g. asking the + * model about its own ``<memory>`` system). + */ +export function stripInternalMarkers(content: string): string { + return content.replace(INTERNAL_MARKER_RE, "").trim(); +} + export function parseUploadedFiles(content: string): FileInMessage[] { // Match <uploaded_files>...</uploaded_files> tag const uploadedFilesRegex = /<uploaded_files>([\s\S]*?)<\/uploaded_files>/; diff --git a/frontend/src/core/tasks/subtask-result.ts b/frontend/src/core/tasks/subtask-result.ts new file mode 100644 index 000000000..ac4a422a9 --- /dev/null +++ b/frontend/src/core/tasks/subtask-result.ts @@ -0,0 +1,88 @@ +import type { Subtask } from "./types"; + +export type SubtaskStatus = Subtask["status"]; + +export interface SubtaskResultUpdate { + status: SubtaskStatus; + result?: string; + error?: string; +} + +/** + * Prefix strings the backend `task` tool writes into its result `content`. 
+ * + * These values are not user-facing copy — they are part of the + * backend↔frontend contract defined in + * `backend/packages/harness/deerflow/tools/builtins/task_tool.py` (returned + * from the tool body) and in + * `backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py` + * (wrapper for tool exceptions). Any change here must be paired with the + * matching backend change. Exported so a future structured-status migration + * can reference the same values from one place. + * + * `task_tool.py` also emits three `Error:` strings for pre-execution failures + * — unknown subagent type, host-bash disabled, and "task disappeared from + * background tasks". They are handled by {@link ERROR_WRAPPER_PATTERN} + * rather than dedicated prefixes because the wrapper already produces + * exactly the right `terminal failed` shape. + */ +export const SUCCESS_PREFIX = "Task Succeeded. Result:"; +export const FAILURE_PREFIX = "Task failed."; +export const TIMEOUT_PREFIX = "Task timed out"; +export const CANCELLED_PREFIX = "Task cancelled by user."; +export const POLLING_TIMEOUT_PREFIX = "Task polling timed out"; +export const ERROR_WRAPPER_PATTERN = /^Error\b/i; + +/** + * Map a `task` tool result string to a {@link SubtaskStatus}. + * + * Bytedance/deer-flow issue #3107 BUG-007: parent-visible task tool errors do + * not always start with one of the three legacy prefixes (e.g. when + * `ToolErrorHandlingMiddleware` wraps an exception as + * `Error: Tool 'task' failed ...`). Treat any leading `Error:` token as a + * terminal failure so subtask cards stop being stuck on "in_progress". + * + * Returning `in_progress` is the **deliberate** fallback for content that + * matches none of the known prefixes. 
LangChain only ever emits a + * `ToolMessage` once the tool itself has returned (success or wrapped + * exception), so an unknown shape means "the contract changed underneath us" + * — surfacing it as still-running prompts the operator to investigate, where + * eagerly marking it terminal-failed would mask the drift. + */ +export function parseSubtaskResult(text: string): SubtaskResultUpdate { + const trimmed = text.trim(); + + if (trimmed.startsWith(SUCCESS_PREFIX)) { + return { + status: "completed", + result: trimmed.slice(SUCCESS_PREFIX.length).trim(), + }; + } + + if (trimmed.startsWith(FAILURE_PREFIX)) { + return { + status: "failed", + error: trimmed.slice(FAILURE_PREFIX.length).trim(), + }; + } + + if (trimmed.startsWith(TIMEOUT_PREFIX)) { + return { status: "failed", error: trimmed }; + } + + if (trimmed.startsWith(CANCELLED_PREFIX)) { + return { status: "failed", error: trimmed }; + } + + if (trimmed.startsWith(POLLING_TIMEOUT_PREFIX)) { + return { status: "failed", error: trimmed }; + } + + // ToolErrorHandlingMiddleware-style wrapper, or any other terminal error + // signal the backend forwards to the lead agent. + if (ERROR_WRAPPER_PATTERN.test(trimmed)) { + return { status: "failed", error: trimmed }; + } + + return { status: "in_progress" }; +} diff --git a/frontend/src/core/threads/export.ts b/frontend/src/core/threads/export.ts index cf1f92e47..02eeeb910 100644 --- a/frontend/src/core/threads/export.ts +++ b/frontend/src/core/threads/export.ts @@ -5,16 +5,53 @@ import { extractReasoningContentFromMessage, hasContent, hasToolCalls, - stripUploadedFilesTag, + isHiddenFromUIMessage, + stripInternalMarkers, } from "../messages/utils"; import type { AgentThread } from "./types"; import { titleOfThread } from "./utils"; +/** + * Optional debug switches for advanced exports. 
+ * + * Bytedance/deer-flow issue #3107 BUG-006 explicitly prescribes that the + * default export includes only the user-visible transcript and excludes + * thinking/reasoning content, tool calls, tool results, hidden messages, + * memory injection, and `` payloads. These options let a + * future "debug export" surface re-include any of those categories without + * forking the formatter. They are not currently wired to any UI control — + * callers that want them must construct the options object explicitly. + */ +export interface ExportOptions { + includeReasoning?: boolean; + includeToolCalls?: boolean; + includeToolMessages?: boolean; + includeHidden?: boolean; +} + +function visibleMessages( + messages: Message[], + options: ExportOptions, +): Message[] { + return messages.filter((message) => { + if (!options.includeHidden && isHiddenFromUIMessage(message)) { + return false; + } + if (!options.includeToolMessages && message.type === "tool") { + return false; + } + return true; + }); +} + function formatMessageContent(message: Message): string { const text = extractContentFromMessage(message); if (!text) return ""; - return stripUploadedFilesTag(text); + // Defence-in-depth: even if a middleware-injected marker slipped through + // the `hide_from_ui` filter, scrub every known internal tag before the + // content lands in a user-visible export file. 
+ return stripInternalMarkers(text); } function formatToolCalls(message: Message): string { @@ -26,6 +63,7 @@ function formatToolCalls(message: Message): string { export function formatThreadAsMarkdown( thread: AgentThread, messages: Message[], + options: ExportOptions = {}, ): string { const title = titleOfThread(thread); const createdAt = thread.created_at @@ -41,16 +79,20 @@ export function formatThreadAsMarkdown( "", ]; - for (const message of messages) { + for (const message of visibleMessages(messages, options)) { if (message.type === "human") { const content = formatMessageContent(message); if (content) { lines.push(`## 🧑 User`, "", content, "", "---", ""); } } else if (message.type === "ai") { - const reasoning = extractReasoningContentFromMessage(message); + const reasoning = options.includeReasoning + ? extractReasoningContentFromMessage(message) + : undefined; const content = formatMessageContent(message); - const toolCalls = formatToolCalls(message); + const toolCalls = options.includeToolCalls + ? formatToolCalls(message) + : ""; if (!content && !toolCalls && !reasoning) continue; @@ -83,23 +125,65 @@ export function formatThreadAsMarkdown( return lines.join("\n").trimEnd() + "\n"; } +interface JSONExportMessage { + type: Message["type"]; + id: string | undefined; + content: string; + reasoning?: string; + tool_calls?: unknown; +} + +function buildJSONMessage( + msg: Message, + options: ExportOptions, +): JSONExportMessage | null { + // Run the same sanitiser the Markdown path uses so the JSON `content` + // field never carries inline `...` wrappers, content-array + // thinking blocks, `` markers, or other internal payloads. + const content = formatMessageContent(msg); + const reasoning = + options.includeReasoning && msg.type === "ai" + ? (extractReasoningContentFromMessage(msg) ?? undefined) + : undefined; + const toolCalls = + options.includeToolCalls && + msg.type === "ai" && + "tool_calls" in msg && + msg.tool_calls?.length + ? 
msg.tool_calls + : undefined; + + // Drop rows with no exportable payload (empty content + no opted-in + // reasoning / tool_calls). Uses falsy semantics so `reasoning: ""` (the + // empty string ``extractReasoningContentFromMessage`` can hand back) is + // treated the same way Markdown's `!reasoning` continue does — otherwise + // an opted-in but empty reasoning field would leak as `{reasoning: ""}`. + if (!content && !reasoning && !toolCalls) { + return null; + } + + return { + type: msg.type, + id: msg.id, + content, + ...(reasoning !== undefined ? { reasoning } : {}), + ...(toolCalls !== undefined ? { tool_calls: toolCalls } : {}), + }; +} + export function formatThreadAsJSON( thread: AgentThread, messages: Message[], + options: ExportOptions = {}, ): string { const exportData = { title: titleOfThread(thread), thread_id: thread.thread_id, created_at: thread.created_at, exported_at: new Date().toISOString(), - messages: messages.map((msg) => ({ - type: msg.type, - id: msg.id, - content: typeof msg.content === "string" ? msg.content : msg.content, - ...(msg.type === "ai" && msg.tool_calls?.length - ? { tool_calls: msg.tool_calls } - : {}), - })), + messages: visibleMessages(messages, options) + .map((msg) => buildJSONMessage(msg, options)) + .filter((m): m is JSONExportMessage => m !== null), }; return JSON.stringify(exportData, null, 2); } diff --git a/frontend/tests/unit/core/tasks/subtask-result.test.ts b/frontend/tests/unit/core/tasks/subtask-result.test.ts new file mode 100644 index 000000000..4f0597fda --- /dev/null +++ b/frontend/tests/unit/core/tasks/subtask-result.test.ts @@ -0,0 +1,112 @@ +import { describe, expect, it } from "vitest"; + +import { parseSubtaskResult } from "@/core/tasks/subtask-result"; + +describe("parseSubtaskResult", () => { + it("recognises the standard success prefix", () => { + const parsed = parseSubtaskResult( + "Task Succeeded. 
Result: investigated and produced a 3-page report", + ); + expect(parsed.status).toBe("completed"); + expect(parsed.result).toBe("investigated and produced a 3-page report"); + }); + + it("recognises the standard failure prefix", () => { + const parsed = parseSubtaskResult( + "Task failed. underlying tool raised RuntimeError", + ); + expect(parsed.status).toBe("failed"); + expect(parsed.error).toBe("underlying tool raised RuntimeError"); + }); + + it("recognises the standard timeout prefix", () => { + const parsed = parseSubtaskResult("Task timed out after 900s"); + expect(parsed.status).toBe("failed"); + expect(parsed.error).toBe("Task timed out after 900s"); + }); + + it("recognises the cancelled-by-user prefix", () => { + // bytedance/deer-flow#3131 review: this is one of the five terminal + // strings task_tool.py actually emits — the previous cut treated it as + // unrecognised content and pushed the card back to in_progress. + const parsed = parseSubtaskResult("Task cancelled by user."); + expect(parsed.status).toBe("failed"); + expect(parsed.error).toBe("Task cancelled by user."); + }); + + it("recognises the polling-timed-out prefix", () => { + // Emitted by task_tool when the background polling loop runs out of + // budget waiting for the subagent to reach a terminal state. + const parsed = parseSubtaskResult( + "Task polling timed out after 15 minutes. This may indicate the background task is stuck. Status: RUNNING", + ); + expect(parsed.status).toBe("failed"); + expect(parsed.error).toContain("polling timed out"); + }); + + it("recognises polling-timed-out with different durations", () => { + // `task_tool` emits `Task polling timed out after {N} minutes` where N + // varies with the configured subagent timeout. Guard against the regex + // accidentally being pinned to a specific number. + for (const n of [1, 5, 60]) { + const parsed = parseSubtaskResult( + `Task polling timed out after ${n} minutes. 
Status: RUNNING`, + ); + expect(parsed.status).toBe("failed"); + } + }); + + it("trims whitespace around cancelled and polling-timed-out prefixes", () => { + // Streaming chunks sometimes arrive with leading/trailing newlines. + expect(parseSubtaskResult(" Task cancelled by user. \n").status).toBe( + "failed", + ); + expect( + parseSubtaskResult("\n\nTask polling timed out after 3 minutes").status, + ).toBe("failed"); + }); + + it("recognises task_tool pre-execution Error: returns via the wrapper", () => { + // `task_tool.py` returns three `Error:` strings for unknown subagent + // type, host-bash disabled, and "task disappeared". They share the + // ERROR_WRAPPER_PATTERN, not a dedicated prefix, so this guards + // against a refactor splitting them off. + for (const text of [ + "Error: Unknown subagent type 'foo'. Available: bash, general-purpose", + "Error: Host bash subagent is disabled by configuration", + "Error: Task 1234 disappeared from background tasks", + ]) { + expect(parseSubtaskResult(text).status).toBe("failed"); + } + }); + + it("treats middleware-wrapped tool errors as terminal failures", () => { + // bytedance/deer-flow issue #3107 BUG-007: the parent-visible ToolMessage + // produced by ToolErrorHandlingMiddleware never matches the three legacy + // prefixes, so subtask cards stay stuck on "in_progress". + const parsed = parseSubtaskResult( + "Error: Tool 'task' failed with TypeError: 'AsyncCallbackManager' object is not iterable. Continue with available context, or choose an alternative tool.", + ); + expect(parsed.status).toBe("failed"); + expect(parsed.error).toContain("AsyncCallbackManager"); + }); + + it("treats any other Error: prefix as a terminal failure", () => { + const parsed = parseSubtaskResult("Error: subagent worker pool exhausted"); + expect(parsed.status).toBe("failed"); + }); + + it("keeps unrecognised non-error output as in_progress", () => { + // Streaming partial chunks should not flip the card to terminal early. 
+ const parsed = parseSubtaskResult("Investigating ..."); + expect(parsed.status).toBe("in_progress"); + expect(parsed.error).toBeUndefined(); + expect(parsed.result).toBeUndefined(); + }); + + it("trims surrounding whitespace before matching prefixes", () => { + const parsed = parseSubtaskResult(" Task Succeeded. Result: ok "); + expect(parsed.status).toBe("completed"); + expect(parsed.result).toBe("ok"); + }); +}); diff --git a/frontend/tests/unit/core/threads/export.test.ts b/frontend/tests/unit/core/threads/export.test.ts new file mode 100644 index 000000000..8ee520aa3 --- /dev/null +++ b/frontend/tests/unit/core/threads/export.test.ts @@ -0,0 +1,317 @@ +import type { Message } from "@langchain/langgraph-sdk"; +import { describe, expect, it } from "vitest"; + +import { + formatThreadAsJSON, + formatThreadAsMarkdown, +} from "@/core/threads/export"; +import type { AgentThread } from "@/core/threads/types"; + +// Bytedance/deer-flow issue #3107 BUG-006: the chat export path bypasses the +// UI-level hidden-message filter and emits reasoning content, tool calls, and +// any other "internal" payload as if it were part of the user transcript. 
+ +function makeThread(): AgentThread { + return { + thread_id: "thread-1", + created_at: "2026-05-21T00:00:00Z", + updated_at: "2026-05-21T00:00:00Z", + metadata: { title: "Demo thread" }, + status: "idle", + values: { messages: [] }, + } as unknown as AgentThread; +} + +function human(content: string, extra: Partial = {}): Message { + return { + id: `h-${content}`, + type: "human", + content, + ...extra, + } as Message; +} + +function ai( + content: string, + extra: Partial & { tool_calls?: unknown } = {}, +): Message { + return { + id: `a-${content}`, + type: "ai", + content, + ...extra, + } as Message; +} + +function toolMsg(content: string): Message { + return { + id: `t-${content}`, + type: "tool", + content, + name: "task", + tool_call_id: "call-1", + } as unknown as Message; +} + +describe("formatThreadAsMarkdown", () => { + it("includes plain user and assistant text", () => { + const md = formatThreadAsMarkdown(makeThread(), [ + human("hello"), + ai("hi there"), + ]); + expect(md).toContain("hello"); + expect(md).toContain("hi there"); + }); + + it("drops messages marked hide_from_ui", () => { + const hidden = human("internal system reminder", { + additional_kwargs: { hide_from_ui: true }, + } as Partial); + const md = formatThreadAsMarkdown(makeThread(), [ + hidden, + ai("public answer"), + ]); + expect(md).not.toContain("internal system reminder"); + expect(md).toContain("public answer"); + }); + + it("does not emit reasoning_content by default", () => { + const message = ai("final answer", { + additional_kwargs: { + reasoning_content: "secret chain of thought", + }, + } as Partial); + const md = formatThreadAsMarkdown(makeThread(), [message]); + expect(md).not.toContain("secret chain of thought"); + expect(md).not.toContain("Thinking"); + }); + + it("does not emit tool calls by default", () => { + const message = ai("ok", { + tool_calls: [{ id: "1", name: "task", args: { description: "do work" } }], + } as Partial); + const md = 
formatThreadAsMarkdown(makeThread(), [message]); + expect(md).not.toContain("**Tool:**"); + expect(md).not.toContain("`task`"); + }); + + it("drops tool result messages", () => { + const md = formatThreadAsMarkdown(makeThread(), [ + ai("delegating"), + toolMsg("Task Succeeded. Result: confidential"), + ]); + expect(md).not.toContain("confidential"); + }); +}); + +describe("formatThreadAsMarkdown opt-in flags", () => { + it("emits reasoning when includeReasoning is true", () => { + const message = ai("final answer", { + additional_kwargs: { + reasoning_content: "step-by-step chain of thought", + }, + } as Partial); + const md = formatThreadAsMarkdown(makeThread(), [message], { + includeReasoning: true, + }); + expect(md).toContain("step-by-step chain of thought"); + expect(md).toContain("Thinking"); + }); + + it("emits tool call rows when includeToolCalls is true", () => { + const message = ai("ok", { + tool_calls: [{ id: "1", name: "task", args: { description: "do work" } }], + } as Partial); + const md = formatThreadAsMarkdown(makeThread(), [message], { + includeToolCalls: true, + }); + expect(md).toContain("**Tool:**"); + expect(md).toContain("`task`"); + }); + + it("keeps hidden messages when includeHidden is true", () => { + const hidden = human("internal reminder", { + additional_kwargs: { hide_from_ui: true }, + } as Partial); + const md = formatThreadAsMarkdown(makeThread(), [hidden], { + includeHidden: true, + }); + expect(md).toContain("internal reminder"); + }); +}); + +describe("formatThreadAsJSON opt-in flags", () => { + it("emits tool_calls field when includeToolCalls is true", () => { + const message = ai("ok", { + tool_calls: [{ id: "1", name: "task", args: { description: "x" } }], + } as Partial); + const raw = formatThreadAsJSON(makeThread(), [message], { + includeToolCalls: true, + }); + expect(raw).toContain("tool_calls"); + expect(raw).toContain('"task"'); + }); + + it("keeps tool messages when includeToolMessages is true", () => { + const raw = 
formatThreadAsJSON( + makeThread(), + [toolMsg("Task Succeeded. Result: keep me")], + { includeToolMessages: true }, + ); + const parsed = JSON.parse(raw) as { messages: { type: string }[] }; + expect(parsed.messages.some((m) => m.type === "tool")).toBe(true); + expect(raw).toContain("keep me"); + }); +}); + +describe("formatThreadAsJSON", () => { + it("strips hidden messages, tool messages, reasoning, and tool calls", () => { + const messages = [ + human("hello"), + human("secret reminder", { + additional_kwargs: { hide_from_ui: true }, + } as Partial), + ai("answer", { + additional_kwargs: { + reasoning_content: "secret reasoning", + }, + tool_calls: [{ id: "1", name: "task", args: {} }], + } as Partial), + toolMsg("internal trace"), + ]; + const raw = formatThreadAsJSON(makeThread(), messages); + const parsed = JSON.parse(raw) as { + messages: { type: string; tool_calls?: unknown[] }[]; + }; + + expect(parsed.messages).toHaveLength(2); + expect(parsed.messages.every((m) => m.type !== "tool")).toBe(true); + expect(raw).not.toContain("secret reminder"); + expect(raw).not.toContain("secret reasoning"); + expect(raw).not.toContain("internal trace"); + expect(raw).not.toContain("tool_calls"); + }); + + it("strips inline ... wrappers from content", () => { + // bytedance/deer-flow#3131 review: JSON export must run the same + // sanitiser the Markdown path uses so inline reasoning never leaks + // even when `includeReasoning` is left at its default false. 
+ const message = ai("internal monologuevisible answer", { + id: "ai-1", + } as Partial); + const raw = formatThreadAsJSON(makeThread(), [message]); + expect(raw).not.toContain("internal monologue"); + expect(raw).not.toContain(""); + expect(raw).toContain("visible answer"); + }); + + it("strips content-array thinking blocks from content", () => { + const message = ai("placeholder", { + id: "ai-2", + content: [ + { type: "thinking", thinking: "hidden reasoning step" }, + { type: "text", text: "final visible text" }, + ], + } as unknown as Partial); + const raw = formatThreadAsJSON(makeThread(), [message]); + expect(raw).not.toContain("hidden reasoning step"); + expect(raw).toContain("final visible text"); + }); + + it("strips markers from content", () => { + const message = human( + "real prompt\n\n/mnt/user-data/uploads/secret.pdf\n", + { id: "h-clean" } as Partial, + ); + const raw = formatThreadAsJSON(makeThread(), [message]); + expect(raw).not.toContain(""); + expect(raw).not.toContain("secret.pdf"); + expect(raw).toContain("real prompt"); + }); + + it("drops AI messages that sanitise to empty content", () => { + // Pure-reasoning AI fragments (no visible text, no tool calls) should + // not survive as `{content: ""}` rows in the export. + const message = ai("only thinking, no answer", { + id: "ai-3", + } as Partial); + const raw = formatThreadAsJSON(makeThread(), [message]); + const parsed = JSON.parse(raw) as { messages: unknown[] }; + expect(parsed.messages).toHaveLength(0); + }); + + it("strips // as defence in depth", () => { + // Primary protection is `isHiddenFromUIMessage` filtering the whole + // hidden HumanMessage. If a regression strips the `hide_from_ui` flag + // (or the marker leaks into an otherwise-visible message), the + // sanitiser must still scrub the payload before export. 
+ const leaky = human("real user text", { + id: "leak-1", + content: + "\nsecret fact A\n2026-01-01, Tuesday\n\nreal user text", + // Deliberately *not* setting hide_from_ui to model the regression + // case the defence-in-depth strip is guarding against. + } as unknown as Partial); + const raw = formatThreadAsJSON(makeThread(), [leaky]); + expect(raw).not.toContain(""); + expect(raw).not.toContain(""); + expect(raw).not.toContain(""); + expect(raw).not.toContain("secret fact A"); + expect(raw).toContain("real user text"); + }); + + it("sanitises tool message content when includeToolMessages is true", () => { + const message = { + id: "t-leak", + type: "tool", + content: + "Task Succeeded. Result: payload\n\n/mnt/user-data/uploads/secret.pdf\n", + name: "task", + tool_call_id: "call-leak", + } as unknown as Message; + + const raw = formatThreadAsJSON(makeThread(), [message], { + includeToolMessages: true, + }); + expect(raw).toContain("Task Succeeded"); + expect(raw).not.toContain(""); + expect(raw).not.toContain("secret.pdf"); + }); + + it("preserves text and image_url parts in mixed content arrays", () => { + // `extractContentFromMessage` keeps `text` and `image_url` parts and + // drops `thinking` parts. The JSON export must agree with that + // contract. 
+ const message = ai("placeholder", { + id: "ai-mixed", + content: [ + { type: "thinking", thinking: "internal reasoning" }, + { type: "text", text: "user-visible answer" }, + { + type: "image_url", + image_url: { url: "https://example.invalid/cat.png" }, + }, + ], + } as unknown as Partial); + const raw = formatThreadAsJSON(makeThread(), [message]); + expect(raw).toContain("user-visible answer"); + expect(raw).toContain("https://example.invalid/cat.png"); + expect(raw).not.toContain("internal reasoning"); + }); + + it("drops opted-in empty reasoning rather than emit reasoning: ''", () => { + // `extractReasoningContentFromMessage` can legitimately hand back "" + // for an AI message that has no reasoning content. The export must + // mirror the Markdown path's `!reasoning` `continue` and drop the row + // instead of leaking `{reasoning: ""}`. + const message = ai("", { + id: "ai-empty-reasoning", + additional_kwargs: { reasoning_content: "" }, + } as Partial); + const raw = formatThreadAsJSON(makeThread(), [message], { + includeReasoning: true, + }); + const parsed = JSON.parse(raw) as { messages: unknown[] }; + expect(parsed.messages).toHaveLength(0); + }); +});