Compare commits

..

3 Commits

Author SHA1 Message Date
Nan Gao a64a39dbc0 config: raise default summarization trigger before v2.0-m1 (#3174)
* config: update summarization configuration

* docs: sync summarization trigger guidance
2026-05-23 15:38:25 +08:00
JeffJiang b103d1a7f5 feat(frontend): support static website demo mode (#3170)
* feat(frontend): support static website demo mode

* fix(frontend): render html artifact previews from blob content

* chore(frontend): apply pre-commit formatting

* fix(frontend): address static demo PR review comments

* Update the release information of DeerFlow

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-23 00:10:56 +08:00
AochenShen99 66d6a6a4e8 fix: harden run finalization persistence (#3155)
* fix: harden run finalization persistence

* style: format gateway recovery test

* fix: align run repository return types

* fix: harden completion recovery follow-up
2026-05-23 00:09:06 +08:00
36 changed files with 1236 additions and 205 deletions
+35
View File
@@ -37,11 +37,36 @@ if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider from app.gateway.auth.local_provider import LocalAuthProvider
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
from deerflow.persistence.thread_meta.base import ThreadMetaStore from deerflow.persistence.thread_meta.base import ThreadMetaStore
from deerflow.runtime import RunRecord
T = TypeVar("T") T = TypeVar("T")
async def _mark_latest_recovered_threads_error(
run_manager: RunManager,
thread_store: ThreadMetaStore,
recovered_runs: list[RunRecord],
) -> None:
"""Mark thread status as error only when its newest run was recovered."""
recovered_by_thread: dict[str, set[str]] = {}
for record in recovered_runs:
recovered_by_thread.setdefault(record.thread_id, set()).add(record.run_id)
for thread_id, recovered_run_ids in recovered_by_thread.items():
try:
latest_runs = await run_manager.list_by_thread(thread_id, user_id=None, limit=1)
except Exception:
logger.warning("Failed to find latest run for thread %s during run reconciliation", thread_id, exc_info=True)
continue
if not latest_runs or latest_runs[0].run_id not in recovered_run_ids:
continue
try:
await thread_store.update_status(thread_id, "error", user_id=None)
except Exception:
logger.warning("Failed to mark thread %s as error during run reconciliation", thread_id, exc_info=True)
def get_config() -> AppConfig: def get_config() -> AppConfig:
"""Return the freshest ``AppConfig`` for the current request. """Return the freshest ``AppConfig`` for the current request.
@@ -138,6 +163,16 @@ async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGen
# RunManager with store backing for persistence # RunManager with store backing for persistence
app.state.run_manager = RunManager(store=app.state.run_store) app.state.run_manager = RunManager(store=app.state.run_store)
if getattr(config.database, "backend", None) == "sqlite":
from deerflow.utils.time import now_iso
# Startup-only recovery: clean shutdowns return no active rows and
# the thread-status update below becomes a no-op.
recovered_runs = await app.state.run_manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before=now_iso(),
)
await _mark_latest_recovered_threads_error(app.state.run_manager, app.state.thread_store, recovered_runs)
try: try:
yield yield
-28
View File
@@ -74,25 +74,6 @@ def _make_file_sandbox_writable(file_path: os.PathLike[str] | str) -> None:
os.chmod(file_path, writable_mode, **chmod_kwargs) os.chmod(file_path, writable_mode, **chmod_kwargs)
def _make_file_sandbox_readable(file_path: os.PathLike[str] | str) -> None:
"""Ensure uploaded files are readable by the sandbox process.
For Docker sandboxes (AIO), the gateway writes files as root with 0o600
permissions, then bind-mounts the host directory into the container. The
sandbox process inside the container runs as a non-root user and cannot
read those files without group/other read bits. This function adds
``S_IRGRP | S_IROTH`` so the sandbox can read the uploaded content.
"""
file_stat = os.lstat(file_path)
if stat.S_ISLNK(file_stat.st_mode):
logger.warning("Skipping sandbox chmod for symlinked upload path: %s", file_path)
return
readable_mode = stat.S_IMODE(file_stat.st_mode) | stat.S_IRGRP | stat.S_IROTH
chmod_kwargs = {"follow_symlinks": False} if os.chmod in os.supports_follow_symlinks else {}
os.chmod(file_path, readable_mode, **chmod_kwargs)
def _uses_thread_data_mounts(sandbox_provider: SandboxProvider) -> bool: def _uses_thread_data_mounts(sandbox_provider: SandboxProvider) -> bool:
return bool(getattr(sandbox_provider, "uses_thread_data_mounts", False)) return bool(getattr(sandbox_provider, "uses_thread_data_mounts", False))
@@ -295,15 +276,6 @@ async def upload_files(
_cleanup_uploaded_paths(written_paths) _cleanup_uploaded_paths(written_paths)
raise HTTPException(status_code=500, detail=f"Failed to upload {file.filename}: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to upload {file.filename}: {str(e)}")
# When the sandbox uses bind-mounted thread data directories (e.g. AIO with
# LocalContainerBackend), uploaded files are visible inside the container but
# retain the 0o600 permissions set by the gateway. The sandbox process runs
# as a different user and cannot read them. Adjust permissions to add
# group/other read bits so the sandbox can access the files.
if not sync_to_sandbox and getattr(sandbox_provider, "needs_upload_permission_adjustment", True):
for file_path in written_paths:
_make_file_sandbox_readable(file_path)
if sync_to_sandbox: if sync_to_sandbox:
for file_path, virtual_path in sandbox_sync_targets: for file_path, virtual_path in sandbox_sync_targets:
_make_file_sandbox_writable(file_path) _make_file_sandbox_writable(file_path)
@@ -94,25 +94,35 @@ class RunRepository(RunStore):
created_at=None, created_at=None,
follow_up_to_run_id=None, follow_up_to_run_id=None,
): ):
"""Insert or update a run row.
``RunManager`` retries ``put`` after transient SQLite failures. Making
this operation idempotent prevents a successful-but-unacknowledged first
commit from turning the retry into a primary-key failure.
"""
resolved_user_id = resolve_user_id(user_id, method_name="RunRepository.put") resolved_user_id = resolve_user_id(user_id, method_name="RunRepository.put")
now = datetime.now(UTC) now = datetime.now(UTC)
row = RunRow( created = datetime.fromisoformat(created_at) if created_at else now
run_id=run_id, values = {
thread_id=thread_id, "thread_id": thread_id,
assistant_id=assistant_id, "assistant_id": assistant_id,
user_id=resolved_user_id, "user_id": resolved_user_id,
model_name=self._normalize_model_name(model_name), "model_name": self._normalize_model_name(model_name),
status=status, "status": status,
multitask_strategy=multitask_strategy, "multitask_strategy": multitask_strategy,
metadata_json=self._safe_json(metadata) or {}, "metadata_json": self._safe_json(metadata) or {},
kwargs_json=self._safe_json(kwargs) or {}, "kwargs_json": self._safe_json(kwargs) or {},
error=error, "error": error,
follow_up_to_run_id=follow_up_to_run_id, "follow_up_to_run_id": follow_up_to_run_id,
created_at=datetime.fromisoformat(created_at) if created_at else now, "updated_at": now,
updated_at=now, }
)
async with self._sf() as session: async with self._sf() as session:
session.add(row) row = await session.get(RunRow, run_id)
if row is None:
session.add(RunRow(run_id=run_id, created_at=created, **values))
else:
for key, value in values.items():
setattr(row, key, value)
await session.commit() await session.commit()
async def get( async def get(
@@ -146,13 +156,14 @@ class RunRepository(RunStore):
result = await session.execute(stmt) result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()] return [self._row_to_dict(r) for r in result.scalars()]
async def update_status(self, run_id, status, *, error=None): async def update_status(self, run_id, status, *, error=None) -> bool:
values: dict[str, Any] = {"status": status, "updated_at": datetime.now(UTC)} values: dict[str, Any] = {"status": status, "updated_at": datetime.now(UTC)}
if error is not None: if error is not None:
values["error"] = error values["error"] = error
async with self._sf() as session: async with self._sf() as session:
await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values)) result = await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.commit() await session.commit()
return result.rowcount != 0
async def update_model_name(self, run_id, model_name): async def update_model_name(self, run_id, model_name):
async with self._sf() as session: async with self._sf() as session:
@@ -187,6 +198,26 @@ class RunRepository(RunStore):
result = await session.execute(stmt) result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()] return [self._row_to_dict(r) for r in result.scalars()]
async def list_inflight(self, *, before=None):
    """Return rows still marked ``pending`` or ``running``, oldest first.

    ``before`` is an inclusive ``created_at`` cutoff. It may be a ``datetime``,
    an ISO-8601 string (parsed with ``datetime.fromisoformat``), or ``None``,
    in which case the current UTC time is used.
    """
    if isinstance(before, datetime):
        cutoff = before
    elif before is None:
        cutoff = datetime.now(UTC)
    else:
        cutoff = datetime.fromisoformat(before)

    is_active = RunRow.status.in_(("pending", "running"))
    not_after_cutoff = RunRow.created_at <= cutoff
    stmt = select(RunRow).where(is_active, not_after_cutoff).order_by(RunRow.created_at.asc())

    async with self._sf() as session:
        rows = (await session.execute(stmt)).scalars()
        return [self._row_to_dict(row) for row in rows]
async def update_run_completion( async def update_run_completion(
self, self,
run_id: str, run_id: str,
@@ -203,8 +234,11 @@ class RunRepository(RunStore):
last_ai_message: str | None = None, last_ai_message: str | None = None,
first_human_message: str | None = None, first_human_message: str | None = None,
error: str | None = None, error: str | None = None,
) -> None: ) -> bool:
"""Update status + token usage + convenience fields on run completion.""" """Update status + token usage + convenience fields on run completion.
Returns ``False`` when no run row matched the requested ``run_id``.
"""
values: dict[str, Any] = { values: dict[str, Any] = {
"status": status, "status": status,
"total_input_tokens": total_input_tokens, "total_input_tokens": total_input_tokens,
@@ -224,8 +258,9 @@ class RunRepository(RunStore):
if error is not None: if error is not None:
values["error"] = error values["error"] = error
async with self._sf() as session: async with self._sf() as session:
await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values)) result = await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.commit() await session.commit()
return result.rowcount != 0
async def update_run_progress( async def update_run_progress(
self, self,
@@ -4,7 +4,9 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import sqlite3
import uuid import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
@@ -17,6 +19,57 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_RETRYABLE_SQLITE_MESSAGES = (
"database is locked",
"database table is locked",
"database is busy",
)
_RETRYABLE_SQLITE_ERROR_CODES = {
sqlite3.SQLITE_BUSY,
sqlite3.SQLITE_LOCKED,
}
def _is_retryable_persistence_error(exc: BaseException) -> bool:
"""Return True for transient SQLite persistence failures.
SQLite lock contention normally surfaces through either sqlite3 exceptions
or SQLAlchemy wrappers. The short bounded retry here protects run status
finalization from transient writer pressure without hiding permanent
failures forever.
"""
pending: list[BaseException] = [exc]
seen: set[int] = set()
while pending:
current = pending.pop()
if id(current) in seen:
continue
seen.add(id(current))
message = str(current).lower()
if any(fragment in message for fragment in _RETRYABLE_SQLITE_MESSAGES):
return True
if isinstance(current, (sqlite3.OperationalError, sqlite3.DatabaseError)):
error_code = getattr(current, "sqlite_errorcode", None)
if error_code in _RETRYABLE_SQLITE_ERROR_CODES:
return True
for chained in (getattr(current, "orig", None), current.__cause__, current.__context__):
if isinstance(chained, BaseException):
pending.append(chained)
return False
@dataclass(frozen=True)
class PersistenceRetryPolicy:
    """Immutable settings that bound how short run-store writes are retried."""

    # Total number of tries, counting the first call.
    max_attempts: int = 5
    # Seconds slept before the first retry.
    initial_delay: float = 0.05
    # Upper bound, in seconds, for any single sleep between tries.
    max_delay: float = 1.0
    # Multiplier applied to the delay after each retry.
    backoff_factor: float = 2.0
@dataclass @dataclass
class RunRecord: class RunRecord:
@@ -58,38 +111,100 @@ class RunManager:
that run history survives process restarts. that run history survives process restarts.
""" """
def __init__(self, store: RunStore | None = None) -> None: def __init__(
self,
store: RunStore | None = None,
*,
persistence_retry_policy: PersistenceRetryPolicy | None = None,
) -> None:
self._runs: dict[str, RunRecord] = {} self._runs: dict[str, RunRecord] = {}
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
self._store = store self._store = store
self._persistence_retry_policy = persistence_retry_policy or PersistenceRetryPolicy()
async def _persist_to_store(self, record: RunRecord) -> None: @staticmethod
"""Best-effort persist run record to backing store.""" def _store_put_payload(record: RunRecord, *, error: str | None = None) -> dict[str, Any]:
if self._store is None: return {
return "thread_id": record.thread_id,
"assistant_id": record.assistant_id,
"status": record.status.value,
"multitask_strategy": record.multitask_strategy,
"metadata": record.metadata or {},
"kwargs": record.kwargs or {},
"error": error if error is not None else record.error,
"created_at": record.created_at,
"model_name": record.model_name,
}
async def _call_store_with_retry(
self,
operation_name: str,
run_id: str,
operation: Callable[[], Awaitable[Any]],
) -> Any:
"""Run a short store operation with bounded retries for SQLite pressure."""
policy = self._persistence_retry_policy
attempt = 1
delay = policy.initial_delay
while True:
try: try:
await self._store.put( return await operation()
record.run_id, except Exception as exc:
thread_id=record.thread_id, retryable = _is_retryable_persistence_error(exc)
assistant_id=record.assistant_id, if attempt >= policy.max_attempts or not retryable:
status=record.status.value, raise
multitask_strategy=record.multitask_strategy, logger.warning(
metadata=record.metadata or {}, "Transient persistence failure during %s for run %s (attempt %d/%d); retrying",
kwargs=record.kwargs or {}, operation_name,
created_at=record.created_at, run_id,
model_name=record.model_name, attempt,
policy.max_attempts,
exc_info=True,
) )
except Exception: if delay > 0:
logger.warning("Failed to persist run %s to store", record.run_id, exc_info=True) await asyncio.sleep(delay)
delay = min(policy.max_delay, delay * policy.backoff_factor if delay else policy.initial_delay)
attempt += 1
async def _persist_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None: async def _persist_snapshot_to_store(self, run_id: str, payload: dict[str, Any]) -> bool:
"""Best-effort persist a previously captured run snapshot."""
if self._store is None:
return True
try:
await self._call_store_with_retry(
"put",
run_id,
lambda: self._store.put(run_id, **payload),
)
return True
except Exception:
logger.warning("Failed to persist run %s to store", run_id, exc_info=True)
return False
async def _persist_to_store(self, record: RunRecord, *, error: str | None = None) -> bool:
"""Best-effort persist run record to backing store."""
return await self._persist_snapshot_to_store(
record.run_id,
self._store_put_payload(record, error=error),
)
async def _persist_status(self, record: RunRecord, status: RunStatus, *, error: str | None = None) -> bool:
"""Best-effort persist a status transition to the backing store.""" """Best-effort persist a status transition to the backing store."""
if self._store is None: if self._store is None:
return return True
row_recovery_payload = self._store_put_payload(record, error=error)
try: try:
await self._store.update_status(run_id, status.value, error=error) updated = await self._call_store_with_retry(
"update_status",
record.run_id,
lambda: self._store.update_status(record.run_id, status.value, error=error),
)
if updated is False:
return await self._persist_snapshot_to_store(record.run_id, row_recovery_payload)
return True
except Exception: except Exception:
logger.warning("Failed to persist status update for run %s", run_id, exc_info=True) logger.warning("Failed to persist status update for run %s", record.run_id, exc_info=True)
return False
@staticmethod @staticmethod
def _record_from_store(row: dict[str, Any]) -> RunRecord: def _record_from_store(row: dict[str, Any]) -> RunRecord:
@@ -126,6 +241,7 @@ class RunManager:
async def update_run_completion(self, run_id: str, **kwargs) -> None: async def update_run_completion(self, run_id: str, **kwargs) -> None:
"""Persist token usage and completion data to the backing store.""" """Persist token usage and completion data to the backing store."""
row_recovery_payload: dict[str, Any] | None = None
async with self._lock: async with self._lock:
record = self._runs.get(run_id) record = self._runs.get(run_id)
if record is not None: if record is not None:
@@ -135,9 +251,28 @@ class RunManager:
if hasattr(record, key) and value is not None: if hasattr(record, key) and value is not None:
setattr(record, key, value) setattr(record, key, value)
record.updated_at = _now_iso() record.updated_at = _now_iso()
if self._store is not None: row_recovery_payload = self._store_put_payload(record, error=kwargs.get("error"))
if self._store is None:
return
try: try:
await self._store.update_run_completion(run_id, **kwargs) updated = await self._call_store_with_retry(
"update_run_completion",
run_id,
lambda: self._store.update_run_completion(run_id, **kwargs),
)
if updated is False:
if row_recovery_payload is None:
logger.warning("Failed to recreate missing run %s for completion persistence", run_id)
return
if not await self._persist_snapshot_to_store(run_id, row_recovery_payload):
return
recovered = await self._call_store_with_retry(
"update_run_completion",
run_id,
lambda: self._store.update_run_completion(run_id, **kwargs),
)
if recovered is False:
logger.warning("Run completion update for %s affected no rows after row recreation", run_id)
except Exception: except Exception:
logger.warning("Failed to persist run completion for %s", run_id, exc_info=True) logger.warning("Failed to persist run completion for %s", run_id, exc_info=True)
@@ -273,7 +408,7 @@ class RunManager:
record.updated_at = _now_iso() record.updated_at = _now_iso()
if error is not None: if error is not None:
record.error = error record.error = error
await self._persist_status(run_id, status, error=error) await self._persist_status(record, status, error=error)
logger.info("Run %s -> %s", run_id, status.value) logger.info("Run %s -> %s", run_id, status.value)
async def _persist_model_name(self, run_id: str, model_name: str | None) -> None: async def _persist_model_name(self, run_id: str, model_name: str | None) -> None:
@@ -281,7 +416,11 @@ class RunManager:
if self._store is None: if self._store is None:
return return
try: try:
await self._store.update_model_name(run_id, model_name) await self._call_store_with_retry(
"update_model_name",
run_id,
lambda: self._store.update_model_name(run_id, model_name),
)
except Exception: except Exception:
logger.warning("Failed to persist model_name update for run %s", run_id, exc_info=True) logger.warning("Failed to persist model_name update for run %s", run_id, exc_info=True)
@@ -324,7 +463,7 @@ class RunManager:
record.task.cancel() record.task.cancel()
record.status = RunStatus.interrupted record.status = RunStatus.interrupted
record.updated_at = _now_iso() record.updated_at = _now_iso()
await self._persist_status(run_id, RunStatus.interrupted) await self._persist_status(record, RunStatus.interrupted)
logger.info("Run %s cancelled (action=%s)", run_id, action) logger.info("Run %s cancelled (action=%s)", run_id, action)
return True return True
@@ -352,7 +491,7 @@ class RunManager:
now = _now_iso() now = _now_iso()
_supported_strategies = ("reject", "interrupt", "rollback") _supported_strategies = ("reject", "interrupt", "rollback")
interrupted_run_ids: list[str] = [] interrupted_records: list[RunRecord] = []
async with self._lock: async with self._lock:
if multitask_strategy not in _supported_strategies: if multitask_strategy not in _supported_strategies:
@@ -371,7 +510,7 @@ class RunManager:
r.task.cancel() r.task.cancel()
r.status = RunStatus.interrupted r.status = RunStatus.interrupted
r.updated_at = now r.updated_at = now
interrupted_run_ids.append(r.run_id) interrupted_records.append(r)
logger.info( logger.info(
"Cancelled %d inflight run(s) on thread %s (strategy=%s)", "Cancelled %d inflight run(s) on thread %s (strategy=%s)",
len(inflight), len(inflight),
@@ -394,12 +533,66 @@ class RunManager:
) )
self._runs[run_id] = record self._runs[run_id] = record
for interrupted_run_id in interrupted_run_ids: for interrupted_record in interrupted_records:
await self._persist_status(interrupted_run_id, RunStatus.interrupted) await self._persist_status(interrupted_record, RunStatus.interrupted)
await self._persist_to_store(record) await self._persist_to_store(record)
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id) logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record return record
async def reconcile_orphaned_inflight_runs(
    self,
    *,
    error: str,
    before: str | None = None,
) -> list[RunRecord]:
    """Mark persisted active runs as failed when no local task owns them.

    Gateway runs are process-local: the asyncio task and abort event live in
    memory, while the run row is durable. After a SQLite-backed gateway
    restart, any persisted ``pending`` or ``running`` row created before
    startup cannot still have a local worker. This recovery step turns that
    ambiguous state into an explicit error instead of letting the UI show an
    indefinite active run.

    Args:
        error: Message stored on every run that is marked as failed.
        before: Cutoff forwarded unchanged to the store's ``list_inflight``.

    Returns:
        The records whose ``error`` status was persisted. The list is empty
        when there is no backing store or when listing inflight rows fails.
    """
    if self._store is None:
        return []

    try:
        rows = await self._call_store_with_retry(
            "list_inflight",
            # No single run id applies to a listing; this value only labels retry log lines.
            "*",
            lambda: self._store.list_inflight(before=before),
        )
    except Exception:
        logger.warning("Failed to list orphaned inflight runs for reconciliation", exc_info=True)
        return []

    recovered: list[RunRecord] = []
    # One timestamp is shared by every run recovered in this pass.
    now = _now_iso()
    for row in rows:
        try:
            record = self._record_from_store(row)
        except Exception:
            logger.warning("Failed to map orphaned run row during reconciliation", exc_info=True)
            continue

        # NOTE(review): block nesting was reconstructed; confirm whether the
        # status persistence below is meant to run inside or outside this lock.
        async with self._lock:
            live_record = self._runs.get(record.run_id)
            # A run that is still active in this process is not orphaned.
            if live_record is not None and live_record.status in (RunStatus.pending, RunStatus.running):
                continue

        record.status = RunStatus.error
        record.error = error
        record.updated_at = now
        persisted = await self._persist_status(record, RunStatus.error, error=error)
        if not persisted:
            # Only report runs whose error status actually reached the store.
            logger.warning("Skipped orphaned run %s recovery because error status was not persisted", record.run_id)
            continue
        recovered.append(record)

    if recovered:
        logger.warning("Recovered %d orphaned inflight run(s) as error", len(recovered))
    return recovered
async def has_inflight(self, thread_id: str) -> bool: async def has_inflight(self, thread_id: str) -> bool:
"""Return ``True`` if *thread_id* has a pending or running run.""" """Return ``True`` if *thread_id* has a pending or running run."""
async with self._lock: async with self._lock:
@@ -59,7 +59,12 @@ class RunStore(abc.ABC):
status: str, status: str,
*, *,
error: str | None = None, error: str | None = None,
) -> None: ) -> bool | None:
"""Update a run status.
Returns ``False`` when the store can prove no row was updated. Older or
lightweight stores may return ``None`` when they cannot report rowcount.
"""
pass pass
@abc.abstractmethod @abc.abstractmethod
@@ -92,7 +97,11 @@ class RunStore(abc.ABC):
last_ai_message: str | None = None, last_ai_message: str | None = None,
first_human_message: str | None = None, first_human_message: str | None = None,
error: str | None = None, error: str | None = None,
) -> None: ) -> bool | None:
"""Persist final completion fields.
Returns ``False`` when the store can prove no row was updated.
"""
pass pass
async def update_run_progress( async def update_run_progress(
@@ -117,6 +126,11 @@ class RunStore(abc.ABC):
async def list_pending(self, *, before: str | None = None) -> list[dict[str, Any]]: async def list_pending(self, *, before: str | None = None) -> list[dict[str, Any]]:
pass pass
@abc.abstractmethod
async def list_inflight(self, *, before: str | None = None) -> list[dict[str, Any]]:
    """Return persisted runs that are still ``pending`` or ``running``.

    ``before`` is an optional keyword-only cutoff. The concrete stores in this
    change treat it as an inclusive ``created_at`` bound and fall back to the
    current UTC time when it is ``None``.
    """
    pass
@abc.abstractmethod @abc.abstractmethod
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]: async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
"""Aggregate token usage for completed runs in a thread. """Aggregate token usage for completed runs in a thread.
@@ -65,6 +65,8 @@ class MemoryRunStore(RunStore):
if error is not None: if error is not None:
self._runs[run_id]["error"] = error self._runs[run_id]["error"] = error
self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat() self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat()
return True
return False
async def update_model_name(self, run_id, model_name): async def update_model_name(self, run_id, model_name):
if run_id in self._runs: if run_id in self._runs:
@@ -81,6 +83,8 @@ class MemoryRunStore(RunStore):
if value is not None: if value is not None:
self._runs[run_id][key] = value self._runs[run_id][key] = value
self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat() self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat()
return True
return False
async def update_run_progress(self, run_id, **kwargs): async def update_run_progress(self, run_id, **kwargs):
if run_id in self._runs and self._runs[run_id].get("status") == "running": if run_id in self._runs and self._runs[run_id].get("status") == "running":
@@ -95,6 +99,12 @@ class MemoryRunStore(RunStore):
results.sort(key=lambda r: r["created_at"]) results.sort(key=lambda r: r["created_at"])
return results return results
async def list_inflight(self, *, before=None):
now = before or datetime.now(UTC).isoformat()
results = [r for r in self._runs.values() if r["status"] in ("pending", "running") and r["created_at"] <= now]
results.sort(key=lambda r: r["created_at"])
return results
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]: async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
statuses = ("success", "error", "running") if include_active else ("success", "error") statuses = ("success", "error", "running") if include_active else ("success", "error")
completed = [r for r in self._runs.values() if r["thread_id"] == thread_id and r.get("status") in statuses] completed = [r for r in self._runs.values() if r["thread_id"] == thread_id and r.get("status") in statuses]
@@ -63,7 +63,6 @@ class LocalSandboxProvider(SandboxProvider):
""" """
uses_thread_data_mounts = True uses_thread_data_mounts = True
needs_upload_permission_adjustment = False
def __init__(self, max_cached_threads: int = DEFAULT_MAX_CACHED_THREAD_SANDBOXES): def __init__(self, max_cached_threads: int = DEFAULT_MAX_CACHED_THREAD_SANDBOXES):
"""Initialize the local sandbox provider with static path mappings. """Initialize the local sandbox provider with static path mappings.
@@ -10,7 +10,6 @@ class SandboxProvider(ABC):
"""Abstract base class for sandbox providers""" """Abstract base class for sandbox providers"""
uses_thread_data_mounts: bool = False uses_thread_data_mounts: bool = False
needs_upload_permission_adjustment: bool = True
@abstractmethod @abstractmethod
def acquire(self, thread_id: str | None = None) -> str: def acquire(self, thread_id: str | None = None) -> str:
+127
View File
@@ -0,0 +1,127 @@
"""Gateway startup recovery for stale persisted runs."""
from __future__ import annotations
from contextlib import asynccontextmanager
from types import SimpleNamespace
import pytest
from fastapi import FastAPI
import deerflow.runtime as runtime_module
from app.gateway import deps as gateway_deps
from deerflow.persistence import engine as engine_module
from deerflow.persistence import thread_meta as thread_meta_module
from deerflow.runtime.checkpointer import async_provider as checkpointer_module
from deerflow.runtime.events import store as event_store_module
@asynccontextmanager
async def _fake_context(value):
    """Async context manager that yields ``value`` unchanged and does nothing on exit."""
    yield value
class _FakeRunManager:
"""RunManager double that records startup reconciliation calls."""
instances: list[_FakeRunManager] = []
recovered_runs = [SimpleNamespace(run_id="run-1", thread_id="thread-1")]
latest_by_thread: dict[str, list[SimpleNamespace]] = {}
def __init__(self, *, store):
self.store = store
self.reconcile_calls: list[dict] = []
self.list_by_thread_calls: list[dict] = []
_FakeRunManager.instances.append(self)
async def reconcile_orphaned_inflight_runs(self, *, error: str, before: str | None = None):
self.reconcile_calls.append({"error": error, "before": before})
return self.recovered_runs
async def list_by_thread(self, thread_id: str, *, user_id=None, limit: int = 100):
self.list_by_thread_calls.append({"thread_id": thread_id, "user_id": user_id, "limit": limit})
return self.latest_by_thread.get(thread_id, self.recovered_runs[:limit])
class _FakeThreadStore:
def __init__(self) -> None:
self.status_updates: list[tuple[str, str, str | None]] = []
async def update_status(self, thread_id: str, status: str, *, user_id=None) -> None:
self.status_updates.append((thread_id, status, user_id))
@pytest.mark.anyio
async def test_sqlite_runtime_reconciles_orphaned_runs_on_startup(monkeypatch):
    """SQLite startup should recover stale active runs before serving requests."""
    app = FastAPI()
    config = SimpleNamespace(
        database=SimpleNamespace(backend="sqlite"),
        run_events=SimpleNamespace(backend="memory"),
    )
    thread_store = _FakeThreadStore()

    # Reset the class-level state shared between tests.
    _FakeRunManager.instances.clear()
    _FakeRunManager.recovered_runs = [SimpleNamespace(run_id="run-1", thread_id="thread-1")]
    _FakeRunManager.latest_by_thread = {}

    async def fake_init_engine_from_config(_database):
        return None

    async def fake_close_engine():
        return None

    # Replace every runtime factory so startup touches no real engine or store.
    patches = [
        (engine_module, "init_engine_from_config", fake_init_engine_from_config),
        (engine_module, "get_session_factory", lambda: None),
        (engine_module, "close_engine", fake_close_engine),
        (runtime_module, "make_stream_bridge", lambda _config: _fake_context(object())),
        (checkpointer_module, "make_checkpointer", lambda _config: _fake_context(object())),
        (runtime_module, "make_store", lambda _config: _fake_context(object())),
        (thread_meta_module, "make_thread_store", lambda _sf, _store: thread_store),
        (event_store_module, "make_run_event_store", lambda _config: object()),
        (gateway_deps, "RunManager", _FakeRunManager),
    ]
    for target, attribute, replacement in patches:
        monkeypatch.setattr(target, attribute, replacement)

    async with gateway_deps.langgraph_runtime(app, config):
        pass

    assert len(_FakeRunManager.instances) == 1
    manager = _FakeRunManager.instances[0]
    assert manager.reconcile_calls
    assert manager.reconcile_calls[0]["error"]
    assert manager.list_by_thread_calls == [{"thread_id": "thread-1", "user_id": None, "limit": 1}]
    assert thread_store.status_updates == [("thread-1", "error", None)]
@pytest.mark.anyio
async def test_sqlite_runtime_does_not_mark_thread_error_when_newer_run_is_success(monkeypatch):
    """Startup recovery should not let an old orphaned run overwrite a newer terminal thread state."""
    app = FastAPI()
    config = SimpleNamespace(
        database=SimpleNamespace(backend="sqlite"),
        run_events=SimpleNamespace(backend="memory"),
    )
    thread_store = _FakeThreadStore()

    # The recovered run is older than the thread's latest run, which succeeded.
    _FakeRunManager.instances.clear()
    _FakeRunManager.recovered_runs = [SimpleNamespace(run_id="old-running", thread_id="thread-1")]
    _FakeRunManager.latest_by_thread = {"thread-1": [SimpleNamespace(run_id="newer-success", thread_id="thread-1", status="success")]}

    async def fake_init_engine_from_config(_database):
        return None

    async def fake_close_engine():
        return None

    # Replace every runtime factory so startup touches no real engine or store.
    patches = [
        (engine_module, "init_engine_from_config", fake_init_engine_from_config),
        (engine_module, "get_session_factory", lambda: None),
        (engine_module, "close_engine", fake_close_engine),
        (runtime_module, "make_stream_bridge", lambda _config: _fake_context(object())),
        (checkpointer_module, "make_checkpointer", lambda _config: _fake_context(object())),
        (runtime_module, "make_store", lambda _config: _fake_context(object())),
        (thread_meta_module, "make_thread_store", lambda _sf, _store: thread_store),
        (event_store_module, "make_run_event_store", lambda _config: object()),
        (gateway_deps, "RunManager", _FakeRunManager),
    ]
    for target, attribute, replacement in patches:
        monkeypatch.setattr(target, attribute, replacement)

    async with gateway_deps.langgraph_runtime(app, config):
        pass

    assert len(_FakeRunManager.instances) == 1
    manager = _FakeRunManager.instances[0]
    assert manager.list_by_thread_calls == [{"thread_id": "thread-1", "user_id": None, "limit": 1}]
    assert thread_store.status_updates == []
+240
View File
@@ -1,10 +1,15 @@
"""Tests for RunManager.""" """Tests for RunManager."""
import logging
import re import re
import sqlite3
from typing import Any
import pytest import pytest
from sqlalchemy.exc import DatabaseError as SQLAlchemyDatabaseError
from deerflow.runtime import DisconnectMode, RunManager, RunStatus from deerflow.runtime import DisconnectMode, RunManager, RunStatus
from deerflow.runtime.runs.manager import PersistenceRetryPolicy
from deerflow.runtime.runs.store.memory import MemoryRunStore from deerflow.runtime.runs.store.memory import MemoryRunStore
ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}") ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
@@ -15,6 +20,92 @@ def manager() -> RunManager:
return RunManager() return RunManager()
class FlakyStatusRunStore(MemoryRunStore):
    """In-memory run store whose first ``status_failures`` status writes raise a SQLite lock error."""

    def __init__(self, *, status_failures: int) -> None:
        super().__init__()
        # Total update_status calls observed, failed or not.
        self.status_update_attempts = 0
        # How many upcoming update_status calls will still raise.
        self.status_failures = status_failures

    async def update_status(self, run_id, status, *, error=None):
        self.status_update_attempts += 1
        if self.status_failures <= 0:
            # Failure budget exhausted: behave like the plain memory store.
            return await super().update_status(run_id, status, error=error)
        self.status_failures -= 1
        raise sqlite3.OperationalError("database is locked")
class MissingRowStatusRunStore(MemoryRunStore):
    """In-memory run store whose status updates always report that no row was updated."""

    async def update_status(self, run_id, status, *, error=None):
        # The parent update still runs; only its result is replaced by False.
        _ = await super().update_status(run_id, status, error=error)
        return False
class PermanentStatusRunStore(MemoryRunStore):
    """In-memory run store whose status updates always raise a SQLAlchemy ``DatabaseError``."""

    def __init__(self) -> None:
        super().__init__()
        # Number of update_status calls observed.
        self.status_update_attempts = 0

    async def update_status(self, run_id, status, *, error=None):
        self.status_update_attempts += 1
        statement = "UPDATE runs SET status = :status WHERE run_id = :run_id"
        parameters = {"status": status, "run_id": run_id}
        underlying = sqlite3.DatabaseError("no such table: runs")
        raise SQLAlchemyDatabaseError(statement, parameters, underlying)
class FailingStatusRunStore(MemoryRunStore):
    """In-memory run store whose every status update raises a SQLite lock error."""

    def __init__(self) -> None:
        super().__init__()
        # Number of update_status calls observed.
        self.status_update_attempts = 0

    async def update_status(self, run_id, status, *, error=None):
        self.status_update_attempts += 1
        lock_error = sqlite3.OperationalError("database is locked")
        raise lock_error
class MissingCompletionRunStore(MemoryRunStore):
    """In-memory run store whose first completion update reports a missing row; later ones delegate."""

    def __init__(self) -> None:
        super().__init__()
        # Number of update_run_completion calls observed.
        self.completion_update_attempts = 0

    async def update_run_completion(self, run_id, *, status, **kwargs):
        self.completion_update_attempts += 1
        is_first_attempt = self.completion_update_attempts == 1
        if not is_first_attempt:
            return await super().update_run_completion(run_id, status=status, **kwargs)
        # First call only: report "no row updated" without touching the store.
        return False
class AlwaysMissingCompletionRunStore(MemoryRunStore):
    """In-memory run store whose completion updates never delegate and always return ``False``."""

    def __init__(self) -> None:
        super().__init__()
        # Number of update_run_completion calls observed.
        self.completion_update_attempts = 0

    async def update_run_completion(self, run_id, *, status, **kwargs):
        self.completion_update_attempts = self.completion_update_attempts + 1
        return False
async def _stored_statuses(store: MemoryRunStore, *run_ids: str) -> dict[str, Any]:
    """Map each requested run id to its stored ``status``, or ``None`` when the store returns no row."""
    statuses = {}
    for identifier in run_ids:
        record = await store.get(identifier)
        if record:
            statuses[identifier] = record["status"]
        else:
            statuses[identifier] = None
    return statuses
@pytest.mark.anyio @pytest.mark.anyio
async def test_create_and_get(manager: RunManager): async def test_create_and_get(manager: RunManager):
"""Created run should be retrievable with new fields.""" """Created run should be retrievable with new fields."""
@@ -80,6 +171,155 @@ async def test_cancel_persists_interrupted_status_to_store():
assert stored["status"] == "interrupted" assert stored["status"] == "interrupted"
@pytest.mark.anyio
async def test_status_persistence_retries_transient_sqlite_lock():
    """A final status must reach the store even when the first two status writes hit a SQLite lock."""
    flaky_store = FlakyStatusRunStore(status_failures=2)
    run_manager = RunManager(store=flaky_store)
    run = await run_manager.create("thread-1")

    for next_status in (RunStatus.running, RunStatus.success):
        await run_manager.set_status(run.run_id, next_status)

    persisted = await flaky_store.get(run.run_id)
    assert persisted is not None
    assert persisted["status"] == "success"
    assert flaky_store.status_update_attempts >= 4
@pytest.mark.anyio
async def test_status_persistence_recreates_missing_store_row():
    """Setting a final status after the row was deleted must leave a row carrying that status and error."""
    row_store = MissingRowStatusRunStore()
    run_manager = RunManager(store=row_store)
    run = await run_manager.create("thread-1")
    # Remove the persisted row so the status update has nothing to update.
    await row_store.delete(run.run_id)

    await run_manager.set_status(run.run_id, RunStatus.error, error="boom")

    persisted = await row_store.get(run.run_id)
    assert persisted is not None
    assert persisted["status"] == "error"
    assert persisted["error"] == "boom"
@pytest.mark.anyio
async def test_status_persistence_does_not_retry_permanent_sqlalchemy_errors():
    """A permanent SQLAlchemy failure is attempted once even though the policy allows five attempts."""
    retry_policy = PersistenceRetryPolicy(max_attempts=5, initial_delay=0)
    broken_store = PermanentStatusRunStore()
    run_manager = RunManager(store=broken_store, persistence_retry_policy=retry_policy)
    run = await run_manager.create("thread-1")

    await run_manager.set_status(run.run_id, RunStatus.error, error="boom")

    assert broken_store.status_update_attempts == 1
@pytest.mark.anyio
async def test_completion_persistence_recreates_missing_store_row():
    """A completion update against a deleted row must end with the row present and the counters stored."""
    completion_store = MissingCompletionRunStore()
    run_manager = RunManager(store=completion_store)
    run = await run_manager.create("thread-1")
    for next_status in (RunStatus.running, RunStatus.success):
        await run_manager.set_status(run.run_id, next_status)
    # Drop the row so the completion update starts from a missing record.
    await completion_store.delete(run.run_id)

    completion_fields = {
        "status": "success",
        "total_tokens": 42,
        "llm_call_count": 2,
        "last_ai_message": "done",
    }
    await run_manager.update_run_completion(run.run_id, **completion_fields)

    persisted = await completion_store.get(run.run_id)
    assert persisted is not None
    assert persisted["status"] == "success"
    assert persisted["total_tokens"] == 42
    assert persisted["llm_call_count"] == 2
    assert persisted["last_ai_message"] == "done"
    assert completion_store.completion_update_attempts == 2
@pytest.mark.anyio
async def test_completion_persistence_warns_when_recreated_row_still_missing(caplog):
    """When the store keeps reporting a missing row, the manager tries twice and logs a warning."""
    stubborn_store = AlwaysMissingCompletionRunStore()
    run_manager = RunManager(store=stubborn_store)
    run = await run_manager.create("thread-1")
    await run_manager.set_status(run.run_id, RunStatus.success)

    # Capture WARNING and above from the run manager's logger only.
    caplog.set_level(logging.WARNING, logger="deerflow.runtime.runs.manager")
    await run_manager.update_run_completion(run.run_id, status="success", total_tokens=42)

    assert stubborn_store.completion_update_attempts == 2
    assert "affected no rows after row recreation" in caplog.text
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_marks_stale_rows_error():
    """Reconciliation turns stored pending/running rows into errors and leaves a success row as is."""
    run_store = MemoryRunStore()
    seeded_rows = (
        ("pending-run", "pending", "2026-01-01T00:00:00+00:00"),
        ("running-run", "running", "2026-01-01T00:00:01+00:00"),
        ("success-run", "success", "2026-01-01T00:00:02+00:00"),
    )
    for seeded_id, seeded_status, seeded_created_at in seeded_rows:
        await run_store.put(seeded_id, thread_id="thread-1", status=seeded_status, created_at=seeded_created_at)
    run_manager = RunManager(store=run_store)

    reconciled = await run_manager.reconcile_orphaned_inflight_runs(
        error="Gateway restarted before this run reached a durable final state.",
        before="2026-01-01T00:00:02+00:00",
    )

    reconciled_ids = {record.run_id for record in reconciled}
    assert reconciled_ids == {"pending-run", "running-run"}
    expected_statuses = {
        "pending-run": "error",
        "running-run": "error",
        "success-run": "success",
    }
    assert await _stored_statuses(run_store, "pending-run", "running-run", "success-run") == expected_statuses
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_skips_live_local_run():
    """A run created and set running through this manager is not reported as recovered."""
    run_store = MemoryRunStore()
    run_manager = RunManager(store=run_store)
    live_run = await run_manager.create("thread-1")
    await run_manager.set_status(live_run.run_id, RunStatus.running)

    reconciled = await run_manager.reconcile_orphaned_inflight_runs(
        error="Gateway restarted before this run reached a durable final state.",
    )

    persisted = await run_store.get(live_run.run_id)
    assert reconciled == []
    assert persisted["status"] == "running"
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_skips_rows_when_error_status_is_not_persisted():
    """A row whose error update never succeeds is not reported as recovered and keeps its status."""
    retry_policy = PersistenceRetryPolicy(max_attempts=2, initial_delay=0)
    failing_store = FailingStatusRunStore()
    await failing_store.put(
        "running-run",
        thread_id="thread-1",
        status="running",
        created_at="2026-01-01T00:00:00+00:00",
    )
    run_manager = RunManager(store=failing_store, persistence_retry_policy=retry_policy)

    reconciled = await run_manager.reconcile_orphaned_inflight_runs(
        error="Gateway restarted before this run reached a durable final state.",
        before="2026-01-01T00:00:01+00:00",
    )

    persisted = await failing_store.get("running-run")
    assert reconciled == []
    assert persisted["status"] == "running"
    # Both attempts allowed by the policy were used.
    assert failing_store.status_update_attempts == 2
@pytest.mark.anyio @pytest.mark.anyio
async def test_cancel_not_inflight(manager: RunManager): async def test_cancel_not_inflight(manager: RunManager):
"""Cancelling a completed run should return False.""" """Cancelling a completed run should return False."""
+47 -2
View File
@@ -52,6 +52,9 @@ class _CustomRunStoreWithoutProgress(RunStore):
async def list_pending(self, *args, **kwargs): async def list_pending(self, *args, **kwargs):
return [] return []
async def list_inflight(self, *args, **kwargs):
return []
async def aggregate_tokens_by_thread(self, *args, **kwargs): async def aggregate_tokens_by_thread(self, *args, **kwargs):
return {} return {}
@@ -75,6 +78,19 @@ class TestRunRepository:
assert row["status"] == "pending" assert row["status"] == "pending"
await _cleanup() await _cleanup()
@pytest.mark.anyio
async def test_put_is_idempotent_for_retried_writes(self, tmp_path):
    """A second ``put`` for the same run id succeeds and its values are the ones read back."""
    repository = await _make_repo(tmp_path)
    await repository.put("r1", thread_id="t1", assistant_id="old-agent", status="pending")
    await repository.put("r1", thread_id="t1", assistant_id="new-agent", status="running", error="retry")

    stored = await repository.get("r1")
    assert stored["assistant_id"] == "new-agent"
    assert stored["status"] == "running"
    assert stored["error"] == "retry"
    await _cleanup()
@pytest.mark.anyio @pytest.mark.anyio
async def test_get_missing_returns_none(self, tmp_path): async def test_get_missing_returns_none(self, tmp_path):
repo = await _make_repo(tmp_path) repo = await _make_repo(tmp_path)
@@ -85,11 +101,19 @@ class TestRunRepository:
async def test_update_status(self, tmp_path): async def test_update_status(self, tmp_path):
repo = await _make_repo(tmp_path) repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1") await repo.put("r1", thread_id="t1")
await repo.update_status("r1", "running") updated = await repo.update_status("r1", "running")
row = await repo.get("r1") row = await repo.get("r1")
assert updated is True
assert row["status"] == "running" assert row["status"] == "running"
await _cleanup() await _cleanup()
@pytest.mark.anyio
async def test_update_status_returns_false_for_missing_row(self, tmp_path):
    """``update_status`` on a run id that was never stored returns ``False``."""
    repository = await _make_repo(tmp_path)
    was_updated = await repository.update_status("missing", "error", error="lost")
    assert was_updated is False
    await _cleanup()
@pytest.mark.anyio @pytest.mark.anyio
async def test_update_status_with_error(self, tmp_path): async def test_update_status_with_error(self, tmp_path):
repo = await _make_repo(tmp_path) repo = await _make_repo(tmp_path)
@@ -146,11 +170,24 @@ class TestRunRepository:
assert all(r["status"] == "pending" for r in pending) assert all(r["status"] == "pending" for r in pending)
await _cleanup() await _cleanup()
@pytest.mark.anyio
async def test_list_inflight_returns_pending_and_running_before_cutoff(self, tmp_path):
    """``list_inflight(before=...)`` returns only the pending/running rows created before the cutoff."""
    repository = await _make_repo(tmp_path)
    seeded_rows = (
        ("pending-old", "pending", "2026-01-01T00:00:00+00:00"),
        ("running-old", "running", "2026-01-01T00:00:01+00:00"),
        ("success-old", "success", "2026-01-01T00:00:02+00:00"),
        ("pending-new", "pending", "2026-01-01T00:00:03+00:00"),
    )
    for seeded_id, seeded_status, seeded_created_at in seeded_rows:
        await repository.put(seeded_id, thread_id="t1", status=seeded_status, created_at=seeded_created_at)

    inflight_rows = await repository.list_inflight(before="2026-01-01T00:00:02+00:00")

    listed_ids = [entry["run_id"] for entry in inflight_rows]
    assert listed_ids == ["pending-old", "running-old"]
    await _cleanup()
@pytest.mark.anyio @pytest.mark.anyio
async def test_update_run_completion(self, tmp_path): async def test_update_run_completion(self, tmp_path):
repo = await _make_repo(tmp_path) repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1", status="running") await repo.put("r1", thread_id="t1", status="running")
await repo.update_run_completion( updated = await repo.update_run_completion(
"r1", "r1",
status="success", status="success",
total_input_tokens=100, total_input_tokens=100,
@@ -165,6 +202,7 @@ class TestRunRepository:
first_human_message="What is the meaning?", first_human_message="What is the meaning?",
) )
row = await repo.get("r1") row = await repo.get("r1")
assert updated is True
assert row["status"] == "success" assert row["status"] == "success"
assert row["total_tokens"] == 150 assert row["total_tokens"] == 150
assert row["llm_call_count"] == 2 assert row["llm_call_count"] == 2
@@ -174,6 +212,13 @@ class TestRunRepository:
assert row["first_human_message"] == "What is the meaning?" assert row["first_human_message"] == "What is the meaning?"
await _cleanup() await _cleanup()
@pytest.mark.anyio
async def test_update_run_completion_returns_false_for_missing_row(self, tmp_path):
    """``update_run_completion`` on a run id that was never stored returns ``False``."""
    repository = await _make_repo(tmp_path)
    was_updated = await repository.update_run_completion("missing", status="error", total_tokens=1)
    assert was_updated is False
    await _cleanup()
@pytest.mark.anyio @pytest.mark.anyio
async def test_metadata_preserved(self, tmp_path): async def test_metadata_preserved(self, tmp_path):
repo = await _make_repo(tmp_path) repo = await _make_repo(tmp_path)
-56
View File
@@ -219,7 +219,6 @@ def test_upload_files_does_not_adjust_permissions_for_local_sandbox(tmp_path):
provider = MagicMock() provider = MagicMock()
provider.uses_thread_data_mounts = True provider.uses_thread_data_mounts = True
provider.needs_upload_permission_adjustment = False
provider.acquire.return_value = "local" provider.acquire.return_value = "local"
sandbox = MagicMock() sandbox = MagicMock()
provider.get.return_value = sandbox provider.get.return_value = sandbox
@@ -229,14 +228,12 @@ def test_upload_files_does_not_adjust_permissions_for_local_sandbox(tmp_path):
patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir), patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir),
patch.object(uploads, "get_sandbox_provider", return_value=provider), patch.object(uploads, "get_sandbox_provider", return_value=provider),
patch.object(uploads, "_make_file_sandbox_writable") as make_writable, patch.object(uploads, "_make_file_sandbox_writable") as make_writable,
patch.object(uploads, "_make_file_sandbox_readable") as make_readable,
): ):
file = UploadFile(filename="notes.txt", file=BytesIO(b"hello uploads")) file = UploadFile(filename="notes.txt", file=BytesIO(b"hello uploads"))
result = asyncio.run(call_unwrapped(uploads.upload_files, "thread-local", request=MagicMock(), files=[file], config=SimpleNamespace())) result = asyncio.run(call_unwrapped(uploads.upload_files, "thread-local", request=MagicMock(), files=[file], config=SimpleNamespace()))
assert result.success is True assert result.success is True
make_writable.assert_not_called() make_writable.assert_not_called()
make_readable.assert_not_called()
def test_upload_files_acquires_non_local_sandbox_before_writing(tmp_path): def test_upload_files_acquires_non_local_sandbox_before_writing(tmp_path):
@@ -434,59 +431,6 @@ def test_make_file_sandbox_writable_skips_symlinks(tmp_path):
chmod.assert_not_called() chmod.assert_not_called()
def test_make_file_sandbox_readable_adds_read_bits_for_regular_files(tmp_path):
file_path = tmp_path / "data.csv"
file_path.write_bytes(b"csv-data")
# Simulate the 0o600 permissions set by open_upload_file_no_symlink
file_path.chmod(0o600)
uploads._make_file_sandbox_readable(file_path)
updated_mode = stat.S_IMODE(file_path.stat().st_mode)
assert updated_mode & stat.S_IRUSR
assert updated_mode & stat.S_IRGRP
assert updated_mode & stat.S_IROTH
def test_make_file_sandbox_readable_skips_symlinks(tmp_path):
file_path = tmp_path / "target-link.txt"
file_path.write_text("hello", encoding="utf-8")
symlink_stat = MagicMock(st_mode=stat.S_IFLNK)
with (
patch.object(uploads.os, "lstat", return_value=symlink_stat),
patch.object(uploads.os, "chmod") as chmod,
):
uploads._make_file_sandbox_readable(file_path)
chmod.assert_not_called()
def test_upload_files_adjusts_read_permissions_for_mounted_non_local_sandbox(tmp_path):
thread_uploads_dir = tmp_path / "uploads"
thread_uploads_dir.mkdir(parents=True)
# AIO sandbox with LocalContainerBackend: uses_thread_data_mounts=True
# but needs_upload_permission_adjustment=True (default)
provider = MagicMock()
provider.uses_thread_data_mounts = True
provider.needs_upload_permission_adjustment = True
with (
patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir),
patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir),
patch.object(uploads, "get_sandbox_provider", return_value=provider),
patch.object(uploads, "_make_file_sandbox_readable") as make_readable,
):
file = UploadFile(filename="notes.txt", file=BytesIO(b"hello uploads"))
result = asyncio.run(call_unwrapped(uploads.upload_files, "thread-aio", request=MagicMock(), files=[file], config=SimpleNamespace()))
assert result.success is True
make_readable.assert_called_once()
called_path = make_readable.call_args[0][0]
assert called_path.name == "notes.txt"
def test_upload_files_rejects_dotdot_and_dot_filenames(tmp_path): def test_upload_files_rejects_dotdot_and_dot_filenames(tmp_path):
thread_uploads_dir = tmp_path / "uploads" thread_uploads_dir = tmp_path / "uploads"
thread_uploads_dir.mkdir(parents=True) thread_uploads_dir.mkdir(parents=True)
+2 -2
View File
@@ -799,9 +799,9 @@ summarization:
# Summarization runs when ANY threshold is met (OR logic) # Summarization runs when ANY threshold is met (OR logic)
# You can specify a single trigger or a list of triggers # You can specify a single trigger or a list of triggers
trigger: trigger:
# Trigger when token count reaches 15564 # Trigger when token count reaches 32000
- type: tokens - type: tokens
value: 15564 value: 32000
# Uncomment to also trigger when message count reaches 50 # Uncomment to also trigger when message count reaches 50
# - type: messages # - type: messages
# value: 50 # value: 50
+4
View File
@@ -18,3 +18,7 @@ lint:
format: format:
pnpm format:write pnpm format:write
# Build the frontend with NEXT_CONFIG_BUILD_OUTPUT=standalone and
# NEXT_PUBLIC_STATIC_WEBSITE_ONLY=true, skipping env validation, via `pnpm build`.
# Afterwards, if .next/static exists, copy it to .next/standalone/.next/static.
build-static:
NEXT_CONFIG_BUILD_OUTPUT=standalone SKIP_ENV_VALIDATION=1 NEXT_PUBLIC_STATIC_WEBSITE_ONLY=true pnpm build
@if [ -d .next/static ]; then mkdir -p .next/standalone/.next && cp -R .next/static .next/standalone/.next/static; fi
+4
View File
@@ -16,6 +16,10 @@ const withNextra = nextra({});
/** @type {import("next").NextConfig} */ /** @type {import("next").NextConfig} */
const config = { const config = {
output:
process.env.NEXT_CONFIG_BUILD_OUTPUT === "standalone"
? "standalone"
: undefined,
i18n: { i18n: {
locales: ["en", "zh"], locales: ["en", "zh"],
defaultLocale: "en", defaultLocale: "en",
@@ -32,7 +32,7 @@ Even with digital Leicas, photographers often emulate film characteristics: natu
### Image 1: Parisian Decisive Moment ### Image 1: Parisian Decisive Moment
![Paris Decisive Moment](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-paris-decisive-moment.jpg) ![Paris Decisive Moment](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-paris-decisive-moment.jpg)
This image captures the essence of Cartier-Bresson's philosophy. A woman in a red coat leaps over a puddle while a cyclist passes in perfect synchrony. The composition follows the rule of thirds, with the subject positioned at the intersection of grid lines. Shot with a simulated Leica M11 and 35mm Summicron lens at f/2.8, the image features shallow depth of field, natural film grain, and the warm, muted color palette characteristic of Leica photography. This image captures the essence of Cartier-Bresson's philosophy. A woman in a red coat leaps over a puddle while a cyclist passes in perfect synchrony. The composition follows the rule of thirds, with the subject positioned at the intersection of grid lines. Shot with a simulated Leica M11 and 35mm Summicron lens at f/2.8, the image features shallow depth of field, natural film grain, and the warm, muted color palette characteristic of Leica photography.
@@ -40,7 +40,7 @@ The "decisive moment" here isn't just about timing—it's about the alignment of
### Image 2: Tokyo Night Reflections ### Image 2: Tokyo Night Reflections
![Tokyo Night Scene](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-tokyo-night.jpg) ![Tokyo Night Scene](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-tokyo-night.jpg)
Moving to Shinjuku, Tokyo, this image explores the atmospheric possibilities of Leica's legendary Noctilux lens. Simulating a Leica M10-P with a 50mm f/0.95 Noctilux wide open, the image creates extremely shallow depth of field with beautiful bokeh balls from neon signs reflected in wet pavement. Moving to Shinjuku, Tokyo, this image explores the atmospheric possibilities of Leica's legendary Noctilux lens. Simulating a Leica M10-P with a 50mm f/0.95 Noctilux wide open, the image creates extremely shallow depth of field with beautiful bokeh balls from neon signs reflected in wet pavement.
@@ -48,7 +48,7 @@ A salaryman waits under glowing kanji signs, steam rising from a nearby ramen sh
### Image 3: New York City Candid ### Image 3: New York City Candid
![NYC Candid Scene](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-nyc-candid.jpg) ![NYC Candid Scene](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-nyc-candid.jpg)
This Chinatown scene demonstrates the documentary power of Leica's Q2 camera with its fixed 28mm Summilux lens. The wide angle captures environmental context while maintaining intimate proximity to the subjects. A fishmonger hands a live fish to a customer while tourists photograph the scene—a moment of cultural contrast and authentic urban life. This Chinatown scene demonstrates the documentary power of Leica's Q2 camera with its fixed 28mm Summilux lens. The wide angle captures environmental context while maintaining intimate proximity to the subjects. A fishmonger hands a live fish to a customer while tourists photograph the scene—a moment of cultural contrast and authentic urban life.
@@ -1,19 +1,19 @@
"use client"; import { isStaticWebsiteOnly } from "@/core/static-mode";
import { DEMO_THREAD_IDS } from "@/core/threads/static-demo";
import { PromptInputProvider } from "@/components/ai-elements/prompt-input"; import { ChatProviders } from "./providers";
import { ArtifactsProvider } from "@/components/workspace/artifacts";
import { SubtasksProvider } from "@/core/tasks/context"; export function generateStaticParams() {
if (!isStaticWebsiteOnly()) {
return [];
}
return DEMO_THREAD_IDS.map((thread_id) => ({ thread_id }));
}
export default function ChatLayout({ export default function ChatLayout({
children, children,
}: { }: {
children: React.ReactNode; children: React.ReactNode;
}) { }) {
return ( return <ChatProviders>{children}</ChatProviders>;
<SubtasksProvider>
<ArtifactsProvider>
<PromptInputProvider>{children}</PromptInputProvider>
</ArtifactsProvider>
</SubtasksProvider>
);
} }
@@ -227,6 +227,7 @@ export default function ChatPage() {
isWelcomeMode && <Welcome mode={settings.context.mode} /> isWelcomeMode && <Welcome mode={settings.context.mode} />
} }
disabled={ disabled={
isMock ||
env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY === "true" || env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY === "true" ||
isUploading isUploading
} }
@@ -0,0 +1,15 @@
"use client";
import { PromptInputProvider } from "@/components/ai-elements/prompt-input";
import { ArtifactsProvider } from "@/components/workspace/artifacts";
import { SubtasksProvider } from "@/core/tasks/context";
/**
 * Wraps `children` in the chat context providers: `SubtasksProvider` outermost,
 * then `ArtifactsProvider`, with `PromptInputProvider` closest to the children.
 */
export function ChatProviders(props: { children: React.ReactNode }) {
  const withPromptInput = (
    <PromptInputProvider>{props.children}</PromptInputProvider>
  );
  const withArtifacts = <ArtifactsProvider>{withPromptInput}</ArtifactsProvider>;
  return <SubtasksProvider>{withArtifacts}</SubtasksProvider>;
}
+5 -3
View File
@@ -43,12 +43,14 @@ export default async function WorkspaceLayout({
> >
Retry Retry
</Link> </Link>
<Link <form action="/api/v1/auth/logout" method="post">
href="/api/v1/auth/logout" <button
type="submit"
className="text-muted-foreground hover:bg-muted rounded-md border px-4 py-2 text-sm" className="text-muted-foreground hover:bg-muted rounded-md border px-4 py-2 text-sm"
> >
Logout &amp; Reset Logout &amp; Reset
</Link> </button>
</form>
</div> </div>
</div> </div>
); );
@@ -83,7 +83,7 @@ export function ArtifactFileDetail({
const isSupportPreview = useMemo(() => { const isSupportPreview = useMemo(() => {
return language === "html" || language === "markdown"; return language === "html" || language === "markdown";
}, [language]); }, [language]);
const { content } = useArtifactContent({ const { content, url } = useArtifactContent({
threadId, threadId,
filepath: filepathFromProps, filepath: filepathFromProps,
enabled: isCodeFile && !isWriteFile, enabled: isCodeFile && !isWriteFile,
@@ -254,7 +254,9 @@ export function ArtifactFileDetail({
(language === "markdown" || language === "html") && ( (language === "markdown" || language === "html") && (
<ArtifactFilePreview <ArtifactFilePreview
content={displayContent} content={displayContent}
isWriteFile={isWriteFile}
language={language ?? "text"} language={language ?? "text"}
url={url}
/> />
)} )}
{isCodeFile && viewMode === "code" && ( {isCodeFile && viewMode === "code" && (
@@ -277,27 +279,33 @@ export function ArtifactFileDetail({
export function ArtifactFilePreview({ export function ArtifactFilePreview({
content, content,
isWriteFile,
language, language,
url,
}: { }: {
content: string; content: string;
isWriteFile: boolean;
language: string; language: string;
url?: string;
}) { }) {
const [htmlPreviewUrl, setHtmlPreviewUrl] = useState<string>(); const [htmlPreviewUrl, setHtmlPreviewUrl] = useState<string>();
useEffect(() => { useEffect(() => {
if (language !== "html") { if (language !== "html" || isWriteFile) {
setHtmlPreviewUrl(undefined); setHtmlPreviewUrl(undefined);
return; return;
} }
const blob = new Blob([content ?? ""], { type: "text/html" }); const blob = new Blob([htmlWithBaseHref(content ?? "", url)], {
const url = URL.createObjectURL(blob); type: "text/html",
setHtmlPreviewUrl(url); });
const objectUrl = URL.createObjectURL(blob);
setHtmlPreviewUrl(objectUrl);
return () => { return () => {
URL.revokeObjectURL(url); URL.revokeObjectURL(objectUrl);
}; };
}, [content, language]); }, [content, isWriteFile, language, url]);
if (language === "markdown") { if (language === "markdown") {
return ( return (
@@ -318,9 +326,35 @@ export function ArtifactFilePreview({
className="size-full" className="size-full"
title="Artifact preview" title="Artifact preview"
sandbox="allow-scripts allow-forms" sandbox="allow-scripts allow-forms"
src={htmlPreviewUrl} src={isWriteFile ? undefined : htmlPreviewUrl}
srcDoc={isWriteFile ? content : undefined}
/> />
); );
} }
return null; return null;
} }
/**
 * Returns `content` with a `<base href>` element injected so that relative
 * links inside the HTML resolve against the directory of `url`.
 *
 * The content is returned unchanged when `url` is missing or when the HTML
 * already contains a `<base ...>` tag. The element is inserted right after the
 * opening `<head>` tag when one exists, otherwise it is prepended.
 *
 * @param content - HTML source of the artifact.
 * @param url - URL the artifact was loaded from (absolute, or relative to the page).
 * @returns The HTML with a `<base>` element added where applicable.
 */
function htmlWithBaseHref(content: string, url?: string): string {
  if (!url || /<base\s/i.exec(content)) {
    return content;
  }

  const baseElement = `<base href="${escapeHtmlAttribute(htmlBaseHref(url))}">`;
  // Require whitespace or `>` right after "head" so `<header ...>` is not
  // mistaken for the document head.
  const headOpenTag = /<head(?:\s[^>]*)?>/i;
  if (headOpenTag.exec(content)) {
    // A replacer function is used because a replacement *string* would expand
    // `$&`, `$1`, `$'` ... sequences that can legitimately occur in the URL.
    return content.replace(
      headOpenTag,
      (openTag) => `${openTag}${baseElement}`,
    );
  }
  return `${baseElement}${content}`;
}

/**
 * Resolves `url` against the current page location and returns the URL of its
 * containing directory: the last path segment, the query string and the
 * fragment are dropped and the path ends with `/`.
 */
function htmlBaseHref(url: string): string {
  const baseUrl = new URL(url, window.location.href);
  baseUrl.pathname = baseUrl.pathname.replace(/\/[^/]*$/, "/");
  baseUrl.search = "";
  baseUrl.hash = "";
  return baseUrl.toString();
}

/**
 * Escapes `&` and `"` so `value` can be embedded in a double-quoted HTML
 * attribute. `&` is replaced first so the entities produced for `"` are not
 * escaped a second time.
 */
function escapeHtmlAttribute(value: string): string {
  return value.replace(/&/g, "&amp;").replace(/"/g, "&quot;");
}
@@ -162,7 +162,7 @@ summarization:
# Trigger conditions — summarization runs when ANY threshold is met # Trigger conditions — summarization runs when ANY threshold is met
trigger: trigger:
- type: tokens # trigger when context exceeds N tokens - type: tokens # trigger when context exceeds N tokens
value: 15564 value: 32000
# - type: messages # trigger when there are more than N messages # - type: messages # trigger when there are more than N messages
# value: 50 # value: 50
# - type: fraction # trigger when context exceeds X% of model max # - type: fraction # trigger when context exceeds X% of model max
+17 -17
View File
@@ -20,27 +20,27 @@ If you want to understand how DeerFlow works, start with the Introduction. If yo
Start with the conceptual overview first. Start with the conceptual overview first.
- [Introduction](/docs/introduction) - [Introduction](./docs/introduction)
- [Why DeerFlow](/docs/introduction/why-deerflow) - [Why DeerFlow](./docs/introduction/why-deerflow)
- [Harness vs App](/docs/introduction/harness-vs-app) - [Harness vs App](./docs/introduction/harness-vs-app)
### If you want to build with DeerFlow ### If you want to build with DeerFlow
Start with the Harness section. This path is for teams who want to integrate DeerFlow capabilities into their own system or build a custom agent product on top of the DeerFlow runtime. Start with the Harness section. This path is for teams who want to integrate DeerFlow capabilities into their own system or build a custom agent product on top of the DeerFlow runtime.
- [DeerFlow Harness](/docs/harness) - [DeerFlow Harness](./docs/harness)
- [Quick Start](/docs/harness/quick-start) - [Quick Start](./docs/harness/quick-start)
- [Configuration](/docs/harness/configuration) - [Configuration](./docs/harness/configuration)
- [Customization](/docs/harness/customization) - [Customization](./docs/harness/customization)
### If you want to deploy and use DeerFlow ### If you want to deploy and use DeerFlow
Start with the App section. This path is for teams who want to run DeerFlow as a complete application and understand how to configure, operate, and use it in practice. Start with the App section. This path is for teams who want to run DeerFlow as a complete application and understand how to configure, operate, and use it in practice.
- [DeerFlow App](/docs/app) - [DeerFlow App](./docs/app)
- [Quick Start](/docs/app/quick-start) - [Quick Start](./docs/app/quick-start)
- [Deployment Guide](/docs/app/deployment-guide) - [Deployment Guide](./docs/app/deployment-guide)
- [Workspace Usage](/docs/app/workspace-usage) - [Workspace Usage](./docs/app/workspace-usage)
## Documentation structure ## Documentation structure
@@ -79,17 +79,17 @@ The App section is written for teams who want to deploy DeerFlow as a usable pro
The Tutorials section is for hands-on, task-oriented learning. The Tutorials section is for hands-on, task-oriented learning.
- [Tutorials](/docs/tutorials) - [Tutorials](./docs/tutorials)
### Reference ### Reference
The Reference section is for detailed lookup material, including configuration, runtime modes, APIs, and source-oriented mapping. The Reference section is for detailed lookup material, including configuration, runtime modes, APIs, and source-oriented mapping.
- [Reference](/docs/reference) - [Reference](./docs/reference)
## Choose the right path ## Choose the right path
- If you are **evaluating the project**, start with [Introduction](/docs/introduction). - If you are **evaluating the project**, start with [Introduction](./docs/introduction).
- If you are **building your own agent system**, start with [DeerFlow Harness](/docs/harness). - If you are **building your own agent system**, start with [DeerFlow Harness](./docs/harness).
- If you are **deploying DeerFlow for users**, start with [DeerFlow App](/docs/app). - If you are **deploying DeerFlow for users**, start with [DeerFlow App](./docs/app).
- If you want to **learn by doing**, go to [Tutorials](/docs/tutorials). - If you want to **learn by doing**, go to [Tutorials](./docs/tutorials).
@@ -0,0 +1,9 @@
---
title: DeerFlow 2.0 M1
description: DeerFlow 2.0 M1 is officially in RC. Here's what you need to know.
date: 2026-05-30
tags:
- Release
---
## DeerFlow 2.0 M1 Release
@@ -154,7 +154,7 @@ summarization:
# 触发条件——满足任意一个条件时运行摘要 # 触发条件——满足任意一个条件时运行摘要
trigger: trigger:
- type: tokens # 当上下文超过 N 个 token 时触发 - type: tokens # 当上下文超过 N 个 token 时触发
value: 15564 value: 32000
# - type: messages # 当消息数超过 N 时触发 # - type: messages # 当消息数超过 N 时触发
# value: 50 # value: 50
# - type: fraction # 当上下文达到模型最大输入的 X% 时触发 # - type: fraction # 当上下文达到模型最大输入的 X% 时触发
+11 -11
View File
@@ -20,27 +20,27 @@ DeerFlow 是一个用于构建和运行 Agent 系统的框架。它提供了一
先从概念概述开始。 先从概念概述开始。
- [简介](/docs/introduction) - [简介](./docs/introduction)
- [为什么选择 DeerFlow](/docs/introduction/why-deerflow) - [为什么选择 DeerFlow](./docs/introduction/why-deerflow)
- [Harness 与应用的区别](/docs/introduction/harness-vs-app) - [Harness 与应用的区别](./docs/introduction/harness-vs-app)
### 如果你想基于 DeerFlow 进行开发 ### 如果你想基于 DeerFlow 进行开发
从 Harness 章节开始。这条路径适合想将 DeerFlow 功能集成到自己系统中,或基于 DeerFlow 运行时构建自定义 Agent 产品的团队。 从 Harness 章节开始。这条路径适合想将 DeerFlow 功能集成到自己系统中,或基于 DeerFlow 运行时构建自定义 Agent 产品的团队。
- [DeerFlow Harness](/docs/harness) - [DeerFlow Harness](./docs/harness)
- [快速上手](/docs/harness/quick-start) - [快速上手](./docs/harness/quick-start)
- [配置](/docs/harness/configuration) - [配置](./docs/harness/configuration)
- [自定义与扩展](/docs/harness/customization) - [自定义与扩展](./docs/harness/customization)
### 如果你想部署和使用 DeerFlow ### 如果你想部署和使用 DeerFlow
从应用章节开始。这条路径适合想将 DeerFlow 作为完整应用运行,并了解如何配置、运维和实际使用的团队。 从应用章节开始。这条路径适合想将 DeerFlow 作为完整应用运行,并了解如何配置、运维和实际使用的团队。
- [DeerFlow 应用](/docs/application) - [DeerFlow 应用](./docs/application)
- [快速上手](/docs/application/quick-start) - [快速上手](./docs/application/quick-start)
- [部署指南](/docs/application/deployment-guide) - [部署指南](./docs/application/deployment-guide)
- [工作区使用](/docs/application/workspace-usage) - [工作区使用](./docs/application/workspace-usage)
## 文档结构 ## 文档结构
+49
View File
@@ -3,6 +3,13 @@
import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client"; import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client";
import { getLangGraphBaseURL } from "../config"; import { getLangGraphBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import {
loadStaticDemoThread,
loadStaticDemoThreads,
staticDemoThreadState,
} from "../threads/static-demo";
import type { AgentThreadState } from "../threads/types";
import { isStateChangingMethod, readCsrfCookie } from "./fetcher"; import { isStateChangingMethod, readCsrfCookie } from "./fetcher";
import { sanitizeRunStreamOptions } from "./stream-mode"; import { sanitizeRunStreamOptions } from "./stream-mode";
@@ -32,6 +39,10 @@ function injectCsrfHeader(_url: URL, init: RequestInit): RequestInit {
} }
function createCompatibleClient(isMock?: boolean): LangGraphClient { function createCompatibleClient(isMock?: boolean): LangGraphClient {
if (isStaticWebsiteOnly() && !isMock) {
return createStaticClient();
}
const apiUrl = getLangGraphBaseURL(isMock); const apiUrl = getLangGraphBaseURL(isMock);
console.log(`Creating API client with base URL: ${apiUrl}`); console.log(`Creating API client with base URL: ${apiUrl}`);
const client = new LangGraphClient({ const client = new LangGraphClient({
@@ -58,6 +69,44 @@ function createCompatibleClient(isMock?: boolean): LangGraphClient {
return client; return client;
} }
/**
 * Builds a LangGraph client for static website mode.
 *
 * The thread accessors (`search`, `get`, `getState`, `getHistory`, `update`)
 * are replaced with loaders from the static demo module, `runs.list` resolves
 * to an empty array, and the run streams yield nothing.
 *
 * @returns The patched client instance.
 */
function createStaticClient(): LangGraphClient {
  // Without a `window` there is no origin to read, so a fixed local URL is used.
  let origin: string;
  if (typeof window === "undefined") {
    origin = "http://localhost:3000";
  } else {
    origin = window.location.origin;
  }
  const demoClient = new LangGraphClient({ apiUrl: origin });

  // Shared by getState and getHistory: load the demo thread, then wrap it as a state.
  const readDemoState = async (threadId: string) =>
    staticDemoThreadState(await loadStaticDemoThread(threadId));

  demoClient.threads.search = (async (query) =>
    loadStaticDemoThreads(query)) as typeof demoClient.threads.search;
  demoClient.threads.get = (async (threadId) =>
    loadStaticDemoThread(threadId)) as typeof demoClient.threads.get;
  demoClient.threads.getState = (async (threadId) =>
    readDemoState(threadId)) as typeof demoClient.threads.getState;
  // History is a single-entry list holding the same state getState returns.
  demoClient.threads.getHistory = (async (threadId) => [
    await readDemoState(threadId),
  ]) as typeof demoClient.threads.getHistory;
  // update performs no write; it just returns the loaded demo thread.
  demoClient.threads.update = (async (threadId) =>
    loadStaticDemoThread(threadId)) as typeof demoClient.threads.update;

  demoClient.runs.list = (async () => []) as typeof demoClient.runs.list;
  demoClient.runs.stream = async function* () {
    /* empty */
  } as typeof demoClient.runs.stream;
  demoClient.runs.joinStream = async function* () {
    /* empty */
  } as typeof demoClient.runs.joinStream;

  return demoClient as LangGraphClient<AgentThreadState>;
}
const _clients = new Map<string, LangGraphClient>(); const _clients = new Map<string, LangGraphClient>();
export function getAPIClient(isMock?: boolean): LangGraphClient { export function getAPIClient(isMock?: boolean): LangGraphClient {
const cacheKey = isMock ? "mock" : "default"; const cacheKey = isMock ? "mock" : "default";
+20
View File
@@ -1,4 +1,5 @@
import { getBackendBaseURL } from "../config"; import { getBackendBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import type { AgentThread } from "../threads"; import type { AgentThread } from "../threads";
export function urlOfArtifact({ export function urlOfArtifact({
@@ -12,6 +13,9 @@ export function urlOfArtifact({
download?: boolean; download?: boolean;
isMock?: boolean; isMock?: boolean;
}) { }) {
if (isStaticWebsiteOnly()) {
return staticDemoArtifactURL({ filepath, threadId, download });
}
if (isMock) { if (isMock) {
return `${getBackendBaseURL()}/mock/api/threads/${threadId}/artifacts${filepath}${download ? "?download=true" : ""}`; return `${getBackendBaseURL()}/mock/api/threads/${threadId}/artifacts${filepath}${download ? "?download=true" : ""}`;
} }
@@ -23,5 +27,21 @@ export function extractArtifactsFromThread(thread: AgentThread) {
} }
export function resolveArtifactURL(absolutePath: string, threadId: string) { export function resolveArtifactURL(absolutePath: string, threadId: string) {
if (isStaticWebsiteOnly()) {
return staticDemoArtifactURL({ filepath: absolutePath, threadId });
}
return `${getBackendBaseURL()}/api/threads/${threadId}/artifacts${absolutePath}`; return `${getBackendBaseURL()}/api/threads/${threadId}/artifacts${absolutePath}`;
} }
/**
 * Builds the URL of an artifact file under `/demo/threads/<threadId>`.
 *
 * A leading `/mnt/` in `filepath` is collapsed to `/` before the path is
 * appended to the thread's demo directory.
 *
 * @param filepath - Artifact path, e.g. `/mnt/user-data/outputs/index.html`.
 * @param threadId - Thread whose demo directory holds the file.
 * @param download - When true, `?download=true` is appended.
 * @returns The URL, prefixed with the backend base URL.
 */
function staticDemoArtifactURL({
  filepath,
  threadId,
  download = false,
}: {
  filepath: string;
  threadId: string;
  download?: boolean;
}) {
  const relativePath = filepath.replace(/^\/mnt\//, "/");
  const baseURL = getBackendBaseURL();
  const query = download ? "?download=true" : "";
  return `${baseURL}/demo/threads/${threadId}${relativePath}${query}`;
}
+17 -3
View File
@@ -10,6 +10,8 @@ import React, {
type ReactNode, type ReactNode,
} from "react"; } from "react";
import { isStaticWebsiteOnly } from "../static-mode";
import { type User, buildLoginUrl } from "./types"; import { type User, buildLoginUrl } from "./types";
// Re-export for consumers // Re-export for consumers
@@ -46,6 +48,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const [isLoading, setIsLoading] = useState(false); const [isLoading, setIsLoading] = useState(false);
const router = useRouter(); const router = useRouter();
const pathname = usePathname(); const pathname = usePathname();
const staticMode = isStaticWebsiteOnly();
const isAuthenticated = user !== null; const isAuthenticated = user !== null;
@@ -54,6 +57,8 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
* Used when initialUser might be stale (e.g., after tab was inactive) * Used when initialUser might be stale (e.g., after tab was inactive)
*/ */
const refreshUser = useCallback(async () => { const refreshUser = useCallback(async () => {
if (staticMode) return;
try { try {
setIsLoading(true); setIsLoading(true);
const res = await fetch("/api/v1/auth/me", { const res = await fetch("/api/v1/auth/me", {
@@ -77,7 +82,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
} finally { } finally {
setIsLoading(false); setIsLoading(false);
} }
}, [pathname, router]); }, [staticMode, pathname, router]);
/** /**
* Logout - call FastAPI logout endpoint and clear local state * Logout - call FastAPI logout endpoint and clear local state
@@ -87,6 +92,11 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
// Immediately clear local state to prevent UI flicker // Immediately clear local state to prevent UI flicker
setUser(null); setUser(null);
if (staticMode) {
router.push("/");
return;
}
try { try {
await fetch("/api/v1/auth/logout", { await fetch("/api/v1/auth/logout", {
method: "POST", method: "POST",
@@ -99,7 +109,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
// Redirect to home page // Redirect to home page
router.push("/"); router.push("/");
}, [router]); }, [staticMode, router]);
/** /**
* Handle visibility change - refresh user when tab becomes visible again. * Handle visibility change - refresh user when tab becomes visible again.
@@ -108,6 +118,8 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const lastCheckRef = React.useRef(0); const lastCheckRef = React.useRef(0);
useEffect(() => { useEffect(() => {
if (staticMode) return;
const handleVisibilityChange = () => { const handleVisibilityChange = () => {
if (document.visibilityState !== "visible" || user === null) return; if (document.visibilityState !== "visible" || user === null) return;
const now = Date.now(); const now = Date.now();
@@ -120,7 +132,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
return () => { return () => {
document.removeEventListener("visibilitychange", handleVisibilityChange); document.removeEventListener("visibilitychange", handleVisibilityChange);
}; };
}, [user, refreshUser]); }, [staticMode, user, refreshUser]);
const value: AuthContextType = { const value: AuthContextType = {
user, user,
@@ -155,6 +167,8 @@ export function useRequireAuth(): AuthContextType {
const pathname = usePathname(); const pathname = usePathname();
useEffect(() => { useEffect(() => {
if (isStaticWebsiteOnly()) return;
// Only redirect if we're sure user is not authenticated (not just loading) // Only redirect if we're sure user is not authenticated (not just loading)
if (!auth.isLoading && !auth.isAuthenticated) { if (!auth.isLoading && !auth.isAuthenticated) {
router.push(buildLoginUrl(pathname || "/workspace")); router.push(buildLoginUrl(pathname || "/workspace"));
+10
View File
@@ -1,6 +1,9 @@
import { cookies } from "next/headers"; import { cookies } from "next/headers";
import { isStaticWebsiteOnly } from "../static-mode";
import { getGatewayConfig } from "./gateway-config"; import { getGatewayConfig } from "./gateway-config";
import { STATIC_WEBSITE_USER } from "./static-user";
import { type AuthResult, userSchema } from "./types"; import { type AuthResult, userSchema } from "./types";
const SSR_AUTH_TIMEOUT_MS = 5_000; const SSR_AUTH_TIMEOUT_MS = 5_000;
@@ -10,6 +13,13 @@ const SSR_AUTH_TIMEOUT_MS = 5_000;
* Returns a tagged AuthResult — callers use exhaustive switch, no try/catch. * Returns a tagged AuthResult — callers use exhaustive switch, no try/catch.
*/ */
export async function getServerSideUser(): Promise<AuthResult> { export async function getServerSideUser(): Promise<AuthResult> {
if (isStaticWebsiteOnly()) {
return {
tag: "authenticated",
user: STATIC_WEBSITE_USER,
};
}
if (process.env.DEER_FLOW_AUTH_DISABLED === "1") { if (process.env.DEER_FLOW_AUTH_DISABLED === "1") {
return { return {
tag: "authenticated", tag: "authenticated",
+8
View File
@@ -0,0 +1,8 @@
import type { User } from "./types";
/**
 * Fixed placeholder user for static website mode.
 * The server-side auth helper returns it as the authenticated user when
 * static website mode is enabled.
 */
export const STATIC_WEBSITE_USER: User = {
id: "static-website-user",
email: "static@example.local",
system_role: "admin",
needs_setup: false,
};
+10
View File
@@ -1,8 +1,18 @@
import { getBackendBaseURL } from "../config"; import { getBackendBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import type { ModelsResponse } from "./types"; import type { ModelsResponse } from "./types";
/**
 * Response returned by `loadModels` in static website mode:
 * no models are listed and token usage is reported as disabled.
 */
const STATIC_MODELS_RESPONSE: ModelsResponse = {
models: [],
token_usage: { enabled: false },
};
export async function loadModels(): Promise<ModelsResponse> { export async function loadModels(): Promise<ModelsResponse> {
if (isStaticWebsiteOnly()) {
return STATIC_MODELS_RESPONSE;
}
const res = await fetch(`${getBackendBaseURL()}/api/models`); const res = await fetch(`${getBackendBaseURL()}/api/models`);
const data = (await res.json()) as Partial<ModelsResponse>; const data = (await res.json()) as Partial<ModelsResponse>;
return { return {
+5
View File
@@ -0,0 +1,5 @@
import { env } from "@/env";
/**
 * Reports whether static website mode is switched on.
 *
 * @returns `true` only when `NEXT_PUBLIC_STATIC_WEBSITE_ONLY` is exactly the
 * string `"true"`; any other value, or an unset variable, yields `false`.
 */
export function isStaticWebsiteOnly() {
  const flag = env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY;
  return flag === "true";
}
+87
View File
@@ -0,0 +1,87 @@
import type { ThreadState } from "@langchain/langgraph-sdk";
import type { ThreadsClient } from "@langchain/langgraph-sdk/client";
import type { AgentThread, AgentThreadState } from "./types";
/**
 * IDs of the demo threads. Each ID is used to fetch
 * `/demo/threads/<id>/thread.json` when the demo thread list is loaded.
 */
export const DEMO_THREAD_IDS = [
"21cfea46-34bd-4aa6-9e1f-3009452fbeb9",
"3823e443-4e2b-4679-b496-a9506eae462b",
"4f3e55ee-f853-43db-bfb3-7d1a411f03cb",
"5aa47db1-d0cb-4eb9-aea5-3dac1b371c5a",
"7cfa5f8f-a2f8-47ad-acbd-da7137baf990",
"7f9dc56c-e49c-4671-a3d2-c492ff4dce0c",
"90040b36-7eba-4b97-ba89-02c3ad47a8b9",
"ad76c455-5bf9-4335-8517-fc03834ab828",
"b83fbb2a-4e36-4d82-9de0-7b2a02c2092a",
"c02bb4d5-4202-490e-ae8f-ff4864fc0d2e",
"d3e5adaf-084c-4dd5-9d29-94f1d6bccd98",
"f4125791-0128-402a-8ca9-50e0947557e4",
"fe3f7974-1bcb-4a01-a950-79673baafefd",
] as const;
/** Non-nullable form of the first argument accepted by `ThreadsClient.search`. */
export type ThreadSearchParams = NonNullable<
Parameters<ThreadsClient["search"]>[0]
>;
/**
 * Loads every demo thread, then sorts and pages the list.
 *
 * @param params - Search options; only `sortBy` (default `"updated_at"`),
 * `sortOrder` (default `"desc"`), `offset` and `limit` are read.
 * @returns The requested page of demo threads.
 * @throws If any single demo thread fails to load (the loads run under
 * `Promise.all`).
 */
export async function loadStaticDemoThreads(
  params: ThreadSearchParams = {},
): Promise<AgentThread[]> {
  const loaded = await Promise.all(
    DEMO_THREAD_IDS.map((id) => loadStaticDemoThread(id)),
  );

  const sortKey = params.sortBy ?? "updated_at";
  const ascending = (params.sortOrder ?? "desc") === "asc";

  // The sort field is parsed as a date string; a non-string or unparseable
  // value counts as 0.
  const timestampOf = (thread: AgentThread): number => {
    const raw = (thread as unknown as Record<string, unknown>)[sortKey];
    if (typeof raw !== "string") {
      return 0;
    }
    const parsed = Date.parse(raw);
    return Number.isNaN(parsed) ? 0 : parsed;
  };

  const ordered = loaded.slice();
  ordered.sort((left, right) => {
    const leftValue = timestampOf(left);
    const rightValue = timestampOf(right);
    return ascending ? leftValue - rightValue : rightValue - leftValue;
  });

  // Negative offset/limit clamp to 0; a non-numeric limit means "everything".
  const start = Math.max(0, Math.floor(params.offset ?? 0));
  const count =
    typeof params.limit === "number"
      ? Math.max(0, Math.floor(params.limit))
      : ordered.length;
  return ordered.slice(start, start + count);
}
/**
 * Fetches one demo thread from `/demo/threads/<threadId>/thread.json`.
 *
 * The returned object carries the requested `threadId` as `thread_id`,
 * overriding whatever the JSON file contains, and falls back to `created_at`
 * when `updated_at` is absent.
 *
 * @param threadId - Demo thread ID; URI-encoded before it is put in the path.
 * @returns The parsed thread with `thread_id` and `updated_at` set.
 * @throws Error when the response is not OK.
 */
export async function loadStaticDemoThread(
  threadId: string,
): Promise<AgentThread> {
  const url = `/demo/threads/${encodeURIComponent(threadId)}/thread.json`;
  const res = await globalThis.fetch(url);
  if (!res.ok) {
    throw new Error(`Failed to load demo thread ${threadId}`);
  }

  const payload = (await res.json()) as AgentThread;
  const updatedAt = payload.updated_at ?? payload.created_at;
  return { ...payload, thread_id: threadId, updated_at: updatedAt };
}
/**
 * Wraps a demo thread in a `ThreadState` shape.
 *
 * The state has no pending nodes or tasks and no parent checkpoint; its
 * checkpoint carries only the thread ID. `created_at` prefers the thread's
 * `updated_at`, then `created_at`, then `null`.
 *
 * @param thread - Demo thread to convert.
 * @returns The synthesized thread state.
 */
export function staticDemoThreadState(
  thread: AgentThread,
): ThreadState<AgentThreadState> {
  const { thread_id: threadId, values, metadata } = thread;
  const stateTimestamp = thread.updated_at ?? thread.created_at ?? null;
  const checkpoint = {
    thread_id: threadId,
    checkpoint_ns: "",
    checkpoint_id: null,
    checkpoint_map: null,
  };

  return {
    values,
    next: [],
    checkpoint,
    metadata: metadata ?? null,
    created_at: stateTimestamp,
    parent_checkpoint: null,
    tasks: [],
  };
}
@@ -0,0 +1,69 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
// Environment variables this test file saves, clears and restores around each test.
const ENV_KEYS = [
"NEXT_PUBLIC_BACKEND_BASE_URL",
"NEXT_PUBLIC_STATIC_WEBSITE_ONLY",
] as const;
// Saved values keyed by ENV_KEYS entry; `undefined` records that the variable was unset.
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
/** Captures the current `process.env` value of every key in `ENV_KEYS`. */
function snapshotEnv(): EnvSnapshot {
  return ENV_KEYS.reduce<EnvSnapshot>((captured, key) => {
    captured[key] = process.env[key];
    return captured;
  }, {});
}
/**
 * Writes one environment variable, or removes it when `value` is `undefined`.
 *
 * @param key - Variable name; must be one of `ENV_KEYS`.
 * @param value - New value, or `undefined` to delete the variable.
 */
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
  const mutableEnv = process.env as Record<string, string | undefined>;
  if (value !== undefined) {
    mutableEnv[key] = value;
    return;
  }
  // Delete rather than assign: assigning undefined would leave the key present.
  delete mutableEnv[key];
}
/** Puts every key in `ENV_KEYS` back to the value recorded in `snapshot`. */
function restoreEnv(snapshot: EnvSnapshot) {
  ENV_KEYS.forEach((key) => {
    setEnv(key, snapshot[key]);
  });
}
/**
 * Resets the vitest module registry and then imports the artifact utils
 * module again.
 */
async function loadFreshArtifactUtils() {
  vi.resetModules();
  const artifactUtils = await import("@/core/artifacts/utils");
  return artifactUtils;
}
// Suite for the artifact URL helpers in static website mode.
describe("artifact URL helpers", () => {
let saved: EnvSnapshot;
// Save the environment, then unset both variables before each test.
beforeEach(() => {
saved = snapshotEnv();
setEnv("NEXT_PUBLIC_BACKEND_BASE_URL", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
// Put the saved environment back after each test.
afterEach(() => {
restoreEnv(saved);
});
test("maps static demo artifact paths to bundled public files", async () => {
// Enable static mode before importing the module under test.
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", "true");
const { resolveArtifactURL, urlOfArtifact } =
await loadFreshArtifactUtils();
// Both helpers are expected to drop the leading "/mnt" segment and return a
// root-relative URL under /demo/threads/<threadId>.
expect(
urlOfArtifact({
filepath: "/mnt/user-data/outputs/index.html",
threadId: "thread-1",
}),
).toBe("/demo/threads/thread-1/user-data/outputs/index.html");
expect(
resolveArtifactURL("/mnt/user-data/outputs/style.css", "thread-1"),
).toBe("/demo/threads/thread-1/user-data/outputs/style.css");
});
});
@@ -0,0 +1,77 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { STATIC_WEBSITE_USER } from "@/core/auth/static-user";
// Replace next/headers with a `cookies` mock that throws, so any attempt to
// read cookies fails the test.
vi.mock("next/headers", () => ({
cookies: vi.fn(() => {
throw new Error("cookies should not be read in static website mode");
}),
}));
// Environment variables this test file saves, clears and restores around each test.
const ENV_KEYS = [
"DEER_FLOW_AUTH_DISABLED",
"NEXT_PUBLIC_STATIC_WEBSITE_ONLY",
] as const;
// Saved values keyed by ENV_KEYS entry; `undefined` records that the variable was unset.
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
/** Captures the current `process.env` value of every key in `ENV_KEYS`. */
function snapshotEnv(): EnvSnapshot {
  return ENV_KEYS.reduce<EnvSnapshot>((captured, key) => {
    captured[key] = process.env[key];
    return captured;
  }, {});
}
/**
 * Writes one environment variable, or removes it when `value` is `undefined`.
 *
 * @param key - Variable name; must be one of `ENV_KEYS`.
 * @param value - New value, or `undefined` to delete the variable.
 */
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
  const mutableEnv = process.env as Record<string, string | undefined>;
  if (value !== undefined) {
    mutableEnv[key] = value;
    return;
  }
  // Delete rather than assign: assigning undefined would leave the key present.
  delete mutableEnv[key];
}
/** Puts every key in `ENV_KEYS` back to the value recorded in `snapshot`. */
function restoreEnv(snapshot: EnvSnapshot) {
  ENV_KEYS.forEach((key) => {
    setEnv(key, snapshot[key]);
  });
}
/**
 * Resets the vitest module registry and then imports the server auth module
 * again.
 */
async function loadFreshServerAuth() {
  vi.resetModules();
  const serverAuth = await import("@/core/auth/server");
  return serverAuth;
}
// Suite for getServerSideUser in static website mode.
describe("getServerSideUser", () => {
let saved: EnvSnapshot;
// Save the environment, then unset both variables before each test.
beforeEach(() => {
saved = snapshotEnv();
setEnv("DEER_FLOW_AUTH_DISABLED", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
// Put the saved environment back and remove any stubbed globals.
afterEach(() => {
restoreEnv(saved);
vi.unstubAllGlobals();
});
test("bypasses gateway auth in static website mode", async () => {
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", "true");
// A fetch stub that throws: any network call would surface as a test failure.
const fetchSpy = vi.fn(() => {
throw new Error("fetch should not be called in static website mode");
});
vi.stubGlobal("fetch", fetchSpy);
const { getServerSideUser } = await loadFreshServerAuth();
// The result must be the fixed static user, tagged as authenticated.
await expect(getServerSideUser()).resolves.toEqual({
tag: "authenticated",
user: STATIC_WEBSITE_USER,
});
expect(fetchSpy).not.toHaveBeenCalled();
});
});