Compare commits

...

8 Commits

Author SHA1 Message Date
Admire e7967a7fc3 fix(frontend): hide copy for streaming assistant turn (#3176) 2026-05-23 23:29:16 +08:00
Huixin615 8785658a2e fix(agents): preserve todos state across node updates (#3180)
* fix(agents): preserve todos state across node updates

ThreadState.todos had no reducer, so any downstream node returning a
partial state without todos was implicitly setting it to None, which
LangGraph then used to overwrite the previously streamed value. This
caused the to-do list to render correctly during streaming but vanish
once streaming completed.

Add a merge_todos reducer that keeps the last non-None value, mirroring
the merge_artifacts pattern already used in the same file. An explicit
empty list is still respected so that 'user cleared todos' works.

Tests: 10 new unit tests in tests/test_thread_state_reducers.py covering
merge_todos plus regression coverage for merge_artifacts and
merge_viewed_images. All 69 thread-related tests pass locally.

Closes #3123

* test(agents): add annotation binding regression guard

Address Copilot review feedback on #3123:

- Add TestThreadStateAnnotations asserting that ThreadState.todos is
  Annotated with merge_todos. Without this guard, reverting the
  Annotated[list | None, merge_todos] binding would silently regress
  #3123 while all existing reducer unit tests continue to pass.

- Align test imports to 'from deerflow.agents.thread_state import ...'
  matching the rest of the backend test suite.
2026-05-23 23:25:38 +08:00
rayhpeng 0fb05825a2 fix(runtime): make run creation persistence atomic (#3152)
* fix runtime run creation persistence atomicity

* fix run creation cancellation rollback

* fix run manager test cleanup await

* clarify run creation rollback on cancellation

* document new run persistence rollback boundary

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-23 22:43:34 +08:00
Admire d0fa37e71d fix(frontend): avoid duplicate optimistic user message (#3002) 2026-05-23 17:02:23 +08:00
AochenShen99 604fcbb9d2 Stabilize write artifact previews (#3172) 2026-05-23 16:56:14 +08:00
Nan Gao a64a39dbc0 config: raise default summarization trigger before v2.0-m1 (#3174)
* config: update summarization configuration

* docs: sync summarization trigger guidance
2026-05-23 15:38:25 +08:00
JeffJiang b103d1a7f5 feat(frontend): support static website demo mode (#3170)
* feat(frontend): support static website demo mode

* fix(frontend): render html artifact previews from blob content

* chore(frontend): apply pre-commit formatting

* fix(frontend): address static demo PR review comments

* Update the release information of DeerFlow

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-23 00:10:56 +08:00
AochenShen99 66d6a6a4e8 fix: harden run finalization persistence (#3155)
* fix: harden run finalization persistence

* style: format gateway recovery test

* fix: align run repository return types

* fix: harden completion recovery follow-up
2026-05-23 00:09:06 +08:00
45 changed files with 3012 additions and 170 deletions
+35
View File
@@ -37,11 +37,36 @@ if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
from deerflow.persistence.thread_meta.base import ThreadMetaStore
from deerflow.runtime import RunRecord
T = TypeVar("T")
async def _mark_latest_recovered_threads_error(
run_manager: RunManager,
thread_store: ThreadMetaStore,
recovered_runs: list[RunRecord],
) -> None:
"""Mark thread status as error only when its newest run was recovered."""
recovered_by_thread: dict[str, set[str]] = {}
for record in recovered_runs:
recovered_by_thread.setdefault(record.thread_id, set()).add(record.run_id)
for thread_id, recovered_run_ids in recovered_by_thread.items():
try:
latest_runs = await run_manager.list_by_thread(thread_id, user_id=None, limit=1)
except Exception:
logger.warning("Failed to find latest run for thread %s during run reconciliation", thread_id, exc_info=True)
continue
if not latest_runs or latest_runs[0].run_id not in recovered_run_ids:
continue
try:
await thread_store.update_status(thread_id, "error", user_id=None)
except Exception:
logger.warning("Failed to mark thread %s as error during run reconciliation", thread_id, exc_info=True)
def get_config() -> AppConfig:
"""Return the freshest ``AppConfig`` for the current request.
@@ -138,6 +163,16 @@ async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGen
# RunManager with store backing for persistence
app.state.run_manager = RunManager(store=app.state.run_store)
if getattr(config.database, "backend", None) == "sqlite":
from deerflow.utils.time import now_iso
# Startup-only recovery: clean shutdowns return no active rows and
# the thread-status update below becomes a no-op.
recovered_runs = await app.state.run_manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before=now_iso(),
)
await _mark_latest_recovered_threads_error(app.state.run_manager, app.state.thread_store, recovered_runs)
try:
yield
@@ -45,11 +45,24 @@ def merge_viewed_images(existing: dict[str, ViewedImageData] | None, new: dict[s
return {**existing, **new}
def merge_todos(existing: list | None, new: list | None) -> list | None:
"""Reducer for todos list - keeps the last non-None value.
Semantics:
- If `new` is None (node didn't touch todos), preserve `existing`.
- If `new` is provided (even empty list), it represents an explicit
update and wins over `existing`.
"""
if new is None:
return existing
return new
class ThreadState(AgentState):
sandbox: NotRequired[SandboxState | None]
thread_data: NotRequired[ThreadDataState | None]
title: NotRequired[str | None]
artifacts: Annotated[list[str], merge_artifacts]
todos: NotRequired[list | None]
todos: Annotated[list | None, merge_todos]
uploaded_files: NotRequired[list[dict] | None]
viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images] # image_path -> {base64, mime_type}
@@ -94,25 +94,35 @@ class RunRepository(RunStore):
created_at=None,
follow_up_to_run_id=None,
):
"""Insert or update a run row.
``RunManager`` retries ``put`` after transient SQLite failures. Making
this operation idempotent prevents a successful-but-unacknowledged first
commit from turning the retry into a primary-key failure.
"""
resolved_user_id = resolve_user_id(user_id, method_name="RunRepository.put")
now = datetime.now(UTC)
row = RunRow(
run_id=run_id,
thread_id=thread_id,
assistant_id=assistant_id,
user_id=resolved_user_id,
model_name=self._normalize_model_name(model_name),
status=status,
multitask_strategy=multitask_strategy,
metadata_json=self._safe_json(metadata) or {},
kwargs_json=self._safe_json(kwargs) or {},
error=error,
follow_up_to_run_id=follow_up_to_run_id,
created_at=datetime.fromisoformat(created_at) if created_at else now,
updated_at=now,
)
created = datetime.fromisoformat(created_at) if created_at else now
values = {
"thread_id": thread_id,
"assistant_id": assistant_id,
"user_id": resolved_user_id,
"model_name": self._normalize_model_name(model_name),
"status": status,
"multitask_strategy": multitask_strategy,
"metadata_json": self._safe_json(metadata) or {},
"kwargs_json": self._safe_json(kwargs) or {},
"error": error,
"follow_up_to_run_id": follow_up_to_run_id,
"updated_at": now,
}
async with self._sf() as session:
session.add(row)
row = await session.get(RunRow, run_id)
if row is None:
session.add(RunRow(run_id=run_id, created_at=created, **values))
else:
for key, value in values.items():
setattr(row, key, value)
await session.commit()
async def get(
@@ -146,13 +156,14 @@ class RunRepository(RunStore):
result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()]
async def update_status(self, run_id, status, *, error=None):
async def update_status(self, run_id, status, *, error=None) -> bool:
values: dict[str, Any] = {"status": status, "updated_at": datetime.now(UTC)}
if error is not None:
values["error"] = error
async with self._sf() as session:
await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
result = await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.commit()
return result.rowcount != 0
async def update_model_name(self, run_id, model_name):
async with self._sf() as session:
@@ -187,6 +198,26 @@ class RunRepository(RunStore):
result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()]
async def list_inflight(self, *, before=None):
"""Return persisted active runs for startup recovery."""
if before is None:
before_dt = datetime.now(UTC)
elif isinstance(before, datetime):
before_dt = before
else:
before_dt = datetime.fromisoformat(before)
stmt = (
select(RunRow)
.where(
RunRow.status.in_(("pending", "running")),
RunRow.created_at <= before_dt,
)
.order_by(RunRow.created_at.asc())
)
async with self._sf() as session:
result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()]
async def update_run_completion(
self,
run_id: str,
@@ -203,8 +234,11 @@ class RunRepository(RunStore):
last_ai_message: str | None = None,
first_human_message: str | None = None,
error: str | None = None,
) -> None:
"""Update status + token usage + convenience fields on run completion."""
) -> bool:
"""Update status + token usage + convenience fields on run completion.
Returns ``False`` when no run row matched the requested ``run_id``.
"""
values: dict[str, Any] = {
"status": status,
"total_input_tokens": total_input_tokens,
@@ -224,8 +258,9 @@ class RunRepository(RunStore):
if error is not None:
values["error"] = error
async with self._sf() as session:
await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
result = await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.commit()
return result.rowcount != 0
async def update_run_progress(
self,
@@ -4,7 +4,9 @@ from __future__ import annotations
import asyncio
import logging
import sqlite3
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
@@ -17,6 +19,57 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
_RETRYABLE_SQLITE_MESSAGES = (
"database is locked",
"database table is locked",
"database is busy",
)
_RETRYABLE_SQLITE_ERROR_CODES = {
sqlite3.SQLITE_BUSY,
sqlite3.SQLITE_LOCKED,
}
def _is_retryable_persistence_error(exc: BaseException) -> bool:
"""Return True for transient SQLite persistence failures.
SQLite lock contention normally surfaces through either sqlite3 exceptions
or SQLAlchemy wrappers. The short bounded retry here protects run status
finalization from transient writer pressure without hiding permanent
failures forever.
"""
pending: list[BaseException] = [exc]
seen: set[int] = set()
while pending:
current = pending.pop()
if id(current) in seen:
continue
seen.add(id(current))
message = str(current).lower()
if any(fragment in message for fragment in _RETRYABLE_SQLITE_MESSAGES):
return True
if isinstance(current, (sqlite3.OperationalError, sqlite3.DatabaseError)):
error_code = getattr(current, "sqlite_errorcode", None)
if error_code in _RETRYABLE_SQLITE_ERROR_CODES:
return True
for chained in (getattr(current, "orig", None), current.__cause__, current.__context__):
if isinstance(chained, BaseException):
pending.append(chained)
return False
@dataclass(frozen=True)
class PersistenceRetryPolicy:
"""Bounded retry policy for short run-store writes."""
max_attempts: int = 5
initial_delay: float = 0.05
max_delay: float = 1.0
backoff_factor: float = 2.0
@dataclass
class RunRecord:
@@ -58,38 +111,117 @@ class RunManager:
that run history survives process restarts.
"""
def __init__(self, store: RunStore | None = None) -> None:
def __init__(
self,
store: RunStore | None = None,
*,
persistence_retry_policy: PersistenceRetryPolicy | None = None,
) -> None:
self._runs: dict[str, RunRecord] = {}
self._lock = asyncio.Lock()
self._store = store
self._persistence_retry_policy = persistence_retry_policy or PersistenceRetryPolicy()
async def _persist_to_store(self, record: RunRecord) -> None:
"""Best-effort persist run record to backing store."""
@staticmethod
def _store_put_payload(record: RunRecord, *, error: str | None = None) -> dict[str, Any]:
return {
"thread_id": record.thread_id,
"assistant_id": record.assistant_id,
"status": record.status.value,
"multitask_strategy": record.multitask_strategy,
"metadata": record.metadata or {},
"kwargs": record.kwargs or {},
"error": error if error is not None else record.error,
"created_at": record.created_at,
"model_name": record.model_name,
}
async def _call_store_with_retry(
self,
operation_name: str,
run_id: str,
operation: Callable[[], Awaitable[Any]],
) -> Any:
"""Run a short store operation with bounded retries for SQLite pressure."""
policy = self._persistence_retry_policy
attempt = 1
delay = policy.initial_delay
while True:
try:
return await operation()
except Exception as exc:
retryable = _is_retryable_persistence_error(exc)
if attempt >= policy.max_attempts or not retryable:
raise
logger.warning(
"Transient persistence failure during %s for run %s (attempt %d/%d); retrying",
operation_name,
run_id,
attempt,
policy.max_attempts,
exc_info=True,
)
if delay > 0:
await asyncio.sleep(delay)
delay = min(policy.max_delay, delay * policy.backoff_factor if delay else policy.initial_delay)
attempt += 1
async def _persist_snapshot_to_store(self, run_id: str, payload: dict[str, Any]) -> bool:
"""Best-effort persist a previously captured run snapshot."""
if self._store is None:
return True
try:
await self._call_store_with_retry(
"put",
run_id,
lambda: self._store.put(run_id, **payload),
)
return True
except Exception:
logger.warning("Failed to persist run %s to store", run_id, exc_info=True)
return False
async def _persist_new_run_to_store(self, record: RunRecord) -> None:
"""Persist a newly created run record to the backing store.
Initial run creation is part of the run visibility boundary: callers
should not observe a run in memory unless its backing store row exists.
Unlike follow-up status/model updates, failures are propagated so the
caller can treat creation as failed. Rollback is the caller's
responsibility after inserting the record into ``_runs``.
"""
if self._store is None:
return
try:
await self._store.put(
await self._call_store_with_retry(
"put",
record.run_id,
thread_id=record.thread_id,
assistant_id=record.assistant_id,
status=record.status.value,
multitask_strategy=record.multitask_strategy,
metadata=record.metadata or {},
kwargs=record.kwargs or {},
created_at=record.created_at,
model_name=record.model_name,
lambda: self._store.put(record.run_id, **self._store_put_payload(record)),
)
except Exception:
logger.warning("Failed to persist run %s to store", record.run_id, exc_info=True)
async def _persist_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
async def _persist_to_store(self, record: RunRecord, *, error: str | None = None) -> bool:
"""Best-effort persist run record to backing store."""
return await self._persist_snapshot_to_store(
record.run_id,
self._store_put_payload(record, error=error),
)
async def _persist_status(self, record: RunRecord, status: RunStatus, *, error: str | None = None) -> bool:
"""Best-effort persist a status transition to the backing store."""
if self._store is None:
return
return True
row_recovery_payload = self._store_put_payload(record, error=error)
try:
await self._store.update_status(run_id, status.value, error=error)
updated = await self._call_store_with_retry(
"update_status",
record.run_id,
lambda: self._store.update_status(record.run_id, status.value, error=error),
)
if updated is False:
return await self._persist_snapshot_to_store(record.run_id, row_recovery_payload)
return True
except Exception:
logger.warning("Failed to persist status update for run %s", run_id, exc_info=True)
logger.warning("Failed to persist status update for run %s", record.run_id, exc_info=True)
return False
@staticmethod
def _record_from_store(row: dict[str, Any]) -> RunRecord:
@@ -126,6 +258,7 @@ class RunManager:
async def update_run_completion(self, run_id: str, **kwargs) -> None:
"""Persist token usage and completion data to the backing store."""
row_recovery_payload: dict[str, Any] | None = None
async with self._lock:
record = self._runs.get(run_id)
if record is not None:
@@ -135,9 +268,28 @@ class RunManager:
if hasattr(record, key) and value is not None:
setattr(record, key, value)
record.updated_at = _now_iso()
if self._store is not None:
row_recovery_payload = self._store_put_payload(record, error=kwargs.get("error"))
if self._store is None:
return
try:
await self._store.update_run_completion(run_id, **kwargs)
updated = await self._call_store_with_retry(
"update_run_completion",
run_id,
lambda: self._store.update_run_completion(run_id, **kwargs),
)
if updated is False:
if row_recovery_payload is None:
logger.warning("Failed to recreate missing run %s for completion persistence", run_id)
return
if not await self._persist_snapshot_to_store(run_id, row_recovery_payload):
return
recovered = await self._call_store_with_retry(
"update_run_completion",
run_id,
lambda: self._store.update_run_completion(run_id, **kwargs),
)
if recovered is False:
logger.warning("Run completion update for %s affected no rows after row recreation", run_id)
except Exception:
logger.warning("Failed to persist run completion for %s", run_id, exc_info=True)
@@ -186,7 +338,17 @@ class RunManager:
)
async with self._lock:
self._runs[run_id] = record
await self._persist_to_store(record)
persisted = False
try:
await self._persist_new_run_to_store(record)
persisted = True
except Exception:
logger.warning("Failed to persist run %s; rolled back in-memory record", run_id, exc_info=True)
raise
finally:
# Also covers cancellation, which bypasses ``except Exception``.
if not persisted:
self._runs.pop(run_id, None)
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
@@ -273,7 +435,7 @@ class RunManager:
record.updated_at = _now_iso()
if error is not None:
record.error = error
await self._persist_status(run_id, status, error=error)
await self._persist_status(record, status, error=error)
logger.info("Run %s -> %s", run_id, status.value)
async def _persist_model_name(self, run_id: str, model_name: str | None) -> None:
@@ -281,7 +443,11 @@ class RunManager:
if self._store is None:
return
try:
await self._store.update_model_name(run_id, model_name)
await self._call_store_with_retry(
"update_model_name",
run_id,
lambda: self._store.update_model_name(run_id, model_name),
)
except Exception:
logger.warning("Failed to persist model_name update for run %s", run_id, exc_info=True)
@@ -324,7 +490,7 @@ class RunManager:
record.task.cancel()
record.status = RunStatus.interrupted
record.updated_at = _now_iso()
await self._persist_status(run_id, RunStatus.interrupted)
await self._persist_status(record, RunStatus.interrupted)
logger.info("Run %s cancelled (action=%s)", run_id, action)
return True
@@ -352,7 +518,7 @@ class RunManager:
now = _now_iso()
_supported_strategies = ("reject", "interrupt", "rollback")
interrupted_run_ids: list[str] = []
interrupted_records: list[RunRecord] = []
async with self._lock:
if multitask_strategy not in _supported_strategies:
@@ -364,16 +530,8 @@ class RunManager:
raise ConflictError(f"Thread {thread_id} already has an active run")
if multitask_strategy in ("interrupt", "rollback") and inflight:
for r in inflight:
r.abort_action = multitask_strategy
r.abort_event.set()
if r.task is not None and not r.task.done():
r.task.cancel()
r.status = RunStatus.interrupted
r.updated_at = now
interrupted_run_ids.append(r.run_id)
logger.info(
"Cancelled %d inflight run(s) on thread %s (strategy=%s)",
"Preparing to cancel %d inflight run(s) on thread %s (strategy=%s)",
len(inflight),
thread_id,
multitask_strategy,
@@ -393,13 +551,87 @@ class RunManager:
model_name=model_name,
)
self._runs[run_id] = record
persisted = False
try:
await self._persist_new_run_to_store(record)
persisted = True
except Exception:
logger.warning("Failed to persist run %s; rolled back in-memory record", run_id, exc_info=True)
raise
finally:
# Also covers cancellation, which bypasses ``except Exception``.
if not persisted:
self._runs.pop(run_id, None)
for interrupted_run_id in interrupted_run_ids:
await self._persist_status(interrupted_run_id, RunStatus.interrupted)
await self._persist_to_store(record)
if multitask_strategy in ("interrupt", "rollback") and inflight:
for r in inflight:
r.abort_action = multitask_strategy
r.abort_event.set()
if r.task is not None and not r.task.done():
r.task.cancel()
r.status = RunStatus.interrupted
r.updated_at = now
interrupted_records.append(r)
for interrupted_record in interrupted_records:
await self._persist_status(interrupted_record, RunStatus.interrupted)
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
async def reconcile_orphaned_inflight_runs(
self,
*,
error: str,
before: str | None = None,
) -> list[RunRecord]:
"""Mark persisted active runs as failed when no local task owns them.
Gateway runs are process-local: the asyncio task and abort event live in
memory, while the run row is durable. After a SQLite-backed gateway
restart, any persisted ``pending`` or ``running`` row created before
startup cannot still have a local worker. This recovery step turns that
ambiguous state into an explicit error instead of letting the UI show an
indefinite active run.
"""
if self._store is None:
return []
try:
rows = await self._call_store_with_retry(
"list_inflight",
"*",
lambda: self._store.list_inflight(before=before),
)
except Exception:
logger.warning("Failed to list orphaned inflight runs for reconciliation", exc_info=True)
return []
recovered: list[RunRecord] = []
now = _now_iso()
for row in rows:
try:
record = self._record_from_store(row)
except Exception:
logger.warning("Failed to map orphaned run row during reconciliation", exc_info=True)
continue
async with self._lock:
live_record = self._runs.get(record.run_id)
if live_record is not None and live_record.status in (RunStatus.pending, RunStatus.running):
continue
record.status = RunStatus.error
record.error = error
record.updated_at = now
persisted = await self._persist_status(record, RunStatus.error, error=error)
if not persisted:
logger.warning("Skipped orphaned run %s recovery because error status was not persisted", record.run_id)
continue
recovered.append(record)
if recovered:
logger.warning("Recovered %d orphaned inflight run(s) as error", len(recovered))
return recovered
async def has_inflight(self, thread_id: str) -> bool:
"""Return ``True`` if *thread_id* has a pending or running run."""
async with self._lock:
@@ -59,7 +59,12 @@ class RunStore(abc.ABC):
status: str,
*,
error: str | None = None,
) -> None:
) -> bool | None:
"""Update a run status.
Returns ``False`` when the store can prove no row was updated. Older or
lightweight stores may return ``None`` when they cannot report rowcount.
"""
pass
@abc.abstractmethod
@@ -92,7 +97,11 @@ class RunStore(abc.ABC):
last_ai_message: str | None = None,
first_human_message: str | None = None,
error: str | None = None,
) -> None:
) -> bool | None:
"""Persist final completion fields.
Returns ``False`` when the store can prove no row was updated.
"""
pass
async def update_run_progress(
@@ -117,6 +126,11 @@ class RunStore(abc.ABC):
async def list_pending(self, *, before: str | None = None) -> list[dict[str, Any]]:
pass
@abc.abstractmethod
async def list_inflight(self, *, before: str | None = None) -> list[dict[str, Any]]:
"""Return persisted runs that are still ``pending`` or ``running``."""
pass
@abc.abstractmethod
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
"""Aggregate token usage for completed runs in a thread.
@@ -65,6 +65,8 @@ class MemoryRunStore(RunStore):
if error is not None:
self._runs[run_id]["error"] = error
self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat()
return True
return False
async def update_model_name(self, run_id, model_name):
if run_id in self._runs:
@@ -81,6 +83,8 @@ class MemoryRunStore(RunStore):
if value is not None:
self._runs[run_id][key] = value
self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat()
return True
return False
async def update_run_progress(self, run_id, **kwargs):
if run_id in self._runs and self._runs[run_id].get("status") == "running":
@@ -95,6 +99,12 @@ class MemoryRunStore(RunStore):
results.sort(key=lambda r: r["created_at"])
return results
async def list_inflight(self, *, before=None):
now = before or datetime.now(UTC).isoformat()
results = [r for r in self._runs.values() if r["status"] in ("pending", "running") and r["created_at"] <= now]
results.sort(key=lambda r: r["created_at"])
return results
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
statuses = ("success", "error", "running") if include_active else ("success", "error")
completed = [r for r in self._runs.values() if r["thread_id"] == thread_id and r.get("status") in statuses]
+127
View File
@@ -0,0 +1,127 @@
"""Gateway startup recovery for stale persisted runs."""
from __future__ import annotations
from contextlib import asynccontextmanager
from types import SimpleNamespace
import pytest
from fastapi import FastAPI
import deerflow.runtime as runtime_module
from app.gateway import deps as gateway_deps
from deerflow.persistence import engine as engine_module
from deerflow.persistence import thread_meta as thread_meta_module
from deerflow.runtime.checkpointer import async_provider as checkpointer_module
from deerflow.runtime.events import store as event_store_module
@asynccontextmanager
async def _fake_context(value):
yield value
class _FakeRunManager:
"""RunManager double that records startup reconciliation calls."""
instances: list[_FakeRunManager] = []
recovered_runs = [SimpleNamespace(run_id="run-1", thread_id="thread-1")]
latest_by_thread: dict[str, list[SimpleNamespace]] = {}
def __init__(self, *, store):
self.store = store
self.reconcile_calls: list[dict] = []
self.list_by_thread_calls: list[dict] = []
_FakeRunManager.instances.append(self)
async def reconcile_orphaned_inflight_runs(self, *, error: str, before: str | None = None):
self.reconcile_calls.append({"error": error, "before": before})
return self.recovered_runs
async def list_by_thread(self, thread_id: str, *, user_id=None, limit: int = 100):
self.list_by_thread_calls.append({"thread_id": thread_id, "user_id": user_id, "limit": limit})
return self.latest_by_thread.get(thread_id, self.recovered_runs[:limit])
class _FakeThreadStore:
def __init__(self) -> None:
self.status_updates: list[tuple[str, str, str | None]] = []
async def update_status(self, thread_id: str, status: str, *, user_id=None) -> None:
self.status_updates.append((thread_id, status, user_id))
@pytest.mark.anyio
async def test_sqlite_runtime_reconciles_orphaned_runs_on_startup(monkeypatch):
"""SQLite startup should recover stale active runs before serving requests."""
app = FastAPI()
config = SimpleNamespace(
database=SimpleNamespace(backend="sqlite"),
run_events=SimpleNamespace(backend="memory"),
)
thread_store = _FakeThreadStore()
_FakeRunManager.instances.clear()
_FakeRunManager.recovered_runs = [SimpleNamespace(run_id="run-1", thread_id="thread-1")]
_FakeRunManager.latest_by_thread = {}
async def fake_init_engine_from_config(_database):
return None
async def fake_close_engine():
return None
monkeypatch.setattr(engine_module, "init_engine_from_config", fake_init_engine_from_config)
monkeypatch.setattr(engine_module, "get_session_factory", lambda: None)
monkeypatch.setattr(engine_module, "close_engine", fake_close_engine)
monkeypatch.setattr(runtime_module, "make_stream_bridge", lambda _config: _fake_context(object()))
monkeypatch.setattr(checkpointer_module, "make_checkpointer", lambda _config: _fake_context(object()))
monkeypatch.setattr(runtime_module, "make_store", lambda _config: _fake_context(object()))
monkeypatch.setattr(thread_meta_module, "make_thread_store", lambda _sf, _store: thread_store)
monkeypatch.setattr(event_store_module, "make_run_event_store", lambda _config: object())
monkeypatch.setattr(gateway_deps, "RunManager", _FakeRunManager)
async with gateway_deps.langgraph_runtime(app, config):
pass
assert len(_FakeRunManager.instances) == 1
assert _FakeRunManager.instances[0].reconcile_calls
assert _FakeRunManager.instances[0].reconcile_calls[0]["error"]
assert _FakeRunManager.instances[0].list_by_thread_calls == [{"thread_id": "thread-1", "user_id": None, "limit": 1}]
assert thread_store.status_updates == [("thread-1", "error", None)]
@pytest.mark.anyio
async def test_sqlite_runtime_does_not_mark_thread_error_when_newer_run_is_success(monkeypatch):
"""Startup recovery should not let an old orphaned run overwrite a newer terminal thread state."""
app = FastAPI()
config = SimpleNamespace(
database=SimpleNamespace(backend="sqlite"),
run_events=SimpleNamespace(backend="memory"),
)
thread_store = _FakeThreadStore()
_FakeRunManager.instances.clear()
_FakeRunManager.recovered_runs = [SimpleNamespace(run_id="old-running", thread_id="thread-1")]
_FakeRunManager.latest_by_thread = {"thread-1": [SimpleNamespace(run_id="newer-success", thread_id="thread-1", status="success")]}
async def fake_init_engine_from_config(_database):
return None
async def fake_close_engine():
return None
monkeypatch.setattr(engine_module, "init_engine_from_config", fake_init_engine_from_config)
monkeypatch.setattr(engine_module, "get_session_factory", lambda: None)
monkeypatch.setattr(engine_module, "close_engine", fake_close_engine)
monkeypatch.setattr(runtime_module, "make_stream_bridge", lambda _config: _fake_context(object()))
monkeypatch.setattr(checkpointer_module, "make_checkpointer", lambda _config: _fake_context(object()))
monkeypatch.setattr(runtime_module, "make_store", lambda _config: _fake_context(object()))
monkeypatch.setattr(thread_meta_module, "make_thread_store", lambda _sf, _store: thread_store)
monkeypatch.setattr(event_store_module, "make_run_event_store", lambda _config: object())
monkeypatch.setattr(gateway_deps, "RunManager", _FakeRunManager)
async with gateway_deps.langgraph_runtime(app, config):
pass
assert len(_FakeRunManager.instances) == 1
assert _FakeRunManager.instances[0].list_by_thread_calls == [{"thread_id": "thread-1", "user_id": None, "limit": 1}]
assert thread_store.status_updates == []
+362
View File
@@ -1,10 +1,16 @@
"""Tests for RunManager."""
import asyncio
import logging
import re
import sqlite3
from typing import Any
import pytest
from sqlalchemy.exc import DatabaseError as SQLAlchemyDatabaseError
from deerflow.runtime import DisconnectMode, RunManager, RunStatus
from deerflow.runtime.runs.manager import PersistenceRetryPolicy
from deerflow.runtime.runs.store.memory import MemoryRunStore
ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
@@ -15,6 +21,92 @@ def manager() -> RunManager:
return RunManager()
class FlakyStatusRunStore(MemoryRunStore):
"""Memory run store that simulates transient SQLite status-write failures."""
def __init__(self, *, status_failures: int) -> None:
super().__init__()
self.status_failures = status_failures
self.status_update_attempts = 0
async def update_status(self, run_id, status, *, error=None):
self.status_update_attempts += 1
if self.status_failures > 0:
self.status_failures -= 1
raise sqlite3.OperationalError("database is locked")
return await super().update_status(run_id, status, error=error)
class MissingRowStatusRunStore(MemoryRunStore):
"""Memory run store that reports a missing row for status updates."""
async def update_status(self, run_id, status, *, error=None):
await super().update_status(run_id, status, error=error)
return False
class PermanentStatusRunStore(MemoryRunStore):
"""Memory run store that simulates a permanent SQLAlchemy write failure."""
def __init__(self) -> None:
super().__init__()
self.status_update_attempts = 0
async def update_status(self, run_id, status, *, error=None):
self.status_update_attempts += 1
raise SQLAlchemyDatabaseError(
"UPDATE runs SET status = :status WHERE run_id = :run_id",
{"status": status, "run_id": run_id},
sqlite3.DatabaseError("no such table: runs"),
)
class FailingStatusRunStore(MemoryRunStore):
"""Memory run store that always fails status updates."""
def __init__(self) -> None:
super().__init__()
self.status_update_attempts = 0
async def update_status(self, run_id, status, *, error=None):
self.status_update_attempts += 1
raise sqlite3.OperationalError("database is locked")
class MissingCompletionRunStore(MemoryRunStore):
"""Memory run store that reports one missing row for completion updates."""
def __init__(self) -> None:
super().__init__()
self.completion_update_attempts = 0
async def update_run_completion(self, run_id, *, status, **kwargs):
self.completion_update_attempts += 1
if self.completion_update_attempts == 1:
return False
return await super().update_run_completion(run_id, status=status, **kwargs)
class AlwaysMissingCompletionRunStore(MemoryRunStore):
"""Memory run store that keeps reporting missing rows for completion updates."""
def __init__(self) -> None:
super().__init__()
self.completion_update_attempts = 0
async def update_run_completion(self, run_id, *, status, **kwargs):
self.completion_update_attempts += 1
return False
async def _stored_statuses(store: MemoryRunStore, *run_ids: str) -> dict[str, Any]:
rows = {}
for run_id in run_ids:
row = await store.get(run_id)
rows[run_id] = row["status"] if row else None
return rows
@pytest.mark.anyio
async def test_create_and_get(manager: RunManager):
"""Created run should be retrievable with new fields."""
@@ -80,6 +172,155 @@ async def test_cancel_persists_interrupted_status_to_store():
assert stored["status"] == "interrupted"
@pytest.mark.anyio
async def test_status_persistence_retries_transient_sqlite_lock():
"""Transient SQLite lock errors should not leave a final status stale."""
store = FlakyStatusRunStore(status_failures=2)
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.running)
await manager.set_status(record.run_id, RunStatus.success)
stored = await store.get(record.run_id)
assert stored is not None
assert stored["status"] == "success"
assert store.status_update_attempts >= 4
@pytest.mark.anyio
async def test_status_persistence_recreates_missing_store_row():
"""A final status update should recreate a run row if initial persistence was lost."""
store = MissingRowStatusRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await store.delete(record.run_id)
await manager.set_status(record.run_id, RunStatus.error, error="boom")
stored = await store.get(record.run_id)
assert stored is not None
assert stored["status"] == "error"
assert stored["error"] == "boom"
@pytest.mark.anyio
async def test_status_persistence_does_not_retry_permanent_sqlalchemy_errors():
"""Permanent SQLAlchemy failures should not be retried as SQLite pressure."""
store = PermanentStatusRunStore()
manager = RunManager(
store=store,
persistence_retry_policy=PersistenceRetryPolicy(max_attempts=5, initial_delay=0),
)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.error, error="boom")
assert store.status_update_attempts == 1
@pytest.mark.anyio
async def test_completion_persistence_recreates_missing_store_row():
"""Completion updates should recreate a missing row and persist final counters."""
store = MissingCompletionRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.running)
await manager.set_status(record.run_id, RunStatus.success)
await store.delete(record.run_id)
await manager.update_run_completion(
record.run_id,
status="success",
total_tokens=42,
llm_call_count=2,
last_ai_message="done",
)
stored = await store.get(record.run_id)
assert stored is not None
assert stored["status"] == "success"
assert stored["total_tokens"] == 42
assert stored["llm_call_count"] == 2
assert stored["last_ai_message"] == "done"
assert store.completion_update_attempts == 2
@pytest.mark.anyio
async def test_completion_persistence_warns_when_recreated_row_still_missing(caplog):
"""A second zero-row completion update after recreation should not be silent."""
store = AlwaysMissingCompletionRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.success)
caplog.set_level(logging.WARNING, logger="deerflow.runtime.runs.manager")
await manager.update_run_completion(record.run_id, status="success", total_tokens=42)
assert store.completion_update_attempts == 2
assert "affected no rows after row recreation" in caplog.text
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_marks_stale_rows_error():
"""Startup recovery should turn persisted active rows into explicit errors."""
store = MemoryRunStore()
await store.put("pending-run", thread_id="thread-1", status="pending", created_at="2026-01-01T00:00:00+00:00")
await store.put("running-run", thread_id="thread-1", status="running", created_at="2026-01-01T00:00:01+00:00")
await store.put("success-run", thread_id="thread-1", status="success", created_at="2026-01-01T00:00:02+00:00")
manager = RunManager(store=store)
recovered = await manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before="2026-01-01T00:00:02+00:00",
)
assert {record.run_id for record in recovered} == {"pending-run", "running-run"}
assert await _stored_statuses(store, "pending-run", "running-run", "success-run") == {
"pending-run": "error",
"running-run": "error",
"success-run": "success",
}
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_skips_live_local_run():
"""Startup recovery should not mark an active row orphaned when this worker owns it."""
store = MemoryRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.running)
recovered = await manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
)
stored = await store.get(record.run_id)
assert recovered == []
assert stored["status"] == "running"
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_skips_rows_when_error_status_is_not_persisted():
"""Startup recovery must not report a row as recovered if the error update failed."""
store = FailingStatusRunStore()
await store.put("running-run", thread_id="thread-1", status="running", created_at="2026-01-01T00:00:00+00:00")
manager = RunManager(
store=store,
persistence_retry_policy=PersistenceRetryPolicy(max_attempts=2, initial_delay=0),
)
recovered = await manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before="2026-01-01T00:00:01+00:00",
)
stored = await store.get("running-run")
assert recovered == []
assert stored["status"] == "running"
assert store.status_update_attempts == 2
@pytest.mark.anyio
async def test_cancel_not_inflight(manager: RunManager):
"""Cancelling a completed run should return False."""
@@ -231,6 +472,81 @@ async def test_create_record_is_not_store_only(manager: RunManager):
assert record.store_only is False
@pytest.mark.anyio
async def test_create_rolls_back_in_memory_record_on_store_failure():
"""create() must fail and hide the run when the initial store write fails."""
from unittest.mock import AsyncMock
store = MemoryRunStore()
store.put = AsyncMock(side_effect=RuntimeError("db down"))
manager = RunManager(store=store)
with pytest.raises(RuntimeError, match="db down"):
await manager.create("thread-1")
assert manager._runs == {}
assert await manager.list_by_thread("thread-1") == []
@pytest.mark.anyio
async def test_create_rolls_back_in_memory_record_on_store_cancellation():
"""create() must also roll back when cancelled during the initial store write."""
store = MemoryRunStore()
async def cancelled_put(run_id, **kwargs):
raise asyncio.CancelledError
store.put = cancelled_put
manager = RunManager(store=store)
with pytest.raises(asyncio.CancelledError):
await manager.create("thread-1")
assert manager._runs == {}
assert await manager.list_by_thread("thread-1") == []
@pytest.mark.anyio
async def test_create_does_not_expose_run_until_store_persist_completes():
"""Concurrent readers must wait until the new run has been persisted."""
store = MemoryRunStore()
manager = RunManager(store=store)
original_put = store.put
put_started = asyncio.Event()
allow_put = asyncio.Event()
async def blocking_put(run_id, **kwargs):
put_started.set()
await allow_put.wait()
return await original_put(run_id, **kwargs)
store.put = blocking_put
create_task = asyncio.create_task(manager.create("thread-1"))
list_task = None
try:
await put_started.wait()
list_task = asyncio.create_task(manager.list_by_thread("thread-1"))
await asyncio.sleep(0)
assert not list_task.done()
allow_put.set()
record = await create_task
runs = await list_task
assert [run.run_id for run in runs] == [record.run_id]
finally:
allow_put.set()
cleanup_tasks = []
for task in (list_task, create_task):
if task is None:
continue
if not task.done():
task.cancel()
cleanup_tasks.append(task)
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
@pytest.mark.anyio
async def test_get_prefers_in_memory_record_over_store():
"""In-memory records retain task/control state when store has same run."""
@@ -318,6 +634,52 @@ async def test_create_or_reject_interrupt_persists_interrupted_status_to_store()
assert stored_old["status"] == "interrupted"
@pytest.mark.anyio
async def test_create_or_reject_does_not_interrupt_old_run_when_new_run_store_write_fails():
"""A failed new-run persist must not cancel the existing inflight run."""
from unittest.mock import AsyncMock
store = MemoryRunStore()
manager = RunManager(store=store)
old = await manager.create("thread-1")
await manager.set_status(old.run_id, RunStatus.running)
store.put = AsyncMock(side_effect=RuntimeError("db down"))
with pytest.raises(RuntimeError, match="db down"):
await manager.create_or_reject("thread-1", multitask_strategy="interrupt")
stored_old = await store.get(old.run_id)
assert list(manager._runs) == [old.run_id]
assert old.status == RunStatus.running
assert old.abort_event.is_set() is False
assert stored_old is not None
assert stored_old["status"] == "running"
@pytest.mark.anyio
async def test_create_or_reject_does_not_interrupt_old_run_when_new_run_store_write_is_cancelled():
"""Cancellation during new-run persist must not cancel the existing run."""
store = MemoryRunStore()
manager = RunManager(store=store)
old = await manager.create("thread-1")
await manager.set_status(old.run_id, RunStatus.running)
async def cancelled_put(run_id, **kwargs):
raise asyncio.CancelledError
store.put = cancelled_put
with pytest.raises(asyncio.CancelledError):
await manager.create_or_reject("thread-1", multitask_strategy="interrupt")
stored_old = await store.get(old.run_id)
assert list(manager._runs) == [old.run_id]
assert old.status == RunStatus.running
assert old.abort_event.is_set() is False
assert stored_old is not None
assert stored_old["status"] == "running"
@pytest.mark.anyio
async def test_create_or_reject_rollback_persists_interrupted_status_to_store():
"""rollback strategy should persist interrupted status for old runs."""
+47 -2
View File
@@ -52,6 +52,9 @@ class _CustomRunStoreWithoutProgress(RunStore):
async def list_pending(self, *args, **kwargs):
return []
async def list_inflight(self, *args, **kwargs):
return []
async def aggregate_tokens_by_thread(self, *args, **kwargs):
return {}
@@ -75,6 +78,19 @@ class TestRunRepository:
assert row["status"] == "pending"
await _cleanup()
@pytest.mark.anyio
async def test_put_is_idempotent_for_retried_writes(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1", assistant_id="old-agent", status="pending")
await repo.put("r1", thread_id="t1", assistant_id="new-agent", status="running", error="retry")
row = await repo.get("r1")
assert row["assistant_id"] == "new-agent"
assert row["status"] == "running"
assert row["error"] == "retry"
await _cleanup()
@pytest.mark.anyio
async def test_get_missing_returns_none(self, tmp_path):
repo = await _make_repo(tmp_path)
@@ -85,11 +101,19 @@ class TestRunRepository:
async def test_update_status(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1")
await repo.update_status("r1", "running")
updated = await repo.update_status("r1", "running")
row = await repo.get("r1")
assert updated is True
assert row["status"] == "running"
await _cleanup()
@pytest.mark.anyio
async def test_update_status_returns_false_for_missing_row(self, tmp_path):
repo = await _make_repo(tmp_path)
updated = await repo.update_status("missing", "error", error="lost")
assert updated is False
await _cleanup()
@pytest.mark.anyio
async def test_update_status_with_error(self, tmp_path):
repo = await _make_repo(tmp_path)
@@ -146,11 +170,24 @@ class TestRunRepository:
assert all(r["status"] == "pending" for r in pending)
await _cleanup()
@pytest.mark.anyio
async def test_list_inflight_returns_pending_and_running_before_cutoff(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("pending-old", thread_id="t1", status="pending", created_at="2026-01-01T00:00:00+00:00")
await repo.put("running-old", thread_id="t1", status="running", created_at="2026-01-01T00:00:01+00:00")
await repo.put("success-old", thread_id="t1", status="success", created_at="2026-01-01T00:00:02+00:00")
await repo.put("pending-new", thread_id="t1", status="pending", created_at="2026-01-01T00:00:03+00:00")
inflight = await repo.list_inflight(before="2026-01-01T00:00:02+00:00")
assert [row["run_id"] for row in inflight] == ["pending-old", "running-old"]
await _cleanup()
@pytest.mark.anyio
async def test_update_run_completion(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1", status="running")
await repo.update_run_completion(
updated = await repo.update_run_completion(
"r1",
status="success",
total_input_tokens=100,
@@ -165,6 +202,7 @@ class TestRunRepository:
first_human_message="What is the meaning?",
)
row = await repo.get("r1")
assert updated is True
assert row["status"] == "success"
assert row["total_tokens"] == 150
assert row["llm_call_count"] == 2
@@ -174,6 +212,13 @@ class TestRunRepository:
assert row["first_human_message"] == "What is the meaning?"
await _cleanup()
@pytest.mark.anyio
async def test_update_run_completion_returns_false_for_missing_row(self, tmp_path):
repo = await _make_repo(tmp_path)
updated = await repo.update_run_completion("missing", status="error", total_tokens=1)
assert updated is False
await _cleanup()
@pytest.mark.anyio
async def test_metadata_preserved(self, tmp_path):
repo = await _make_repo(tmp_path)
@@ -0,0 +1,97 @@
"""Unit tests for ThreadState reducers.
Regression coverage for issue #3123: todos list disappearing after streaming
completes because a downstream node's partial state update with `todos=None`
overwrites the previously accumulated value.
"""
from typing import get_type_hints
from deerflow.agents.thread_state import (
ThreadState,
merge_artifacts,
merge_todos,
merge_viewed_images,
)
class TestMergeTodos:
"""Reducer for ThreadState.todos - keeps last non-None value."""
def test_new_value_overrides_existing(self):
existing = [{"id": 1, "text": "old", "done": False}]
new = [{"id": 1, "text": "old", "done": True}]
assert merge_todos(existing, new) == new
def test_none_new_preserves_existing(self):
"""THE KEY FIX for #3123: a node that doesn't touch todos must NOT
wipe them out by returning an implicit None."""
existing = [{"id": 1, "text": "task", "done": False}]
assert merge_todos(existing, None) == existing
def test_none_existing_accepts_new(self):
new = [{"id": 1, "text": "first todo"}]
assert merge_todos(None, new) == new
def test_both_none_returns_none(self):
assert merge_todos(None, None) is None
def test_empty_list_is_explicit_clear(self):
"""An explicit empty list means 'user cleared all todos' and must
win over the previous list."""
existing = [{"id": 1, "text": "task"}]
assert merge_todos(existing, []) == []
class TestMergeArtifacts:
"""Sanity check for the existing artifacts reducer."""
def test_dedupes_and_preserves_order(self):
assert merge_artifacts(["a", "b"], ["b", "c"]) == ["a", "b", "c"]
def test_none_new_preserves_existing(self):
assert merge_artifacts(["a"], None) == ["a"]
def test_none_existing_accepts_new(self):
assert merge_artifacts(None, ["a"]) == ["a"]
class TestMergeViewedImages:
"""Sanity check for the existing viewed_images reducer."""
def test_merges_dicts(self):
existing = {"k1": {"base64": "x", "mime_type": "image/png"}}
new = {"k2": {"base64": "y", "mime_type": "image/jpeg"}}
merged = merge_viewed_images(existing, new)
assert set(merged.keys()) == {"k1", "k2"}
def test_empty_dict_clears(self):
existing = {"k1": {"base64": "x", "mime_type": "image/png"}}
assert merge_viewed_images(existing, {}) == {}
class TestThreadStateAnnotations:
"""Regression guards: ensure reducer wiring on ThreadState fields.
These tests protect against silent regressions where a field's
``Annotated[..., reducer]`` is reverted to a plain type, which would
re-introduce bugs even when the reducer functions themselves remain
correct.
"""
def test_todos_field_is_wired_to_merge_todos(self):
"""ThreadState.todos must use merge_todos.
Without this Annotated binding, LangGraph falls back to last-value-wins
behavior, and partial state updates that omit todos will silently clear
previously streamed values.
"""
hints = get_type_hints(ThreadState, include_extras=True)
todos_hint = hints["todos"]
assert hasattr(todos_hint, "__metadata__"), "ThreadState.todos must be Annotated with a reducer"
assert merge_todos in todos_hint.__metadata__, "ThreadState.todos must be wired to merge_todos reducer (see #3123)"
def test_artifacts_field_is_wired_to_merge_artifacts(self):
"""Sanity check that existing reducer wiring is preserved."""
hints = get_type_hints(ThreadState, include_extras=True)
assert merge_artifacts in hints["artifacts"].__metadata__
+2 -2
View File
@@ -799,9 +799,9 @@ summarization:
# Summarization runs when ANY threshold is met (OR logic)
# You can specify a single trigger or a list of triggers
trigger:
# Trigger when token count reaches 15564
# Trigger when token count reaches 32000
- type: tokens
value: 15564
value: 32000
# Uncomment to also trigger when message count reaches 50
# - type: messages
# value: 50
+2
View File
@@ -1,3 +1,5 @@
pnpm-lock.yaml
.omc/
src/content/**/*.mdx
playwright-report/
test-results/
+4
View File
@@ -18,3 +18,7 @@ lint:
format:
pnpm format:write
build-static:
NEXT_CONFIG_BUILD_OUTPUT=standalone SKIP_ENV_VALIDATION=1 NEXT_PUBLIC_STATIC_WEBSITE_ONLY=true pnpm build
@if [ -d .next/static ]; then mkdir -p .next/standalone/.next && cp -R .next/static .next/standalone/.next/static; fi
+4
View File
@@ -16,6 +16,10 @@ const withNextra = nextra({});
/** @type {import("next").NextConfig} */
const config = {
output:
process.env.NEXT_CONFIG_BUILD_OUTPUT === "standalone"
? "standalone"
: undefined,
i18n: {
locales: ["en", "zh"],
defaultLocale: "en",
@@ -32,7 +32,7 @@ Even with digital Leicas, photographers often emulate film characteristics: natu
### Image 1: Parisian Decisive Moment
![Paris Decisive Moment](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-paris-decisive-moment.jpg)
![Paris Decisive Moment](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-paris-decisive-moment.jpg)
This image captures the essence of Cartier-Bresson's philosophy. A woman in a red coat leaps over a puddle while a cyclist passes in perfect synchrony. The composition follows the rule of thirds, with the subject positioned at the intersection of grid lines. Shot with a simulated Leica M11 and 35mm Summicron lens at f/2.8, the image features shallow depth of field, natural film grain, and the warm, muted color palette characteristic of Leica photography.
@@ -40,7 +40,7 @@ The "decisive moment" here isn't just about timing—it's about the alignment of
### Image 2: Tokyo Night Reflections
![Tokyo Night Scene](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-tokyo-night.jpg)
![Tokyo Night Scene](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-tokyo-night.jpg)
Moving to Shinjuku, Tokyo, this image explores the atmospheric possibilities of Leica's legendary Noctilux lens. Simulating a Leica M10-P with a 50mm f/0.95 Noctilux wide open, the image creates extremely shallow depth of field with beautiful bokeh balls from neon signs reflected in wet pavement.
@@ -48,7 +48,7 @@ A salaryman waits under glowing kanji signs, steam rising from a nearby ramen sh
### Image 3: New York City Candid
![NYC Candid Scene](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-nyc-candid.jpg)
![NYC Candid Scene](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-nyc-candid.jpg)
This Chinatown scene demonstrates the documentary power of Leica's Q2 camera with its fixed 28mm Summilux lens. The wide angle captures environmental context while maintaining intimate proximity to the subjects. A fishmonger hands a live fish to a customer while tourists photograph the scene—a moment of cultural contrast and authentic urban life.
@@ -1,19 +1,19 @@
"use client";
import { isStaticWebsiteOnly } from "@/core/static-mode";
import { DEMO_THREAD_IDS } from "@/core/threads/static-demo";
import { PromptInputProvider } from "@/components/ai-elements/prompt-input";
import { ArtifactsProvider } from "@/components/workspace/artifacts";
import { SubtasksProvider } from "@/core/tasks/context";
import { ChatProviders } from "./providers";
export function generateStaticParams() {
if (!isStaticWebsiteOnly()) {
return [];
}
return DEMO_THREAD_IDS.map((thread_id) => ({ thread_id }));
}
export default function ChatLayout({
children,
}: {
children: React.ReactNode;
}) {
return (
<SubtasksProvider>
<ArtifactsProvider>
<PromptInputProvider>{children}</PromptInputProvider>
</ArtifactsProvider>
</SubtasksProvider>
);
return <ChatProviders>{children}</ChatProviders>;
}
@@ -227,6 +227,7 @@ export default function ChatPage() {
isWelcomeMode && <Welcome mode={settings.context.mode} />
}
disabled={
isMock ||
env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY === "true" ||
isUploading
}
@@ -0,0 +1,15 @@
"use client";
import { PromptInputProvider } from "@/components/ai-elements/prompt-input";
import { ArtifactsProvider } from "@/components/workspace/artifacts";
import { SubtasksProvider } from "@/core/tasks/context";
export function ChatProviders({ children }: { children: React.ReactNode }) {
return (
<SubtasksProvider>
<ArtifactsProvider>
<PromptInputProvider>{children}</PromptInputProvider>
</ArtifactsProvider>
</SubtasksProvider>
);
}
+5 -3
View File
@@ -43,12 +43,14 @@ export default async function WorkspaceLayout({
>
Retry
</Link>
<Link
href="/api/v1/auth/logout"
<form action="/api/v1/auth/logout" method="post">
<button
type="submit"
className="text-muted-foreground hover:bg-muted rounded-md border px-4 py-2 text-sm"
>
Logout &amp; Reset
</Link>
</button>
</form>
</div>
</div>
);
@@ -8,7 +8,7 @@ import {
SquareArrowOutUpRightIcon,
XIcon,
} from "lucide-react";
import { useCallback, useEffect, useMemo, useState } from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
import { Streamdown } from "streamdown";
@@ -30,8 +30,16 @@ import {
import { ToggleGroup, ToggleGroupItem } from "@/components/ui/toggle-group";
import { CodeEditor } from "@/components/workspace/code-editor";
import { useArtifactContent } from "@/core/artifacts/hooks";
import {
appendHtmlPreviewBaseHref,
appendHtmlPreviewScrollRestoration,
createHtmlPreviewScrollKey,
getArtifactViewState,
HTML_PREVIEW_SCROLL_MESSAGE_SOURCE,
} from "@/core/artifacts/preview";
import { urlOfArtifact } from "@/core/artifacts/utils";
import { useI18n } from "@/core/i18n/hooks";
import { findToolCallResult } from "@/core/messages/utils";
import { installSkill } from "@/core/skills/api";
import { streamdownPlugins } from "@/core/streamdown";
import { checkCodeFile, getFileName } from "@/core/utils/files";
@@ -44,6 +52,8 @@ import { Tooltip } from "../tooltip";
import { useArtifacts } from "./context";
const WRITE_FILE_PREVIEW_REFRESH_INTERVAL_MS = 3000;
export function ArtifactFileDetail({
className,
filepath: filepathFromProps,
@@ -55,6 +65,7 @@ export function ArtifactFileDetail({
}) {
const { t } = useI18n();
const { artifacts, setOpen, select } = useArtifacts();
const { thread, isMock } = useThread();
const isWriteFile = useMemo(() => {
return filepathFromProps.startsWith("write-file:");
}, [filepathFromProps]);
@@ -83,24 +94,43 @@ export function ArtifactFileDetail({
const isSupportPreview = useMemo(() => {
return language === "html" || language === "markdown";
}, [language]);
const { content } = useArtifactContent({
const toolResult = (() => {
if (!isWriteFile) {
return undefined;
}
const url = new URL(filepathFromProps);
const toolCallId = url.searchParams.get("tool_call_id");
if (!toolCallId) {
return undefined;
}
return findToolCallResult(toolCallId, thread.messages);
})();
const artifactViewState = getArtifactViewState({
filepath: filepathFromProps,
isSupportPreview,
toolResult,
});
const { content, url } = useArtifactContent({
threadId,
filepath: filepathFromProps,
enabled: isCodeFile && !isWriteFile,
});
const displayContent = content ?? "";
const isWritingFile = isWriteFile && toolResult === undefined;
const visibleContent = useThrottledValue(
displayContent,
isWritingFile ? WRITE_FILE_PREVIEW_REFRESH_INTERVAL_MS : 0,
filepathFromProps,
);
const [viewMode, setViewMode] = useState<"code" | "preview">("code");
const [viewMode, setViewMode] = useState<"code" | "preview">(
artifactViewState.initialViewMode,
);
const [isInstalling, setIsInstalling] = useState(false);
const { isMock } = useThread();
useEffect(() => {
if (isSupportPreview) {
setViewMode("preview");
} else {
setViewMode("code");
}
}, [isSupportPreview]);
setViewMode(artifactViewState.initialViewMode);
}, [artifactViewState.initialViewMode]);
const handleInstallSkill = useCallback(async () => {
if (isInstalling) return;
@@ -149,7 +179,7 @@ export function ArtifactFileDetail({
</ArtifactTitle>
</div>
<div className="flex min-w-0 grow items-center justify-center">
{isSupportPreview && (
{artifactViewState.canPreview && (
<ToggleGroup
className="mx-auto"
type="single"
@@ -209,7 +239,7 @@ export function ArtifactFileDetail({
disabled={!content}
onClick={async () => {
try {
await navigator.clipboard.writeText(displayContent ?? "");
await navigator.clipboard.writeText(visibleContent ?? "");
toast.success(t.clipboard.copiedToClipboard);
} catch (error) {
toast.error("Failed to copy to clipboard");
@@ -249,18 +279,20 @@ export function ArtifactFileDetail({
</div>
</ArtifactHeader>
<ArtifactContent className="p-0">
{isSupportPreview &&
{artifactViewState.canPreview &&
viewMode === "preview" &&
(language === "markdown" || language === "html") && (
<ArtifactFilePreview
content={displayContent}
content={visibleContent}
language={language ?? "text"}
scrollKey={filepathFromProps}
url={url}
/>
)}
{isCodeFile && viewMode === "code" && (
<CodeEditor
className="size-full resize-none rounded-none border-none"
value={displayContent ?? ""}
value={visibleContent ?? ""}
readonly
/>
)}
@@ -278,26 +310,85 @@ export function ArtifactFileDetail({
export function ArtifactFilePreview({
content,
language,
scrollKey,
url,
}: {
content: string;
language: string;
scrollKey: string;
url?: string;
}) {
const iframeRef = useRef<HTMLIFrameElement>(null);
const scrollPositionRef = useRef({ x: 0, y: 0 });
const scrollMessageKey = useMemo(
() => createHtmlPreviewScrollKey(scrollKey),
[scrollKey],
);
const [htmlPreviewUrl, setHtmlPreviewUrl] = useState<string>();
useEffect(() => {
scrollPositionRef.current = { x: 0, y: 0 };
}, [scrollMessageKey]);
useEffect(() => {
if (language !== "html") {
return;
}
const handleMessage = (event: MessageEvent) => {
if (event.source !== iframeRef.current?.contentWindow) {
return;
}
if (!isArtifactScrollMessage(event.data, scrollMessageKey)) {
return;
}
if (event.data.type === "save") {
const x = scrollCoordinate(event.data.x);
const y = scrollCoordinate(event.data.y);
if (x !== undefined && y !== undefined) {
scrollPositionRef.current = { x, y };
}
return;
}
iframeRef.current?.contentWindow?.postMessage(
{
source: HTML_PREVIEW_SCROLL_MESSAGE_SOURCE,
key: scrollMessageKey,
type: "restore",
...scrollPositionRef.current,
},
"*",
);
};
window.addEventListener("message", handleMessage);
return () => {
window.removeEventListener("message", handleMessage);
};
}, [language, scrollMessageKey]);
useEffect(() => {
if (language !== "html") {
setHtmlPreviewUrl(undefined);
return;
}
const blob = new Blob([content ?? ""], { type: "text/html" });
const url = URL.createObjectURL(blob);
setHtmlPreviewUrl(url);
const previewContent = appendHtmlPreviewScrollRestoration(
appendHtmlPreviewBaseHref(content ?? "", url),
scrollKey,
);
const blob = new Blob([previewContent], {
type: "text/html;charset=utf-8",
});
const objectUrl = URL.createObjectURL(blob);
setHtmlPreviewUrl(objectUrl);
return () => {
URL.revokeObjectURL(url);
URL.revokeObjectURL(objectUrl);
};
}, [content, language]);
}, [content, language, scrollKey, url]);
if (language === "markdown") {
return (
@@ -315,6 +406,7 @@ export function ArtifactFilePreview({
if (language === "html") {
return (
<iframe
ref={iframeRef}
className="size-full"
title="Artifact preview"
sandbox="allow-scripts allow-forms"
@@ -324,3 +416,100 @@ export function ArtifactFilePreview({
}
return null;
}
function isArtifactScrollMessage(
data: unknown,
key: string,
): data is {
type: "save" | "restore-request";
x?: unknown;
y?: unknown;
} {
return (
typeof data === "object" &&
data !== null &&
"source" in data &&
data.source === HTML_PREVIEW_SCROLL_MESSAGE_SOURCE &&
"key" in data &&
data.key === key &&
"type" in data &&
(data.type === "save" || data.type === "restore-request")
);
}
function scrollCoordinate(value: unknown) {
return typeof value === "number" && Number.isFinite(value)
? value
: undefined;
}
function useThrottledValue(
value: string,
intervalMs: number,
resetKey: string,
) {
const [throttledValue, setThrottledValue] = useState(value);
const latestValueRef = useRef(value);
const lastFlushAtRef = useRef(0);
const timeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const resetKeyRef = useRef(resetKey);
useEffect(() => {
latestValueRef.current = value;
if (resetKeyRef.current !== resetKey) {
resetKeyRef.current = resetKey;
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
timeoutRef.current = null;
}
lastFlushAtRef.current = Date.now();
setThrottledValue(value);
return;
}
if (intervalMs <= 0) {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
timeoutRef.current = null;
}
lastFlushAtRef.current = Date.now();
setThrottledValue(value);
return;
}
const now = Date.now();
const elapsed = now - lastFlushAtRef.current;
if (lastFlushAtRef.current === 0 || elapsed >= intervalMs) {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
timeoutRef.current = null;
}
lastFlushAtRef.current = now;
setThrottledValue(value);
return;
}
if (timeoutRef.current) {
return;
}
timeoutRef.current = setTimeout(() => {
timeoutRef.current = null;
lastFlushAtRef.current = Date.now();
setThrottledValue(latestValueRef.current);
}, intervalMs - elapsed);
}, [intervalMs, resetKey, value]);
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
};
}, []);
return intervalMs <= 0 || resetKeyRef.current !== resetKey
? value
: throttledValue;
}
@@ -16,13 +16,15 @@ import {
import {
extractContentFromMessage,
extractPresentFilesFromMessage,
extractReasoningContentFromMessage,
extractTextFromMessage,
getAssistantTurnCopyData,
getAssistantTurnUsageMessages,
getMessageGroups,
getStreamingMessageLookup,
hasContent,
hasPresentFiles,
hasReasoning,
isAssistantMessageGroupStreaming,
} from "@/core/messages/utils";
import { useRehypeSplitWordsIntoSpans } from "@/core/rehype";
import type { Subtask } from "@/core/tasks";
@@ -184,16 +186,19 @@ export function MessageList({
() => buildTokenDebugSteps(messages, t),
[messages, t],
);
const streamingMessages = useMemo(
() =>
getStreamingMessageLookup(
messages,
thread.isLoading,
thread.getMessagesMetadata,
),
[messages, thread.getMessagesMetadata, thread.isLoading],
);
const renderAssistantCopyButton = useCallback((messages: Message[]) => {
const clipboardData = [...messages]
.reverse()
.filter((message) => message.type === "ai")
.map((message) => {
const content = extractContentFromMessage(message);
return content ?? extractReasoningContentFromMessage(message) ?? "";
})
.find((content) => content.length > 0);
const renderAssistantCopyButton = useCallback(
(messages: Message[], isStreaming: boolean) => {
const clipboardData = getAssistantTurnCopyData(messages, { isStreaming });
if (!clipboardData) {
return null;
@@ -204,7 +209,9 @@ export function MessageList({
<CopyButton clipboardData={clipboardData} />
</div>
);
}, []);
},
[],
);
const renderTokenUsage = useCallback(
({
@@ -294,7 +301,13 @@ export function MessageList({
turnUsageMessages,
})}
{group.type === "assistant" &&
renderAssistantCopyButton(group.messages)}
renderAssistantCopyButton(
group.messages,
isAssistantMessageGroupStreaming(
group.messages,
streamingMessages,
),
)}
</div>
);
} else if (group.type === "assistant:clarification") {
@@ -162,7 +162,7 @@ summarization:
# Trigger conditions — summarization runs when ANY threshold is met
trigger:
- type: tokens # trigger when context exceeds N tokens
value: 15564
value: 32000
# - type: messages # trigger when there are more than N messages
# value: 50
# - type: fraction # trigger when context exceeds X% of model max
+17 -17
View File
@@ -20,27 +20,27 @@ If you want to understand how DeerFlow works, start with the Introduction. If yo
Start with the conceptual overview first.
- [Introduction](/docs/introduction)
- [Why DeerFlow](/docs/introduction/why-deerflow)
- [Harness vs App](/docs/introduction/harness-vs-app)
- [Introduction](./docs/introduction)
- [Why DeerFlow](./docs/introduction/why-deerflow)
- [Harness vs App](./docs/introduction/harness-vs-app)
### If you want to build with DeerFlow
Start with the Harness section. This path is for teams who want to integrate DeerFlow capabilities into their own system or build a custom agent product on top of the DeerFlow runtime.
- [DeerFlow Harness](/docs/harness)
- [Quick Start](/docs/harness/quick-start)
- [Configuration](/docs/harness/configuration)
- [Customization](/docs/harness/customization)
- [DeerFlow Harness](./docs/harness)
- [Quick Start](./docs/harness/quick-start)
- [Configuration](./docs/harness/configuration)
- [Customization](./docs/harness/customization)
### If you want to deploy and use DeerFlow
Start with the App section. This path is for teams who want to run DeerFlow as a complete application and understand how to configure, operate, and use it in practice.
- [DeerFlow App](/docs/app)
- [Quick Start](/docs/app/quick-start)
- [Deployment Guide](/docs/app/deployment-guide)
- [Workspace Usage](/docs/app/workspace-usage)
- [DeerFlow App](./docs/app)
- [Quick Start](./docs/app/quick-start)
- [Deployment Guide](./docs/app/deployment-guide)
- [Workspace Usage](./docs/app/workspace-usage)
## Documentation structure
@@ -79,17 +79,17 @@ The App section is written for teams who want to deploy DeerFlow as a usable pro
The Tutorials section is for hands-on, task-oriented learning.
- [Tutorials](/docs/tutorials)
- [Tutorials](./docs/tutorials)
### Reference
The Reference section is for detailed lookup material, including configuration, runtime modes, APIs, and source-oriented mapping.
- [Reference](/docs/reference)
- [Reference](./docs/reference)
## Choose the right path
- If you are **evaluating the project**, start with [Introduction](/docs/introduction).
- If you are **building your own agent system**, start with [DeerFlow Harness](/docs/harness).
- If you are **deploying DeerFlow for users**, start with [DeerFlow App](/docs/app).
- If you want to **learn by doing**, go to [Tutorials](/docs/tutorials).
- If you are **evaluating the project**, start with [Introduction](./docs/introduction).
- If you are **building your own agent system**, start with [DeerFlow Harness](./docs/harness).
- If you are **deploying DeerFlow for users**, start with [DeerFlow App](./docs/app).
- If you want to **learn by doing**, go to [Tutorials](./docs/tutorials).
@@ -0,0 +1,9 @@
---
title: DeerFlow 2.0 M1
description: DeerFlow 2.0 M1 is officially in RC. Here's what you need to know.
date: 2026-05-30
tags:
- Release
---
## DeerFlow 2.0 M1 Release
@@ -154,7 +154,7 @@ summarization:
# 触发条件——满足任意一个条件时运行摘要
trigger:
- type: tokens # 当上下文超过 N 个 token 时触发
value: 15564
value: 32000
# - type: messages # 当消息数超过 N 时触发
# value: 50
# - type: fraction # 当上下文达到模型最大输入的 X% 时触发
+11 -11
View File
@@ -20,27 +20,27 @@ DeerFlow 是一个用于构建和运行 Agent 系统的框架。它提供了一
先从概念概述开始。
- [简介](/docs/introduction)
- [为什么选择 DeerFlow](/docs/introduction/why-deerflow)
- [Harness 与应用的区别](/docs/introduction/harness-vs-app)
- [简介](./docs/introduction)
- [为什么选择 DeerFlow](./docs/introduction/why-deerflow)
- [Harness 与应用的区别](./docs/introduction/harness-vs-app)
### 如果你想基于 DeerFlow 进行开发
从 Harness 章节开始。这条路径适合想将 DeerFlow 功能集成到自己系统中,或基于 DeerFlow 运行时构建自定义 Agent 产品的团队。
- [DeerFlow Harness](/docs/harness)
- [快速上手](/docs/harness/quick-start)
- [配置](/docs/harness/configuration)
- [自定义与扩展](/docs/harness/customization)
- [DeerFlow Harness](./docs/harness)
- [快速上手](./docs/harness/quick-start)
- [配置](./docs/harness/configuration)
- [自定义与扩展](./docs/harness/customization)
### 如果你想部署和使用 DeerFlow
从应用章节开始。这条路径适合想将 DeerFlow 作为完整应用运行,并了解如何配置、运维和实际使用的团队。
- [DeerFlow 应用](/docs/application)
- [快速上手](/docs/application/quick-start)
- [部署指南](/docs/application/deployment-guide)
- [工作区使用](/docs/application/workspace-usage)
- [DeerFlow 应用](./docs/application)
- [快速上手](./docs/application/quick-start)
- [部署指南](./docs/application/deployment-guide)
- [工作区使用](./docs/application/workspace-usage)
## 文档结构
+49
View File
@@ -3,6 +3,13 @@
import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client";
import { getLangGraphBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import {
loadStaticDemoThread,
loadStaticDemoThreads,
staticDemoThreadState,
} from "../threads/static-demo";
import type { AgentThreadState } from "../threads/types";
import { isStateChangingMethod, readCsrfCookie } from "./fetcher";
import { sanitizeRunStreamOptions } from "./stream-mode";
@@ -32,6 +39,10 @@ function injectCsrfHeader(_url: URL, init: RequestInit): RequestInit {
}
function createCompatibleClient(isMock?: boolean): LangGraphClient {
if (isStaticWebsiteOnly() && !isMock) {
return createStaticClient();
}
const apiUrl = getLangGraphBaseURL(isMock);
console.log(`Creating API client with base URL: ${apiUrl}`);
const client = new LangGraphClient({
@@ -58,6 +69,44 @@ function createCompatibleClient(isMock?: boolean): LangGraphClient {
return client;
}
function createStaticClient(): LangGraphClient {
const apiUrl =
typeof window === "undefined"
? "http://localhost:3000"
: window.location.origin;
const client = new LangGraphClient({ apiUrl });
client.threads.search = (async (query) => {
return loadStaticDemoThreads(query);
}) as typeof client.threads.search;
client.threads.get = (async (threadId) => {
return loadStaticDemoThread(threadId);
}) as typeof client.threads.get;
client.threads.getState = (async (threadId) => {
return staticDemoThreadState(await loadStaticDemoThread(threadId));
}) as typeof client.threads.getState;
client.threads.getHistory = (async (threadId) => {
return [staticDemoThreadState(await loadStaticDemoThread(threadId))];
}) as typeof client.threads.getHistory;
client.threads.update = (async (threadId) => {
return loadStaticDemoThread(threadId);
}) as typeof client.threads.update;
client.runs.list = (async () => []) as typeof client.runs.list;
client.runs.stream = async function* () {
/* empty */
} as typeof client.runs.stream;
client.runs.joinStream = async function* () {
/* empty */
} as typeof client.runs.joinStream;
return client as LangGraphClient<AgentThreadState>;
}
const _clients = new Map<string, LangGraphClient>();
export function getAPIClient(isMock?: boolean): LangGraphClient {
const cacheKey = isMock ? "mock" : "default";
+9
View File
@@ -2,6 +2,7 @@ import type { BaseStream } from "@langchain/langgraph-sdk/react";
import type { AgentThreadState } from "../threads";
import { buildWriteFileDraftContent } from "./preview";
import { urlOfArtifact } from "./utils";
export async function loadArtifactContent({
@@ -30,6 +31,14 @@ export function loadArtifactContentFromToolCall({
url: string;
thread: BaseStream<AgentThreadState>;
}) {
const draftContent = buildWriteFileDraftContent({
filepath: urlString,
messages: thread.messages,
});
if (draftContent !== undefined) {
return draftContent;
}
const url = new URL(urlString);
const toolCallId = url.searchParams.get("tool_call_id");
const messageId = url.searchParams.get("message_id");
+278
View File
@@ -0,0 +1,278 @@
export type ArtifactViewMode = "code" | "preview";
type ArtifactPreviewMessage = {
type?: string;
id?: string;
name?: string | null;
tool_call_id?: string;
content?: unknown;
tool_calls?: Array<{
id?: string;
name?: string;
args?: Record<string, unknown>;
}>;
};
export function isWriteFileArtifact(filepath: string) {
return filepath.startsWith("write-file:");
}
function hasSuccessfulWriteResult(toolResult: string | undefined) {
return toolResult?.trim() === "OK";
}
function hasFailedWriteResult(toolResult: string | undefined) {
return (
typeof toolResult === "string" && !hasSuccessfulWriteResult(toolResult)
);
}
function getTextContent(content: unknown) {
if (typeof content === "string") {
return content.trim();
}
if (Array.isArray(content)) {
return content
.map((part) => {
if (
typeof part === "object" &&
part !== null &&
"text" in part &&
typeof part.text === "string"
) {
return part.text;
}
return "";
})
.join("")
.trim();
}
return undefined;
}
function findToolResult(
toolCallId: string,
messages: ArtifactPreviewMessage[],
) {
for (const message of messages) {
if (message.type === "tool" && message.tool_call_id === toolCallId) {
return getTextContent(message.content);
}
}
return undefined;
}
function parseWriteFileArtifact(filepath: string) {
if (!isWriteFileArtifact(filepath)) {
return undefined;
}
try {
const url = new URL(filepath);
return {
path: decodeURIComponent(url.pathname),
messageId: url.searchParams.get("message_id") ?? undefined,
toolCallId: url.searchParams.get("tool_call_id") ?? undefined,
};
} catch {
return undefined;
}
}
export function buildWriteFileDraftContent({
filepath,
messages,
}: {
filepath: string;
messages: ArtifactPreviewMessage[];
}) {
const target = parseWriteFileArtifact(filepath);
if (!target) {
return undefined;
}
let draft = "";
let hasDraft = false;
for (const message of messages) {
if (message.type !== "ai") {
continue;
}
for (const toolCall of message.tool_calls ?? []) {
const args = toolCall.args ?? {};
if (
toolCall.name !== "write_file" ||
args.path !== target.path ||
typeof args.content !== "string"
) {
continue;
}
const toolCallId = toolCall.id;
const toolResult = toolCallId
? findToolResult(toolCallId, messages)
: undefined;
const isSelected =
toolCallId === target.toolCallId &&
(!target.messageId || message.id === target.messageId);
if (isSelected && hasFailedWriteResult(toolResult)) {
return undefined;
}
const shouldInclude =
hasSuccessfulWriteResult(toolResult) ||
(isSelected && toolResult === undefined);
if (!shouldInclude) {
continue;
}
if (args.append === true && hasDraft) {
draft += args.content;
} else {
draft = args.content;
}
hasDraft = true;
if (isSelected) {
return draft;
}
}
}
return hasDraft ? draft : undefined;
}
export function getArtifactViewState({
filepath,
isSupportPreview,
toolResult,
}: {
filepath: string;
isSupportPreview: boolean;
toolResult?: string;
}): {
canPreview: boolean;
initialViewMode: ArtifactViewMode;
} {
const isWriteArtifact = isWriteFileArtifact(filepath);
const canPreview =
isSupportPreview && (!isWriteArtifact || !hasFailedWriteResult(toolResult));
return {
canPreview,
initialViewMode: canPreview ? "preview" : "code",
};
}
export function appendHtmlPreviewBaseHref(
content: string,
url?: string,
currentHref = globalThis.location?.href ?? "http://localhost/",
) {
if (!url || /<base\s/i.exec(content)) {
return content;
}
const baseHref = htmlBaseHref(url, currentHref);
const baseElement = `<base href="${escapeHtmlAttribute(baseHref)}">`;
if (/<head[^>]*>/i.exec(content)) {
return content.replace(/<head([^>]*)>/i, `<head$1>${baseElement}`);
}
return `${baseElement}${content}`;
}
function htmlBaseHref(url: string, currentHref: string) {
const baseUrl = new URL(url, currentHref);
baseUrl.pathname = baseUrl.pathname.replace(/\/[^/]*$/, "/");
baseUrl.search = "";
baseUrl.hash = "";
return baseUrl.toString();
}
function escapeHtmlAttribute(value: string) {
return value.replaceAll("&", "&amp;").replaceAll('"', "&quot;");
}
export const HTML_PREVIEW_SCROLL_MESSAGE_SOURCE =
"deerflow-artifact-preview-scroll";
export function createHtmlPreviewScrollKey(value: string) {
let hash = 2166136261;
for (let index = 0; index < value.length; index += 1) {
hash ^= value.charCodeAt(index);
hash = Math.imul(hash, 16777619);
}
return `artifact-scroll:${(hash >>> 0).toString(36)}`;
}
function escapeJavaScriptString(value: string) {
return JSON.stringify(value)
.replace(/</g, "\\u003C")
.replace(/\u2028/g, "\\u2028")
.replace(/\u2029/g, "\\u2029");
}
function htmlScrollRestorationScript(messageKey: string) {
return `<script data-deerflow-artifact-scroll-restoration>
(() => {
const source = ${escapeJavaScriptString(HTML_PREVIEW_SCROLL_MESSAGE_SOURCE)};
const key = ${escapeJavaScriptString(messageKey)};
const post = (type, payload = {}) => {
window.parent.postMessage({ source, key, type, ...payload }, "*");
};
const save = () => {
post("save", {
x: Math.round(window.scrollX || 0),
y: Math.round(window.scrollY || 0),
});
};
const restore = (x, y) => {
if (Number.isFinite(x) && Number.isFinite(y)) {
window.scrollTo(x, y);
}
};
window.addEventListener("message", (event) => {
const data = event.data;
if (
!data ||
data.source !== source ||
data.key !== key ||
data.type !== "restore"
) {
return;
}
restore(data.x, data.y);
});
window.addEventListener("scroll", save, { passive: true });
window.addEventListener("pagehide", save);
if (document.readyState === "loading") {
document.addEventListener("DOMContentLoaded", () => post("restore-request"), { once: true });
} else {
post("restore-request");
}
window.addEventListener("load", () => post("restore-request"), { once: true });
})();
</script>`;
}
export function appendHtmlPreviewScrollRestoration(
content: string,
scrollKey = "default",
) {
if (content.includes("data-deerflow-artifact-scroll-restoration")) {
return content;
}
const script = htmlScrollRestorationScript(
createHtmlPreviewScrollKey(scrollKey),
);
if (/<head(?:\s[^>]*)?>/i.test(content)) {
return content.replace(
/<head(?:\s[^>]*)?>/i,
(headTag) => `${headTag}${script}`,
);
}
if (/<\/body\s*>/i.test(content)) {
return content.replace(/<\/body\s*>/i, `${script}</body>`);
}
return `${content}${script}`;
}
+20
View File
@@ -1,4 +1,5 @@
import { getBackendBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import type { AgentThread } from "../threads";
export function urlOfArtifact({
@@ -12,6 +13,9 @@ export function urlOfArtifact({
download?: boolean;
isMock?: boolean;
}) {
if (isStaticWebsiteOnly()) {
return staticDemoArtifactURL({ filepath, threadId, download });
}
if (isMock) {
return `${getBackendBaseURL()}/mock/api/threads/${threadId}/artifacts${filepath}${download ? "?download=true" : ""}`;
}
@@ -23,5 +27,21 @@ export function extractArtifactsFromThread(thread: AgentThread) {
}
export function resolveArtifactURL(absolutePath: string, threadId: string) {
if (isStaticWebsiteOnly()) {
return staticDemoArtifactURL({ filepath: absolutePath, threadId });
}
return `${getBackendBaseURL()}/api/threads/${threadId}/artifacts${absolutePath}`;
}
function staticDemoArtifactURL({
filepath,
threadId,
download = false,
}: {
filepath: string;
threadId: string;
download?: boolean;
}) {
const demoPath = filepath.replace(/^\/mnt\//, "/");
return `${getBackendBaseURL()}/demo/threads/${threadId}${demoPath}${download ? "?download=true" : ""}`;
}
+17 -3
View File
@@ -10,6 +10,8 @@ import React, {
type ReactNode,
} from "react";
import { isStaticWebsiteOnly } from "../static-mode";
import { type User, buildLoginUrl } from "./types";
// Re-export for consumers
@@ -46,6 +48,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const [isLoading, setIsLoading] = useState(false);
const router = useRouter();
const pathname = usePathname();
const staticMode = isStaticWebsiteOnly();
const isAuthenticated = user !== null;
@@ -54,6 +57,8 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
* Used when initialUser might be stale (e.g., after tab was inactive)
*/
const refreshUser = useCallback(async () => {
if (staticMode) return;
try {
setIsLoading(true);
const res = await fetch("/api/v1/auth/me", {
@@ -77,7 +82,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
} finally {
setIsLoading(false);
}
}, [pathname, router]);
}, [staticMode, pathname, router]);
/**
* Logout - call FastAPI logout endpoint and clear local state
@@ -87,6 +92,11 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
// Immediately clear local state to prevent UI flicker
setUser(null);
if (staticMode) {
router.push("/");
return;
}
try {
await fetch("/api/v1/auth/logout", {
method: "POST",
@@ -99,7 +109,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
// Redirect to home page
router.push("/");
}, [router]);
}, [staticMode, router]);
/**
* Handle visibility change - refresh user when tab becomes visible again.
@@ -108,6 +118,8 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const lastCheckRef = React.useRef(0);
useEffect(() => {
if (staticMode) return;
const handleVisibilityChange = () => {
if (document.visibilityState !== "visible" || user === null) return;
const now = Date.now();
@@ -120,7 +132,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
return () => {
document.removeEventListener("visibilitychange", handleVisibilityChange);
};
}, [user, refreshUser]);
}, [staticMode, user, refreshUser]);
const value: AuthContextType = {
user,
@@ -155,6 +167,8 @@ export function useRequireAuth(): AuthContextType {
const pathname = usePathname();
useEffect(() => {
if (isStaticWebsiteOnly()) return;
// Only redirect if we're sure user is not authenticated (not just loading)
if (!auth.isLoading && !auth.isAuthenticated) {
router.push(buildLoginUrl(pathname || "/workspace"));
+10
View File
@@ -1,6 +1,9 @@
import { cookies } from "next/headers";
import { isStaticWebsiteOnly } from "../static-mode";
import { getGatewayConfig } from "./gateway-config";
import { STATIC_WEBSITE_USER } from "./static-user";
import { type AuthResult, userSchema } from "./types";
const SSR_AUTH_TIMEOUT_MS = 5_000;
@@ -10,6 +13,13 @@ const SSR_AUTH_TIMEOUT_MS = 5_000;
* Returns a tagged AuthResult — callers use exhaustive switch, no try/catch.
*/
export async function getServerSideUser(): Promise<AuthResult> {
if (isStaticWebsiteOnly()) {
return {
tag: "authenticated",
user: STATIC_WEBSITE_USER,
};
}
if (process.env.DEER_FLOW_AUTH_DISABLED === "1") {
return {
tag: "authenticated",
+8
View File
@@ -0,0 +1,8 @@
import type { User } from "./types";
export const STATIC_WEBSITE_USER: User = {
id: "static-website-user",
email: "static@example.local",
system_role: "admin",
needs_setup: false,
};
+80
View File
@@ -170,6 +170,86 @@ export function getAssistantTurnUsageMessages(groups: MessageGroup[]) {
return usageMessagesByGroupIndex;
}
type MessageMetadataLookup = (
message: Message,
index: number,
) => { streamMetadata?: Record<string, unknown> } | undefined;
export type StreamingMessageLookup = {
ids: ReadonlySet<string>;
messages: ReadonlySet<Message>;
};
export function getStreamingMessageLookup(
messages: Message[],
isStreaming: boolean,
getMessagesMetadata?: MessageMetadataLookup,
): StreamingMessageLookup {
const streamingMessageIds = new Set<string>();
const streamingMessages = new Set<Message>();
if (!isStreaming) {
return {
ids: streamingMessageIds,
messages: streamingMessages,
};
}
messages.forEach((message, index) => {
if (!getMessagesMetadata?.(message, index)?.streamMetadata) {
return;
}
if (typeof message.id === "string" && message.id.length > 0) {
streamingMessageIds.add(message.id);
}
streamingMessages.add(message);
});
return {
ids: streamingMessageIds,
messages: streamingMessages,
};
}
export function isAssistantMessageGroupStreaming(
groupMessages: Message[],
streamingMessages: StreamingMessageLookup,
) {
return groupMessages.some((message) => {
if (message.type !== "ai") {
return false;
}
return (
(typeof message.id === "string" &&
message.id.length > 0 &&
streamingMessages.ids.has(message.id)) ||
streamingMessages.messages.has(message)
);
});
}
export function getAssistantTurnCopyData(
messages: Message[],
{ isStreaming = false }: { isStreaming?: boolean } = {},
) {
if (isStreaming) {
return null;
}
return (
[...messages]
.reverse()
.filter((message) => message.type === "ai")
.map((message) => {
const content = extractContentFromMessage(message);
return content ?? extractReasoningContentFromMessage(message) ?? "";
})
.find((content) => content.length > 0) ?? null
);
}
export function extractTextFromMessage(message: Message) {
if (typeof message.content === "string") {
return (
+10
View File
@@ -1,8 +1,18 @@
import { getBackendBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import type { ModelsResponse } from "./types";
const STATIC_MODELS_RESPONSE: ModelsResponse = {
models: [],
token_usage: { enabled: false },
};
export async function loadModels(): Promise<ModelsResponse> {
if (isStaticWebsiteOnly()) {
return STATIC_MODELS_RESPONSE;
}
const res = await fetch(`${getBackendBaseURL()}/api/models`);
const data = (await res.json()) as Partial<ModelsResponse>;
return {
+5
View File
@@ -0,0 +1,5 @@
import { env } from "@/env";
export function isStaticWebsiteOnly() {
return env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY === "true";
}
+21 -1
View File
@@ -135,6 +135,20 @@ function getMessagesAfterBaseline(
});
}
export function getVisibleOptimisticMessages(
optimisticMessages: Message[],
previousHumanMessageCount: number,
currentHumanMessageCount: number,
): Message[] {
if (
optimisticMessages.some((message) => message.type === "human") &&
currentHumanMessageCount > previousHumanMessageCount
) {
return [];
}
return optimisticMessages;
}
function getStreamErrorMessage(error: unknown): string {
if (typeof error === "string" && error.trim()) {
return error;
@@ -627,10 +641,16 @@ export function useThreadStream({
messagesRef.current = thread.messages;
}
const visibleOptimisticMessages = getVisibleOptimisticMessages(
optimisticMessages,
prevHumanMsgCountRef.current,
humanMessageCount,
);
const mergedMessages = mergeMessages(
history,
thread.messages,
optimisticMessages,
visibleOptimisticMessages,
);
const pendingUsageMessages = thread.isLoading
? getMessagesAfterBaseline(
+87
View File
@@ -0,0 +1,87 @@
import type { ThreadState } from "@langchain/langgraph-sdk";
import type { ThreadsClient } from "@langchain/langgraph-sdk/client";
import type { AgentThread, AgentThreadState } from "./types";
export const DEMO_THREAD_IDS = [
"21cfea46-34bd-4aa6-9e1f-3009452fbeb9",
"3823e443-4e2b-4679-b496-a9506eae462b",
"4f3e55ee-f853-43db-bfb3-7d1a411f03cb",
"5aa47db1-d0cb-4eb9-aea5-3dac1b371c5a",
"7cfa5f8f-a2f8-47ad-acbd-da7137baf990",
"7f9dc56c-e49c-4671-a3d2-c492ff4dce0c",
"90040b36-7eba-4b97-ba89-02c3ad47a8b9",
"ad76c455-5bf9-4335-8517-fc03834ab828",
"b83fbb2a-4e36-4d82-9de0-7b2a02c2092a",
"c02bb4d5-4202-490e-ae8f-ff4864fc0d2e",
"d3e5adaf-084c-4dd5-9d29-94f1d6bccd98",
"f4125791-0128-402a-8ca9-50e0947557e4",
"fe3f7974-1bcb-4a01-a950-79673baafefd",
] as const;
export type ThreadSearchParams = NonNullable<
Parameters<ThreadsClient["search"]>[0]
>;
export async function loadStaticDemoThreads(
params: ThreadSearchParams = {},
): Promise<AgentThread[]> {
const threads = await Promise.all(
DEMO_THREAD_IDS.map((threadId) => loadStaticDemoThread(threadId)),
);
const sortBy = params.sortBy ?? "updated_at";
const sortOrder = params.sortOrder ?? "desc";
const sortedThreads = [...threads].sort((a, b) => {
const aTimestamp = (a as unknown as Record<string, unknown>)[sortBy];
const bTimestamp = (b as unknown as Record<string, unknown>)[sortBy];
const aParsed = typeof aTimestamp === "string" ? Date.parse(aTimestamp) : 0;
const bParsed = typeof bTimestamp === "string" ? Date.parse(bTimestamp) : 0;
const aValue = Number.isNaN(aParsed) ? 0 : aParsed;
const bValue = Number.isNaN(bParsed) ? 0 : bParsed;
return sortOrder === "asc" ? aValue - bValue : bValue - aValue;
});
const offset = Math.max(0, Math.floor(params.offset ?? 0));
const limit =
typeof params.limit === "number"
? Math.max(0, Math.floor(params.limit))
: sortedThreads.length;
return sortedThreads.slice(offset, offset + limit);
}
export async function loadStaticDemoThread(
threadId: string,
): Promise<AgentThread> {
const response = await globalThis.fetch(
`/demo/threads/${encodeURIComponent(threadId)}/thread.json`,
);
if (!response.ok) {
throw new Error(`Failed to load demo thread ${threadId}`);
}
const thread = (await response.json()) as AgentThread;
return {
...thread,
thread_id: threadId,
updated_at: thread.updated_at ?? thread.created_at,
};
}
export function staticDemoThreadState(
thread: AgentThread,
): ThreadState<AgentThreadState> {
return {
values: thread.values,
next: [],
checkpoint: {
thread_id: thread.thread_id,
checkpoint_ns: "",
checkpoint_id: null,
checkpoint_map: null,
},
metadata: thread.metadata ?? null,
created_at: thread.updated_at ?? thread.created_at ?? null,
parent_checkpoint: null,
tasks: [],
};
}
+174
View File
@@ -0,0 +1,174 @@
import { expect, test } from "@playwright/test";
import { mockLangGraphAPI } from "./utils/mock-api";
const ARTIFACT_PATH = "/artifact-fixtures/report.html";
const MARKDOWN_ARTIFACT_PATH = "/artifact-fixtures/report.md";
const JSON_ARTIFACT_PATH = "/artifact-fixtures/report.json";
const IN_PROGRESS_THREAD_ID = "00000000-0000-0000-0000-000000003119";
const COMPLETE_THREAD_ID = "00000000-0000-0000-0000-000000003120";
const MARKDOWN_THREAD_ID = "00000000-0000-0000-0000-000000003121";
const JSON_THREAD_ID = "00000000-0000-0000-0000-000000003122";
function writeFileMessages({
path = ARTIFACT_PATH,
content = "<!doctype html><html><body><h1>Report draft</h1><p>测试内容</p></body></html>",
toolResult,
}: {
path?: string;
content?: string;
toolResult?: string;
} = {}) {
const messages: unknown[] = [
{
type: "human",
id: "msg-human-artifact",
content: [{ type: "text", text: "Create a report artifact" }],
},
{
type: "ai",
id: "msg-ai-write-artifact",
content: "",
tool_calls: [
{
id: "write-file-artifact",
name: "write_file",
args: {
description: "Writing report artifact",
path,
content,
},
},
],
},
];
if (toolResult !== undefined) {
messages.push({
type: "tool",
id: "msg-tool-write-artifact",
name: "write_file",
tool_call_id: "write-file-artifact",
content: toolResult,
});
}
return messages;
}
test.describe("Artifact preview stability", () => {
test("renders preview iframe for an in-progress write artifact", async ({
page,
}) => {
mockLangGraphAPI(page, {
threads: [
{
thread_id: IN_PROGRESS_THREAD_ID,
title: "Artifact preview in progress",
messages: writeFileMessages(),
},
],
});
await page.goto(`/workspace/chats/${IN_PROGRESS_THREAD_ID}`);
await expect(page.getByText(ARTIFACT_PATH)).toBeVisible({
timeout: 15_000,
});
await page.getByText(ARTIFACT_PATH).click();
const artifactsPanel = page.locator("#artifacts");
await expect(artifactsPanel.getByText("report.html")).toBeVisible();
await expect(
artifactsPanel.locator('iframe[title="Artifact preview"]'),
).toBeVisible();
});
test("renders preview iframe after the write artifact succeeds", async ({
page,
}) => {
mockLangGraphAPI(page, {
threads: [
{
thread_id: COMPLETE_THREAD_ID,
title: "Artifact preview complete",
messages: writeFileMessages({ toolResult: "OK" }),
},
],
});
await page.goto(`/workspace/chats/${COMPLETE_THREAD_ID}`);
await expect(page.getByText(ARTIFACT_PATH)).toBeVisible({
timeout: 15_000,
});
await page.getByText(ARTIFACT_PATH).click();
const artifactsPanel = page.locator("#artifacts");
await expect(artifactsPanel.getByText("report.html")).toBeVisible();
await expect(
artifactsPanel.locator('iframe[title="Artifact preview"]'),
).toBeVisible();
});
test("renders markdown preview for an in-progress write artifact", async ({
page,
}) => {
mockLangGraphAPI(page, {
threads: [
{
thread_id: MARKDOWN_THREAD_ID,
title: "Markdown artifact preview in progress",
messages: writeFileMessages({
path: MARKDOWN_ARTIFACT_PATH,
content: "# Markdown draft\n\n- 测试内容 1\n- English term",
}),
},
],
});
await page.goto(`/workspace/chats/${MARKDOWN_THREAD_ID}`);
await expect(page.getByText(MARKDOWN_ARTIFACT_PATH)).toBeVisible({
timeout: 15_000,
});
await page.getByText(MARKDOWN_ARTIFACT_PATH).click();
const artifactsPanel = page.locator("#artifacts");
await expect(artifactsPanel.getByText("report.md")).toBeVisible();
await expect(artifactsPanel.getByText("Markdown draft")).toBeVisible();
await expect(artifactsPanel.getByText("测试内容 1")).toBeVisible();
});
test("renders code view for an in-progress non-preview write artifact", async ({
page,
}) => {
mockLangGraphAPI(page, {
threads: [
{
thread_id: JSON_THREAD_ID,
title: "JSON artifact code view in progress",
messages: writeFileMessages({
path: JSON_ARTIFACT_PATH,
content:
'{\n "status": "draft",\n "中文字段": "测试内容",\n "count": 3\n}',
}),
},
],
});
await page.goto(`/workspace/chats/${JSON_THREAD_ID}`);
await expect(page.getByText(JSON_ARTIFACT_PATH)).toBeVisible({
timeout: 15_000,
});
await page.getByText(JSON_ARTIFACT_PATH).click();
const artifactsPanel = page.locator("#artifacts");
await expect(artifactsPanel.getByText("report.json")).toBeVisible();
await expect(artifactsPanel.getByText('"status": "draft"')).toBeVisible();
await expect(
artifactsPanel.getByText('"中文字段": "测试内容"'),
).toBeVisible();
});
});
+52 -4
View File
@@ -25,6 +25,8 @@ export type MockThread = {
title?: string;
updated_at?: string;
agent_name?: string;
messages?: unknown[];
artifacts?: string[];
};
export type MockAgent = {
@@ -113,7 +115,7 @@ export function mockLangGraphAPI(page: Page, options?: MockAPIOptions) {
{
values: {
title: matchingThread.title ?? "Untitled",
messages: [
messages: matchingThread.messages ?? [
{
type: "human",
id: `msg-human-${matchingThread.thread_id}`,
@@ -125,6 +127,7 @@ export function mockLangGraphAPI(page: Page, options?: MockAPIOptions) {
content: `Response in thread ${matchingThread.title ?? matchingThread.thread_id}`,
},
],
artifacts: matchingThread.artifacts ?? [],
},
next: [],
metadata: {},
@@ -155,7 +158,7 @@ export function mockLangGraphAPI(page: Page, options?: MockAPIOptions) {
values: {
title: matchingThread?.title ?? "Untitled",
messages: matchingThread
? [
? (matchingThread.messages ?? [
{
type: "human",
id: `msg-human-${matchingThread.thread_id}`,
@@ -166,8 +169,9 @@ export function mockLangGraphAPI(page: Page, options?: MockAPIOptions) {
id: `msg-ai-${matchingThread.thread_id}`,
content: `Response in thread ${matchingThread.title ?? matchingThread.thread_id}`,
},
]
])
: [],
artifacts: matchingThread?.artifacts ?? [],
},
next: [],
metadata: {},
@@ -183,15 +187,59 @@ export function mockLangGraphAPI(page: Page, options?: MockAPIOptions) {
// followed by `?` or end-of-string. This must NOT match `/runs/stream`.
void page.route(/\/api\/langgraph\/threads\/[^/]+\/runs(\?|$)/, (route) => {
if (route.request().method() === "GET") {
const url = route.request().url();
const matchingThread = threads.find((t) => url.includes(t.thread_id));
return route.fulfill({
status: 200,
contentType: "application/json",
body: "[]",
body: JSON.stringify(
matchingThread
? [
{
run_id: `run-${matchingThread.thread_id}`,
thread_id: matchingThread.thread_id,
assistant_id: "lead_agent",
status: "success",
metadata: {},
kwargs: {},
created_at: "2025-01-01T00:00:00Z",
updated_at:
matchingThread.updated_at ?? "2025-01-01T00:00:00Z",
},
]
: [],
),
});
}
return route.fallback();
});
void page.route(
/\/api\/threads\/([^/]+)\/runs\/([^/]+)\/messages/,
(route) => {
if (route.request().method() === "GET") {
const url = route.request().url();
const matchingThread = threads.find((t) =>
url.includes(`/api/threads/${t.thread_id}/runs/`),
);
return route.fulfill({
status: 200,
contentType: "application/json",
body: JSON.stringify({
data: (matchingThread?.messages ?? []).map((message, index) => ({
run_id: `run-${matchingThread?.thread_id ?? "unknown"}`,
content: message,
metadata: { caller: "lead_agent" },
created_at: `2025-01-01T00:00:${String(index).padStart(2, "0")}Z`,
})),
hasMore: false,
}),
});
}
return route.fallback();
},
);
// Run stream — returns a minimal SSE response with an AI message
void page.route("**/api/langgraph/runs/stream", handleRunStream);
void page.route("**/api/langgraph/threads/*/runs/stream", handleRunStream);
@@ -0,0 +1,310 @@
import { expect, test } from "vitest";
import {
appendHtmlPreviewBaseHref,
appendHtmlPreviewScrollRestoration,
buildWriteFileDraftContent,
createHtmlPreviewScrollKey,
getArtifactViewState,
} from "@/core/artifacts/preview";
const ARTIFACT_PATH = "/artifact-fixtures/report.html";
const UNSUPPORTED_ARTIFACT_PATH = "/artifact-fixtures/data.csv";
test("allows in-progress write artifacts to render a throttled preview", () => {
expect(
getArtifactViewState({
filepath: `write-file:${ARTIFACT_PATH}?message_id=ai-1&tool_call_id=call-1`,
isSupportPreview: true,
}),
).toEqual({
canPreview: true,
initialViewMode: "preview",
});
});
test("allows preview for a write artifact once the tool call has a result", () => {
expect(
getArtifactViewState({
filepath: `write-file:${ARTIFACT_PATH}?message_id=ai-1&tool_call_id=call-1`,
isSupportPreview: true,
toolResult: "OK",
}),
).toEqual({
canPreview: true,
initialViewMode: "preview",
});
});
test("keeps failed write artifacts in code view", () => {
expect(
getArtifactViewState({
filepath: `write-file:${ARTIFACT_PATH}?message_id=ai-1&tool_call_id=call-1`,
isSupportPreview: true,
toolResult: "Error: Failed to write file",
}),
).toEqual({
canPreview: false,
initialViewMode: "code",
});
});
test("keeps completed artifacts on their existing preview defaults", () => {
expect(
getArtifactViewState({
filepath: ARTIFACT_PATH,
isSupportPreview: true,
}),
).toEqual({
canPreview: true,
initialViewMode: "preview",
});
});
test("keeps unsupported artifacts in code view", () => {
expect(
getArtifactViewState({
filepath: UNSUPPORTED_ARTIFACT_PATH,
isSupportPreview: false,
}),
).toEqual({
canPreview: false,
initialViewMode: "code",
});
});
test("builds a draft write-file artifact from successful writes plus the selected in-progress append", () => {
const filepath = `write-file:${ARTIFACT_PATH}?message_id=ai-2&tool_call_id=call-2`;
expect(
buildWriteFileDraftContent({
filepath,
messages: [
{
type: "ai",
id: "ai-1",
tool_calls: [
{
id: "call-1",
name: "write_file",
args: {
path: ARTIFACT_PATH,
content: "<!doctype html><html><body>",
},
},
],
},
{
type: "tool",
id: "tool-1",
name: "write_file",
tool_call_id: "call-1",
content: "OK",
},
{
type: "ai",
id: "ai-2",
tool_calls: [
{
id: "call-2",
name: "write_file",
args: {
append: true,
path: ARTIFACT_PATH,
content: "<p>追加内容</p>",
},
},
],
},
],
}),
).toBe("<!doctype html><html><body><p>追加内容</p>");
});
test("does not include failed writes in a draft artifact", () => {
const filepath = `write-file:${ARTIFACT_PATH}?message_id=ai-3&tool_call_id=call-3`;
expect(
buildWriteFileDraftContent({
filepath,
messages: [
{
type: "ai",
id: "ai-1",
tool_calls: [
{
id: "call-1",
name: "write_file",
args: {
path: ARTIFACT_PATH,
content: "<html>",
},
},
],
},
{
type: "tool",
id: "tool-1",
name: "write_file",
tool_call_id: "call-1",
content: "OK",
},
{
type: "ai",
id: "ai-2",
tool_calls: [
{
id: "call-2",
name: "write_file",
args: {
append: true,
path: ARTIFACT_PATH,
content: "<p>失败内容</p>",
},
},
],
},
{
type: "tool",
id: "tool-2",
name: "write_file",
tool_call_id: "call-2",
content: "Error: write failed",
},
{
type: "ai",
id: "ai-3",
tool_calls: [
{
id: "call-3",
name: "write_file",
args: {
append: true,
path: ARTIFACT_PATH,
content: "</html>",
},
},
],
},
],
}),
).toBe("<html></html>");
});
test("returns undefined when the selected append failed so the caller can fall back", () => {
const filepath = `write-file:${ARTIFACT_PATH}?message_id=ai-2&tool_call_id=call-2`;
expect(
buildWriteFileDraftContent({
filepath,
messages: [
{
type: "ai",
id: "ai-1",
tool_calls: [
{
id: "call-1",
name: "write_file",
args: {
path: ARTIFACT_PATH,
content: "<html>",
},
},
],
},
{
type: "tool",
id: "tool-1",
name: "write_file",
tool_call_id: "call-1",
content: "OK",
},
{
type: "ai",
id: "ai-2",
tool_calls: [
{
id: "call-2",
name: "write_file",
args: {
append: true,
path: ARTIFACT_PATH,
content: "<p>失败的追加内容</p>",
},
},
],
},
{
type: "tool",
id: "tool-2",
name: "write_file",
tool_call_id: "call-2",
content: "Error: write failed",
},
],
}),
).toBeUndefined();
});
test("injects scroll restoration at the start of the HTML head", () => {
const html =
'<!doctype html><html><head><meta http-equiv="Content-Security-Policy" content="script-src \'none\'"></head><body><main>content</main></body></html>';
expect(appendHtmlPreviewScrollRestoration(html, ARTIFACT_PATH)).toContain(
"<script data-deerflow-artifact-scroll-restoration>",
);
expect(appendHtmlPreviewScrollRestoration(html, ARTIFACT_PATH)).toContain(
"<head><script data-deerflow-artifact-scroll-restoration>",
);
});
test("preserves existing head elements when injecting scroll restoration", () => {
const html =
'<!doctype html><html><head><meta http-equiv="Content-Security-Policy" content="script-src \'none\'"></head><body><main>content</main></body></html>';
const result = appendHtmlPreviewScrollRestoration(
appendHtmlPreviewBaseHref(
html,
"/demo/threads/thread-1/user-data/outputs/report.html?download=true",
"http://localhost/workspace/chats/thread-1",
),
ARTIFACT_PATH,
);
expect(result).toContain(
'<base href="http://localhost/demo/threads/thread-1/user-data/outputs/">',
);
expect(
result.indexOf("data-deerflow-artifact-scroll-restoration"),
).toBeLessThan(
result.indexOf(
'<base href="http://localhost/demo/threads/thread-1/user-data/outputs/">',
),
);
});
test("does not duplicate HTML scroll restoration script", () => {
const html = appendHtmlPreviewScrollRestoration(
"<html><body>x</body></html>",
);
expect(
appendHtmlPreviewScrollRestoration(html).match(
/data-deerflow-artifact-scroll-restoration/g,
),
).toHaveLength(1);
});
test("scopes HTML scroll restoration without exposing the artifact path", () => {
const artifactPath =
'/artifact-fixtures/a</script><script>alert("x")</script>.html';
const html = appendHtmlPreviewScrollRestoration(
"<html><body>x</body></html>",
artifactPath,
);
expect(html).toContain(createHtmlPreviewScrollKey(artifactPath));
expect(html).toContain("window.parent.postMessage");
expect(html).not.toContain("window.name");
expect(html).not.toContain("/artifact-fixtures/a");
expect(html).not.toContain("<script>alert");
});
@@ -0,0 +1,69 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
const ENV_KEYS = [
"NEXT_PUBLIC_BACKEND_BASE_URL",
"NEXT_PUBLIC_STATIC_WEBSITE_ONLY",
] as const;
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
function snapshotEnv(): EnvSnapshot {
const snapshot: EnvSnapshot = {};
for (const key of ENV_KEYS) {
snapshot[key] = process.env[key];
}
return snapshot;
}
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
const env = process.env as Record<string, string | undefined>;
if (value === undefined) {
delete env[key];
} else {
env[key] = value;
}
}
function restoreEnv(snapshot: EnvSnapshot) {
for (const key of ENV_KEYS) {
setEnv(key, snapshot[key]);
}
}
async function loadFreshArtifactUtils() {
vi.resetModules();
return await import("@/core/artifacts/utils");
}
describe("artifact URL helpers", () => {
let saved: EnvSnapshot;
beforeEach(() => {
saved = snapshotEnv();
setEnv("NEXT_PUBLIC_BACKEND_BASE_URL", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
afterEach(() => {
restoreEnv(saved);
});
test("maps static demo artifact paths to bundled public files", async () => {
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", "true");
const { resolveArtifactURL, urlOfArtifact } =
await loadFreshArtifactUtils();
expect(
urlOfArtifact({
filepath: "/mnt/user-data/outputs/index.html",
threadId: "thread-1",
}),
).toBe("/demo/threads/thread-1/user-data/outputs/index.html");
expect(
resolveArtifactURL("/mnt/user-data/outputs/style.css", "thread-1"),
).toBe("/demo/threads/thread-1/user-data/outputs/style.css");
});
});
@@ -0,0 +1,77 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { STATIC_WEBSITE_USER } from "@/core/auth/static-user";
vi.mock("next/headers", () => ({
cookies: vi.fn(() => {
throw new Error("cookies should not be read in static website mode");
}),
}));
const ENV_KEYS = [
"DEER_FLOW_AUTH_DISABLED",
"NEXT_PUBLIC_STATIC_WEBSITE_ONLY",
] as const;
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
function snapshotEnv(): EnvSnapshot {
const snapshot: EnvSnapshot = {};
for (const key of ENV_KEYS) {
snapshot[key] = process.env[key];
}
return snapshot;
}
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
const env = process.env as Record<string, string | undefined>;
if (value === undefined) {
delete env[key];
} else {
env[key] = value;
}
}
function restoreEnv(snapshot: EnvSnapshot) {
for (const key of ENV_KEYS) {
setEnv(key, snapshot[key]);
}
}
async function loadFreshServerAuth() {
vi.resetModules();
return await import("@/core/auth/server");
}
describe("getServerSideUser", () => {
let saved: EnvSnapshot;
beforeEach(() => {
saved = snapshotEnv();
setEnv("DEER_FLOW_AUTH_DISABLED", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
afterEach(() => {
restoreEnv(saved);
vi.unstubAllGlobals();
});
test("bypasses gateway auth in static website mode", async () => {
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", "true");
const fetchSpy = vi.fn(() => {
throw new Error("fetch should not be called in static website mode");
});
vi.stubGlobal("fetch", fetchSpy);
const { getServerSideUser } = await loadFreshServerAuth();
await expect(getServerSideUser()).resolves.toEqual({
tag: "authenticated",
user: STATIC_WEBSITE_USER,
});
expect(fetchSpy).not.toHaveBeenCalled();
});
});
@@ -2,8 +2,11 @@ import type { Message } from "@langchain/langgraph-sdk";
import { expect, test } from "vitest";
import {
getAssistantTurnCopyData,
getAssistantTurnUsageMessages,
getMessageGroups,
getStreamingMessageLookup,
isAssistantMessageGroupStreaming,
} from "@/core/messages/utils";
test("aggregates token usage messages once per assistant turn", () => {
@@ -97,3 +100,272 @@ test("hides internal todo reminder messages from message groups", () => {
groups.flatMap((group) => group.messages).map((message) => message.id),
).toEqual(["human-1", "ai-1"]);
});
test("hides assistant copy data while that turn is streaming", () => {
const messages = [
{
id: "ai-1",
type: "ai",
content: "Partial answer",
},
] as Message[];
expect(getAssistantTurnCopyData(messages)).toBe("Partial answer");
expect(getAssistantTurnCopyData(messages, { isStreaming: true })).toBeNull();
});
test("marks the latest assistant message as streaming", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Still generating",
},
] as Message[];
const groups = getMessageGroups(messages);
const assistantGroupIndex = groups.findIndex(
(group) => group.type === "assistant",
);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, true, () => ({
streamMetadata: { langgraph_node: "agent" },
})),
),
).toBe(true);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, false, () => ({
streamMetadata: { langgraph_node: "agent" },
})),
),
).toBe(false);
});
test("keeps previous assistant copyable while waiting for a new visible answer", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Completed answer",
},
{
id: "opt-human-1",
type: "human",
content: "Continue",
},
] as Message[];
const groups = getMessageGroups(messages);
const assistantGroupIndex = groups.findIndex(
(group) => group.type === "assistant",
);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, true),
),
).toBe(false);
});
test("keeps previous assistant copyable while a hidden send is starting", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Completed answer",
},
] as Message[];
const groups = getMessageGroups(messages);
const assistantGroupIndex = groups.findIndex(
(group) => group.type === "assistant",
);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, true),
),
).toBe(false);
});
test("keeps previous assistant copyable after a hidden send is appended", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Completed answer",
},
{
id: "human-hidden",
type: "human",
content: "Save this agent",
additional_kwargs: { hide_from_ui: true },
},
] as Message[];
const groups = getMessageGroups(messages);
const assistantGroupIndex = groups.findIndex(
(group) => group.type === "assistant",
);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, true),
),
).toBe(false);
});
test("uses stream metadata to identify an assistant before optimistic input", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Completed answer",
},
{
id: "ai-2",
type: "ai",
content: "Still generating",
},
{
id: "opt-human-1",
type: "human",
content: "Continue",
},
] as Message[];
const assistantGroups = getMessageGroups(messages).filter(
(group) => group.type === "assistant",
);
const groups = getMessageGroups(messages);
const assistantGroupIndexes = groups
.map((group, index) => (group.type === "assistant" ? index : -1))
.filter((index) => index >= 0);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndexes[0] ?? -1]?.messages ?? [],
getStreamingMessageLookup(messages, true, (message) =>
message.id === "ai-2"
? { streamMetadata: { langgraph_node: "agent" } }
: undefined,
),
),
).toBe(false);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndexes[1] ?? -1]?.messages ?? [],
getStreamingMessageLookup(messages, true, (message) =>
message.id === "ai-2"
? { streamMetadata: { langgraph_node: "agent" } }
: undefined,
),
),
).toBe(true);
expect(assistantGroups.map((group) => group.id)).toEqual(["ai-1", "ai-2"]);
});
test("does not mark a completed assistant group streaming from a later processing group", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Visible answer",
},
{
id: "ai-2",
type: "ai",
content: "",
tool_calls: [{ id: "tool-1", name: "web_search", args: {} }],
},
] as Message[];
const groups = getMessageGroups(messages);
const assistantGroupIndex = groups.findIndex(
(group) => group.type === "assistant",
);
expect(groups.map((group) => group.type)).toEqual([
"human",
"assistant",
"assistant:processing",
]);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, true, (message) =>
message.id === "ai-2"
? { streamMetadata: { langgraph_node: "agent" } }
: undefined,
),
),
).toBe(false);
});
test("keeps streaming assistant hidden when a hidden control message follows it", () => {
const messages = [
{
id: "human-1",
type: "human",
content: "Hello",
},
{
id: "ai-1",
type: "ai",
content: "Still generating",
},
{
id: "human-hidden",
type: "human",
content: "Save this agent",
additional_kwargs: { hide_from_ui: true },
},
] as Message[];
const groups = getMessageGroups(messages);
const assistantGroupIndex = groups.findIndex(
(group) => group.type === "assistant",
);
expect(
isAssistantMessageGroupStreaming(
groups[assistantGroupIndex]?.messages ?? [],
getStreamingMessageLookup(messages, true, (message) =>
message.id === "ai-1"
? { streamMetadata: { langgraph_node: "agent" } }
: undefined,
),
),
).toBe(true);
});
@@ -1,7 +1,10 @@
import type { Message } from "@langchain/langgraph-sdk";
import { expect, test } from "vitest";
import { mergeMessages } from "@/core/threads/hooks";
import {
getVisibleOptimisticMessages,
mergeMessages,
} from "@/core/threads/hooks";
test("mergeMessages removes duplicate messages already present in history", () => {
const human = {
@@ -62,3 +65,93 @@ test("mergeMessages deduplicates tool messages by tool_call_id", () => {
expect(mergeMessages([oldTool], [liveTool], [])).toEqual([liveTool]);
});
test("getVisibleOptimisticMessages hides optimistic user input after server human arrives", () => {
const optimisticHuman = {
id: "opt-human-1",
type: "human",
content: "hello",
} as Message;
expect(getVisibleOptimisticMessages([optimisticHuman], 0, 1)).toEqual([]);
});
test("mergeMessages shows server human instead of optimistic duplicate after first response", () => {
const serverHuman = {
id: "server-human-1",
type: "human",
content: "hello",
} as Message;
const optimisticHuman = {
id: "opt-human-1",
type: "human",
content: "hello",
} as Message;
const visibleOptimistic = getVisibleOptimisticMessages(
[optimisticHuman],
0,
1,
);
expect(mergeMessages([], [serverHuman], visibleOptimistic)).toEqual([
serverHuman,
]);
});
test("getVisibleOptimisticMessages keeps optimistic user input until server human arrives", () => {
const optimisticHuman = {
id: "opt-human-1",
type: "human",
content: "hello",
} as Message;
expect(getVisibleOptimisticMessages([optimisticHuman], 0, 0)).toEqual([
optimisticHuman,
]);
});
test("getVisibleOptimisticMessages keeps non-human optimistic status messages", () => {
const optimisticAi = {
id: "opt-ai-1",
type: "ai",
content: "Uploading files...",
} as Message;
expect(getVisibleOptimisticMessages([optimisticAi], 0, 1)).toEqual([
optimisticAi,
]);
});
test("getVisibleOptimisticMessages hides the upload optimistic pair after server human arrives", () => {
const optimisticHuman = {
id: "opt-human-1",
type: "human",
content: "upload this",
} as Message;
const optimisticUploadingAi = {
id: "opt-ai-uploading",
type: "ai",
content: "Uploading files...",
} as Message;
expect(
getVisibleOptimisticMessages(
[optimisticHuman, optimisticUploadingAi],
0,
1,
),
).toEqual([]);
});
test("getVisibleOptimisticMessages hides optimistic user input after later server turns", () => {
const optimisticHuman = {
id: "opt-human-2",
type: "human",
content: "follow up",
} as Message;
expect(getVisibleOptimisticMessages([optimisticHuman], 3, 4)).toEqual([]);
expect(getVisibleOptimisticMessages([optimisticHuman], 3, 3)).toEqual([
optimisticHuman,
]);
});