Compare commits

..

2 Commits

Author SHA1 Message Date
Willem Jiang cfd9c61b9a Merge branch 'main' into fix-3127 2026-05-22 21:56:04 +08:00
Willem Jiang 4731605d99 fix(sandbox): add group/other read permissions to uploaded files for Docker sandbox (#3127)
When using AIO sandbox with LocalContainerBackend, uploaded files are
  created with 0o600 (owner-only) permissions by the gateway process
  running as root. The sandbox process inside the Docker container runs
  as a non-root user and cannot read these bind-mounted files, causing
  a "Permission denied" error on read_file.

  Add `needs_upload_permission_adjustment` attribute to SandboxProvider
  (default True) to indicate that uploaded files need chmod adjustment.
  LocalSandboxProvider opts out (same user). A new `_make_file_sandbox_readable`
  function adds S_IRGRP | S_IROTH bits after files are written, changing
  permissions from 0o600 to 0o644 so the sandbox can read the uploads.

  fixes #3127
2026-05-21 17:51:56 +08:00
33 changed files with 204 additions and 1235 deletions
-35
View File
@@ -37,36 +37,11 @@ if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
from deerflow.persistence.thread_meta.base import ThreadMetaStore
from deerflow.runtime import RunRecord
T = TypeVar("T")
async def _mark_latest_recovered_threads_error(
run_manager: RunManager,
thread_store: ThreadMetaStore,
recovered_runs: list[RunRecord],
) -> None:
"""Mark thread status as error only when its newest run was recovered."""
recovered_by_thread: dict[str, set[str]] = {}
for record in recovered_runs:
recovered_by_thread.setdefault(record.thread_id, set()).add(record.run_id)
for thread_id, recovered_run_ids in recovered_by_thread.items():
try:
latest_runs = await run_manager.list_by_thread(thread_id, user_id=None, limit=1)
except Exception:
logger.warning("Failed to find latest run for thread %s during run reconciliation", thread_id, exc_info=True)
continue
if not latest_runs or latest_runs[0].run_id not in recovered_run_ids:
continue
try:
await thread_store.update_status(thread_id, "error", user_id=None)
except Exception:
logger.warning("Failed to mark thread %s as error during run reconciliation", thread_id, exc_info=True)
def get_config() -> AppConfig:
"""Return the freshest ``AppConfig`` for the current request.
@@ -163,16 +138,6 @@ async def langgraph_runtime(app: FastAPI, startup_config: AppConfig) -> AsyncGen
# RunManager with store backing for persistence
app.state.run_manager = RunManager(store=app.state.run_store)
if getattr(config.database, "backend", None) == "sqlite":
from deerflow.utils.time import now_iso
# Startup-only recovery: clean shutdowns return no active rows and
# the thread-status update below becomes a no-op.
recovered_runs = await app.state.run_manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before=now_iso(),
)
await _mark_latest_recovered_threads_error(app.state.run_manager, app.state.thread_store, recovered_runs)
try:
yield
+28
View File
@@ -74,6 +74,25 @@ def _make_file_sandbox_writable(file_path: os.PathLike[str] | str) -> None:
os.chmod(file_path, writable_mode, **chmod_kwargs)
def _make_file_sandbox_readable(file_path: os.PathLike[str] | str) -> None:
"""Ensure uploaded files are readable by the sandbox process.
For Docker sandboxes (AIO), the gateway writes files as root with 0o600
permissions, then bind-mounts the host directory into the container. The
sandbox process inside the container runs as a non-root user and cannot
read those files without group/other read bits. This function adds
``S_IRGRP | S_IROTH`` so the sandbox can read the uploaded content.
"""
file_stat = os.lstat(file_path)
if stat.S_ISLNK(file_stat.st_mode):
logger.warning("Skipping sandbox chmod for symlinked upload path: %s", file_path)
return
readable_mode = stat.S_IMODE(file_stat.st_mode) | stat.S_IRGRP | stat.S_IROTH
chmod_kwargs = {"follow_symlinks": False} if os.chmod in os.supports_follow_symlinks else {}
os.chmod(file_path, readable_mode, **chmod_kwargs)
def _uses_thread_data_mounts(sandbox_provider: SandboxProvider) -> bool:
return bool(getattr(sandbox_provider, "uses_thread_data_mounts", False))
@@ -276,6 +295,15 @@ async def upload_files(
_cleanup_uploaded_paths(written_paths)
raise HTTPException(status_code=500, detail=f"Failed to upload {file.filename}: {str(e)}")
# When the sandbox uses bind-mounted thread data directories (e.g. AIO with
# LocalContainerBackend), uploaded files are visible inside the container but
# retain the 0o600 permissions set by the gateway. The sandbox process runs
# as a different user and cannot read them. Adjust permissions to add
# group/other read bits so the sandbox can access the files.
if not sync_to_sandbox and getattr(sandbox_provider, "needs_upload_permission_adjustment", True):
for file_path in written_paths:
_make_file_sandbox_readable(file_path)
if sync_to_sandbox:
for file_path, virtual_path in sandbox_sync_targets:
_make_file_sandbox_writable(file_path)
@@ -94,35 +94,25 @@ class RunRepository(RunStore):
created_at=None,
follow_up_to_run_id=None,
):
"""Insert or update a run row.
``RunManager`` retries ``put`` after transient SQLite failures. Making
this operation idempotent prevents a successful-but-unacknowledged first
commit from turning the retry into a primary-key failure.
"""
resolved_user_id = resolve_user_id(user_id, method_name="RunRepository.put")
now = datetime.now(UTC)
created = datetime.fromisoformat(created_at) if created_at else now
values = {
"thread_id": thread_id,
"assistant_id": assistant_id,
"user_id": resolved_user_id,
"model_name": self._normalize_model_name(model_name),
"status": status,
"multitask_strategy": multitask_strategy,
"metadata_json": self._safe_json(metadata) or {},
"kwargs_json": self._safe_json(kwargs) or {},
"error": error,
"follow_up_to_run_id": follow_up_to_run_id,
"updated_at": now,
}
row = RunRow(
run_id=run_id,
thread_id=thread_id,
assistant_id=assistant_id,
user_id=resolved_user_id,
model_name=self._normalize_model_name(model_name),
status=status,
multitask_strategy=multitask_strategy,
metadata_json=self._safe_json(metadata) or {},
kwargs_json=self._safe_json(kwargs) or {},
error=error,
follow_up_to_run_id=follow_up_to_run_id,
created_at=datetime.fromisoformat(created_at) if created_at else now,
updated_at=now,
)
async with self._sf() as session:
row = await session.get(RunRow, run_id)
if row is None:
session.add(RunRow(run_id=run_id, created_at=created, **values))
else:
for key, value in values.items():
setattr(row, key, value)
session.add(row)
await session.commit()
async def get(
@@ -156,14 +146,13 @@ class RunRepository(RunStore):
result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()]
async def update_status(self, run_id, status, *, error=None) -> bool:
async def update_status(self, run_id, status, *, error=None):
values: dict[str, Any] = {"status": status, "updated_at": datetime.now(UTC)}
if error is not None:
values["error"] = error
async with self._sf() as session:
result = await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.commit()
return result.rowcount != 0
async def update_model_name(self, run_id, model_name):
async with self._sf() as session:
@@ -198,26 +187,6 @@ class RunRepository(RunStore):
result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()]
async def list_inflight(self, *, before=None):
"""Return persisted active runs for startup recovery."""
if before is None:
before_dt = datetime.now(UTC)
elif isinstance(before, datetime):
before_dt = before
else:
before_dt = datetime.fromisoformat(before)
stmt = (
select(RunRow)
.where(
RunRow.status.in_(("pending", "running")),
RunRow.created_at <= before_dt,
)
.order_by(RunRow.created_at.asc())
)
async with self._sf() as session:
result = await session.execute(stmt)
return [self._row_to_dict(r) for r in result.scalars()]
async def update_run_completion(
self,
run_id: str,
@@ -234,11 +203,8 @@ class RunRepository(RunStore):
last_ai_message: str | None = None,
first_human_message: str | None = None,
error: str | None = None,
) -> bool:
"""Update status + token usage + convenience fields on run completion.
Returns ``False`` when no run row matched the requested ``run_id``.
"""
) -> None:
"""Update status + token usage + convenience fields on run completion."""
values: dict[str, Any] = {
"status": status,
"total_input_tokens": total_input_tokens,
@@ -258,9 +224,8 @@ class RunRepository(RunStore):
if error is not None:
values["error"] = error
async with self._sf() as session:
result = await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values))
await session.commit()
return result.rowcount != 0
async def update_run_progress(
self,
@@ -4,9 +4,7 @@ from __future__ import annotations
import asyncio
import logging
import sqlite3
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
@@ -19,57 +17,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
_RETRYABLE_SQLITE_MESSAGES = (
"database is locked",
"database table is locked",
"database is busy",
)
_RETRYABLE_SQLITE_ERROR_CODES = {
sqlite3.SQLITE_BUSY,
sqlite3.SQLITE_LOCKED,
}
def _is_retryable_persistence_error(exc: BaseException) -> bool:
"""Return True for transient SQLite persistence failures.
SQLite lock contention normally surfaces through either sqlite3 exceptions
or SQLAlchemy wrappers. The short bounded retry here protects run status
finalization from transient writer pressure without hiding permanent
failures forever.
"""
pending: list[BaseException] = [exc]
seen: set[int] = set()
while pending:
current = pending.pop()
if id(current) in seen:
continue
seen.add(id(current))
message = str(current).lower()
if any(fragment in message for fragment in _RETRYABLE_SQLITE_MESSAGES):
return True
if isinstance(current, (sqlite3.OperationalError, sqlite3.DatabaseError)):
error_code = getattr(current, "sqlite_errorcode", None)
if error_code in _RETRYABLE_SQLITE_ERROR_CODES:
return True
for chained in (getattr(current, "orig", None), current.__cause__, current.__context__):
if isinstance(chained, BaseException):
pending.append(chained)
return False
@dataclass(frozen=True)
class PersistenceRetryPolicy:
"""Bounded retry policy for short run-store writes."""
max_attempts: int = 5
initial_delay: float = 0.05
max_delay: float = 1.0
backoff_factor: float = 2.0
@dataclass
class RunRecord:
@@ -111,100 +58,38 @@ class RunManager:
that run history survives process restarts.
"""
def __init__(
self,
store: RunStore | None = None,
*,
persistence_retry_policy: PersistenceRetryPolicy | None = None,
) -> None:
def __init__(self, store: RunStore | None = None) -> None:
self._runs: dict[str, RunRecord] = {}
self._lock = asyncio.Lock()
self._store = store
self._persistence_retry_policy = persistence_retry_policy or PersistenceRetryPolicy()
@staticmethod
def _store_put_payload(record: RunRecord, *, error: str | None = None) -> dict[str, Any]:
return {
"thread_id": record.thread_id,
"assistant_id": record.assistant_id,
"status": record.status.value,
"multitask_strategy": record.multitask_strategy,
"metadata": record.metadata or {},
"kwargs": record.kwargs or {},
"error": error if error is not None else record.error,
"created_at": record.created_at,
"model_name": record.model_name,
}
async def _call_store_with_retry(
self,
operation_name: str,
run_id: str,
operation: Callable[[], Awaitable[Any]],
) -> Any:
"""Run a short store operation with bounded retries for SQLite pressure."""
policy = self._persistence_retry_policy
attempt = 1
delay = policy.initial_delay
while True:
try:
return await operation()
except Exception as exc:
retryable = _is_retryable_persistence_error(exc)
if attempt >= policy.max_attempts or not retryable:
raise
logger.warning(
"Transient persistence failure during %s for run %s (attempt %d/%d); retrying",
operation_name,
run_id,
attempt,
policy.max_attempts,
exc_info=True,
)
if delay > 0:
await asyncio.sleep(delay)
delay = min(policy.max_delay, delay * policy.backoff_factor if delay else policy.initial_delay)
attempt += 1
async def _persist_snapshot_to_store(self, run_id: str, payload: dict[str, Any]) -> bool:
"""Best-effort persist a previously captured run snapshot."""
if self._store is None:
return True
try:
await self._call_store_with_retry(
"put",
run_id,
lambda: self._store.put(run_id, **payload),
)
return True
except Exception:
logger.warning("Failed to persist run %s to store", run_id, exc_info=True)
return False
async def _persist_to_store(self, record: RunRecord, *, error: str | None = None) -> bool:
async def _persist_to_store(self, record: RunRecord) -> None:
"""Best-effort persist run record to backing store."""
return await self._persist_snapshot_to_store(
record.run_id,
self._store_put_payload(record, error=error),
)
if self._store is None:
return
try:
await self._store.put(
record.run_id,
thread_id=record.thread_id,
assistant_id=record.assistant_id,
status=record.status.value,
multitask_strategy=record.multitask_strategy,
metadata=record.metadata or {},
kwargs=record.kwargs or {},
created_at=record.created_at,
model_name=record.model_name,
)
except Exception:
logger.warning("Failed to persist run %s to store", record.run_id, exc_info=True)
async def _persist_status(self, record: RunRecord, status: RunStatus, *, error: str | None = None) -> bool:
async def _persist_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
"""Best-effort persist a status transition to the backing store."""
if self._store is None:
return True
row_recovery_payload = self._store_put_payload(record, error=error)
return
try:
updated = await self._call_store_with_retry(
"update_status",
record.run_id,
lambda: self._store.update_status(record.run_id, status.value, error=error),
)
if updated is False:
return await self._persist_snapshot_to_store(record.run_id, row_recovery_payload)
return True
await self._store.update_status(run_id, status.value, error=error)
except Exception:
logger.warning("Failed to persist status update for run %s", record.run_id, exc_info=True)
return False
logger.warning("Failed to persist status update for run %s", run_id, exc_info=True)
@staticmethod
def _record_from_store(row: dict[str, Any]) -> RunRecord:
@@ -241,7 +126,6 @@ class RunManager:
async def update_run_completion(self, run_id: str, **kwargs) -> None:
"""Persist token usage and completion data to the backing store."""
row_recovery_payload: dict[str, Any] | None = None
async with self._lock:
record = self._runs.get(run_id)
if record is not None:
@@ -251,30 +135,11 @@ class RunManager:
if hasattr(record, key) and value is not None:
setattr(record, key, value)
record.updated_at = _now_iso()
row_recovery_payload = self._store_put_payload(record, error=kwargs.get("error"))
if self._store is None:
return
try:
updated = await self._call_store_with_retry(
"update_run_completion",
run_id,
lambda: self._store.update_run_completion(run_id, **kwargs),
)
if updated is False:
if row_recovery_payload is None:
logger.warning("Failed to recreate missing run %s for completion persistence", run_id)
return
if not await self._persist_snapshot_to_store(run_id, row_recovery_payload):
return
recovered = await self._call_store_with_retry(
"update_run_completion",
run_id,
lambda: self._store.update_run_completion(run_id, **kwargs),
)
if recovered is False:
logger.warning("Run completion update for %s affected no rows after row recreation", run_id)
except Exception:
logger.warning("Failed to persist run completion for %s", run_id, exc_info=True)
if self._store is not None:
try:
await self._store.update_run_completion(run_id, **kwargs)
except Exception:
logger.warning("Failed to persist run completion for %s", run_id, exc_info=True)
async def update_run_progress(self, run_id: str, **kwargs) -> None:
"""Persist a running token/message snapshot without changing status."""
@@ -408,7 +273,7 @@ class RunManager:
record.updated_at = _now_iso()
if error is not None:
record.error = error
await self._persist_status(record, status, error=error)
await self._persist_status(run_id, status, error=error)
logger.info("Run %s -> %s", run_id, status.value)
async def _persist_model_name(self, run_id: str, model_name: str | None) -> None:
@@ -416,11 +281,7 @@ class RunManager:
if self._store is None:
return
try:
await self._call_store_with_retry(
"update_model_name",
run_id,
lambda: self._store.update_model_name(run_id, model_name),
)
await self._store.update_model_name(run_id, model_name)
except Exception:
logger.warning("Failed to persist model_name update for run %s", run_id, exc_info=True)
@@ -463,7 +324,7 @@ class RunManager:
record.task.cancel()
record.status = RunStatus.interrupted
record.updated_at = _now_iso()
await self._persist_status(record, RunStatus.interrupted)
await self._persist_status(run_id, RunStatus.interrupted)
logger.info("Run %s cancelled (action=%s)", run_id, action)
return True
@@ -491,7 +352,7 @@ class RunManager:
now = _now_iso()
_supported_strategies = ("reject", "interrupt", "rollback")
interrupted_records: list[RunRecord] = []
interrupted_run_ids: list[str] = []
async with self._lock:
if multitask_strategy not in _supported_strategies:
@@ -510,7 +371,7 @@ class RunManager:
r.task.cancel()
r.status = RunStatus.interrupted
r.updated_at = now
interrupted_records.append(r)
interrupted_run_ids.append(r.run_id)
logger.info(
"Cancelled %d inflight run(s) on thread %s (strategy=%s)",
len(inflight),
@@ -533,66 +394,12 @@ class RunManager:
)
self._runs[run_id] = record
for interrupted_record in interrupted_records:
await self._persist_status(interrupted_record, RunStatus.interrupted)
for interrupted_run_id in interrupted_run_ids:
await self._persist_status(interrupted_run_id, RunStatus.interrupted)
await self._persist_to_store(record)
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
async def reconcile_orphaned_inflight_runs(
self,
*,
error: str,
before: str | None = None,
) -> list[RunRecord]:
"""Mark persisted active runs as failed when no local task owns them.
Gateway runs are process-local: the asyncio task and abort event live in
memory, while the run row is durable. After a SQLite-backed gateway
restart, any persisted ``pending`` or ``running`` row created before
startup cannot still have a local worker. This recovery step turns that
ambiguous state into an explicit error instead of letting the UI show an
indefinite active run.
"""
if self._store is None:
return []
try:
rows = await self._call_store_with_retry(
"list_inflight",
"*",
lambda: self._store.list_inflight(before=before),
)
except Exception:
logger.warning("Failed to list orphaned inflight runs for reconciliation", exc_info=True)
return []
recovered: list[RunRecord] = []
now = _now_iso()
for row in rows:
try:
record = self._record_from_store(row)
except Exception:
logger.warning("Failed to map orphaned run row during reconciliation", exc_info=True)
continue
async with self._lock:
live_record = self._runs.get(record.run_id)
if live_record is not None and live_record.status in (RunStatus.pending, RunStatus.running):
continue
record.status = RunStatus.error
record.error = error
record.updated_at = now
persisted = await self._persist_status(record, RunStatus.error, error=error)
if not persisted:
logger.warning("Skipped orphaned run %s recovery because error status was not persisted", record.run_id)
continue
recovered.append(record)
if recovered:
logger.warning("Recovered %d orphaned inflight run(s) as error", len(recovered))
return recovered
async def has_inflight(self, thread_id: str) -> bool:
"""Return ``True`` if *thread_id* has a pending or running run."""
async with self._lock:
@@ -59,12 +59,7 @@ class RunStore(abc.ABC):
status: str,
*,
error: str | None = None,
) -> bool | None:
"""Update a run status.
Returns ``False`` when the store can prove no row was updated. Older or
lightweight stores may return ``None`` when they cannot report rowcount.
"""
) -> None:
pass
@abc.abstractmethod
@@ -97,11 +92,7 @@ class RunStore(abc.ABC):
last_ai_message: str | None = None,
first_human_message: str | None = None,
error: str | None = None,
) -> bool | None:
"""Persist final completion fields.
Returns ``False`` when the store can prove no row was updated.
"""
) -> None:
pass
async def update_run_progress(
@@ -126,11 +117,6 @@ class RunStore(abc.ABC):
async def list_pending(self, *, before: str | None = None) -> list[dict[str, Any]]:
pass
@abc.abstractmethod
async def list_inflight(self, *, before: str | None = None) -> list[dict[str, Any]]:
"""Return persisted runs that are still ``pending`` or ``running``."""
pass
@abc.abstractmethod
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
"""Aggregate token usage for completed runs in a thread.
@@ -65,8 +65,6 @@ class MemoryRunStore(RunStore):
if error is not None:
self._runs[run_id]["error"] = error
self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat()
return True
return False
async def update_model_name(self, run_id, model_name):
if run_id in self._runs:
@@ -83,8 +81,6 @@ class MemoryRunStore(RunStore):
if value is not None:
self._runs[run_id][key] = value
self._runs[run_id]["updated_at"] = datetime.now(UTC).isoformat()
return True
return False
async def update_run_progress(self, run_id, **kwargs):
if run_id in self._runs and self._runs[run_id].get("status") == "running":
@@ -99,12 +95,6 @@ class MemoryRunStore(RunStore):
results.sort(key=lambda r: r["created_at"])
return results
async def list_inflight(self, *, before=None):
now = before or datetime.now(UTC).isoformat()
results = [r for r in self._runs.values() if r["status"] in ("pending", "running") and r["created_at"] <= now]
results.sort(key=lambda r: r["created_at"])
return results
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
statuses = ("success", "error", "running") if include_active else ("success", "error")
completed = [r for r in self._runs.values() if r["thread_id"] == thread_id and r.get("status") in statuses]
@@ -63,6 +63,7 @@ class LocalSandboxProvider(SandboxProvider):
"""
uses_thread_data_mounts = True
needs_upload_permission_adjustment = False
def __init__(self, max_cached_threads: int = DEFAULT_MAX_CACHED_THREAD_SANDBOXES):
"""Initialize the local sandbox provider with static path mappings.
@@ -10,6 +10,7 @@ class SandboxProvider(ABC):
"""Abstract base class for sandbox providers"""
uses_thread_data_mounts: bool = False
needs_upload_permission_adjustment: bool = True
@abstractmethod
def acquire(self, thread_id: str | None = None) -> str:
-127
View File
@@ -1,127 +0,0 @@
"""Gateway startup recovery for stale persisted runs."""
from __future__ import annotations
from contextlib import asynccontextmanager
from types import SimpleNamespace
import pytest
from fastapi import FastAPI
import deerflow.runtime as runtime_module
from app.gateway import deps as gateway_deps
from deerflow.persistence import engine as engine_module
from deerflow.persistence import thread_meta as thread_meta_module
from deerflow.runtime.checkpointer import async_provider as checkpointer_module
from deerflow.runtime.events import store as event_store_module
@asynccontextmanager
async def _fake_context(value):
yield value
class _FakeRunManager:
"""RunManager double that records startup reconciliation calls."""
instances: list[_FakeRunManager] = []
recovered_runs = [SimpleNamespace(run_id="run-1", thread_id="thread-1")]
latest_by_thread: dict[str, list[SimpleNamespace]] = {}
def __init__(self, *, store):
self.store = store
self.reconcile_calls: list[dict] = []
self.list_by_thread_calls: list[dict] = []
_FakeRunManager.instances.append(self)
async def reconcile_orphaned_inflight_runs(self, *, error: str, before: str | None = None):
self.reconcile_calls.append({"error": error, "before": before})
return self.recovered_runs
async def list_by_thread(self, thread_id: str, *, user_id=None, limit: int = 100):
self.list_by_thread_calls.append({"thread_id": thread_id, "user_id": user_id, "limit": limit})
return self.latest_by_thread.get(thread_id, self.recovered_runs[:limit])
class _FakeThreadStore:
def __init__(self) -> None:
self.status_updates: list[tuple[str, str, str | None]] = []
async def update_status(self, thread_id: str, status: str, *, user_id=None) -> None:
self.status_updates.append((thread_id, status, user_id))
@pytest.mark.anyio
async def test_sqlite_runtime_reconciles_orphaned_runs_on_startup(monkeypatch):
"""SQLite startup should recover stale active runs before serving requests."""
app = FastAPI()
config = SimpleNamespace(
database=SimpleNamespace(backend="sqlite"),
run_events=SimpleNamespace(backend="memory"),
)
thread_store = _FakeThreadStore()
_FakeRunManager.instances.clear()
_FakeRunManager.recovered_runs = [SimpleNamespace(run_id="run-1", thread_id="thread-1")]
_FakeRunManager.latest_by_thread = {}
async def fake_init_engine_from_config(_database):
return None
async def fake_close_engine():
return None
monkeypatch.setattr(engine_module, "init_engine_from_config", fake_init_engine_from_config)
monkeypatch.setattr(engine_module, "get_session_factory", lambda: None)
monkeypatch.setattr(engine_module, "close_engine", fake_close_engine)
monkeypatch.setattr(runtime_module, "make_stream_bridge", lambda _config: _fake_context(object()))
monkeypatch.setattr(checkpointer_module, "make_checkpointer", lambda _config: _fake_context(object()))
monkeypatch.setattr(runtime_module, "make_store", lambda _config: _fake_context(object()))
monkeypatch.setattr(thread_meta_module, "make_thread_store", lambda _sf, _store: thread_store)
monkeypatch.setattr(event_store_module, "make_run_event_store", lambda _config: object())
monkeypatch.setattr(gateway_deps, "RunManager", _FakeRunManager)
async with gateway_deps.langgraph_runtime(app, config):
pass
assert len(_FakeRunManager.instances) == 1
assert _FakeRunManager.instances[0].reconcile_calls
assert _FakeRunManager.instances[0].reconcile_calls[0]["error"]
assert _FakeRunManager.instances[0].list_by_thread_calls == [{"thread_id": "thread-1", "user_id": None, "limit": 1}]
assert thread_store.status_updates == [("thread-1", "error", None)]
@pytest.mark.anyio
async def test_sqlite_runtime_does_not_mark_thread_error_when_newer_run_is_success(monkeypatch):
"""Startup recovery should not let an old orphaned run overwrite a newer terminal thread state."""
app = FastAPI()
config = SimpleNamespace(
database=SimpleNamespace(backend="sqlite"),
run_events=SimpleNamespace(backend="memory"),
)
thread_store = _FakeThreadStore()
_FakeRunManager.instances.clear()
_FakeRunManager.recovered_runs = [SimpleNamespace(run_id="old-running", thread_id="thread-1")]
_FakeRunManager.latest_by_thread = {"thread-1": [SimpleNamespace(run_id="newer-success", thread_id="thread-1", status="success")]}
async def fake_init_engine_from_config(_database):
return None
async def fake_close_engine():
return None
monkeypatch.setattr(engine_module, "init_engine_from_config", fake_init_engine_from_config)
monkeypatch.setattr(engine_module, "get_session_factory", lambda: None)
monkeypatch.setattr(engine_module, "close_engine", fake_close_engine)
monkeypatch.setattr(runtime_module, "make_stream_bridge", lambda _config: _fake_context(object()))
monkeypatch.setattr(checkpointer_module, "make_checkpointer", lambda _config: _fake_context(object()))
monkeypatch.setattr(runtime_module, "make_store", lambda _config: _fake_context(object()))
monkeypatch.setattr(thread_meta_module, "make_thread_store", lambda _sf, _store: thread_store)
monkeypatch.setattr(event_store_module, "make_run_event_store", lambda _config: object())
monkeypatch.setattr(gateway_deps, "RunManager", _FakeRunManager)
async with gateway_deps.langgraph_runtime(app, config):
pass
assert len(_FakeRunManager.instances) == 1
assert _FakeRunManager.instances[0].list_by_thread_calls == [{"thread_id": "thread-1", "user_id": None, "limit": 1}]
assert thread_store.status_updates == []
-240
View File
@@ -1,15 +1,10 @@
"""Tests for RunManager."""
import logging
import re
import sqlite3
from typing import Any
import pytest
from sqlalchemy.exc import DatabaseError as SQLAlchemyDatabaseError
from deerflow.runtime import DisconnectMode, RunManager, RunStatus
from deerflow.runtime.runs.manager import PersistenceRetryPolicy
from deerflow.runtime.runs.store.memory import MemoryRunStore
ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
@@ -20,92 +15,6 @@ def manager() -> RunManager:
return RunManager()
class FlakyStatusRunStore(MemoryRunStore):
"""Memory run store that simulates transient SQLite status-write failures."""
def __init__(self, *, status_failures: int) -> None:
super().__init__()
self.status_failures = status_failures
self.status_update_attempts = 0
async def update_status(self, run_id, status, *, error=None):
self.status_update_attempts += 1
if self.status_failures > 0:
self.status_failures -= 1
raise sqlite3.OperationalError("database is locked")
return await super().update_status(run_id, status, error=error)
class MissingRowStatusRunStore(MemoryRunStore):
"""Memory run store that reports a missing row for status updates."""
async def update_status(self, run_id, status, *, error=None):
await super().update_status(run_id, status, error=error)
return False
class PermanentStatusRunStore(MemoryRunStore):
"""Memory run store that simulates a permanent SQLAlchemy write failure."""
def __init__(self) -> None:
super().__init__()
self.status_update_attempts = 0
async def update_status(self, run_id, status, *, error=None):
self.status_update_attempts += 1
raise SQLAlchemyDatabaseError(
"UPDATE runs SET status = :status WHERE run_id = :run_id",
{"status": status, "run_id": run_id},
sqlite3.DatabaseError("no such table: runs"),
)
class FailingStatusRunStore(MemoryRunStore):
"""Memory run store that always fails status updates."""
def __init__(self) -> None:
super().__init__()
self.status_update_attempts = 0
async def update_status(self, run_id, status, *, error=None):
self.status_update_attempts += 1
raise sqlite3.OperationalError("database is locked")
class MissingCompletionRunStore(MemoryRunStore):
"""Memory run store that reports one missing row for completion updates."""
def __init__(self) -> None:
super().__init__()
self.completion_update_attempts = 0
async def update_run_completion(self, run_id, *, status, **kwargs):
self.completion_update_attempts += 1
if self.completion_update_attempts == 1:
return False
return await super().update_run_completion(run_id, status=status, **kwargs)
class AlwaysMissingCompletionRunStore(MemoryRunStore):
"""Memory run store that keeps reporting missing rows for completion updates."""
def __init__(self) -> None:
super().__init__()
self.completion_update_attempts = 0
async def update_run_completion(self, run_id, *, status, **kwargs):
self.completion_update_attempts += 1
return False
async def _stored_statuses(store: MemoryRunStore, *run_ids: str) -> dict[str, Any]:
rows = {}
for run_id in run_ids:
row = await store.get(run_id)
rows[run_id] = row["status"] if row else None
return rows
@pytest.mark.anyio
async def test_create_and_get(manager: RunManager):
"""Created run should be retrievable with new fields."""
@@ -171,155 +80,6 @@ async def test_cancel_persists_interrupted_status_to_store():
assert stored["status"] == "interrupted"
@pytest.mark.anyio
async def test_status_persistence_retries_transient_sqlite_lock():
"""Transient SQLite lock errors should not leave a final status stale."""
store = FlakyStatusRunStore(status_failures=2)
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.running)
await manager.set_status(record.run_id, RunStatus.success)
stored = await store.get(record.run_id)
assert stored is not None
assert stored["status"] == "success"
assert store.status_update_attempts >= 4
@pytest.mark.anyio
async def test_status_persistence_recreates_missing_store_row():
"""A final status update should recreate a run row if initial persistence was lost."""
store = MissingRowStatusRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await store.delete(record.run_id)
await manager.set_status(record.run_id, RunStatus.error, error="boom")
stored = await store.get(record.run_id)
assert stored is not None
assert stored["status"] == "error"
assert stored["error"] == "boom"
@pytest.mark.anyio
async def test_status_persistence_does_not_retry_permanent_sqlalchemy_errors():
"""Permanent SQLAlchemy failures should not be retried as SQLite pressure."""
store = PermanentStatusRunStore()
manager = RunManager(
store=store,
persistence_retry_policy=PersistenceRetryPolicy(max_attempts=5, initial_delay=0),
)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.error, error="boom")
assert store.status_update_attempts == 1
@pytest.mark.anyio
async def test_completion_persistence_recreates_missing_store_row():
"""Completion updates should recreate a missing row and persist final counters."""
store = MissingCompletionRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.running)
await manager.set_status(record.run_id, RunStatus.success)
await store.delete(record.run_id)
await manager.update_run_completion(
record.run_id,
status="success",
total_tokens=42,
llm_call_count=2,
last_ai_message="done",
)
stored = await store.get(record.run_id)
assert stored is not None
assert stored["status"] == "success"
assert stored["total_tokens"] == 42
assert stored["llm_call_count"] == 2
assert stored["last_ai_message"] == "done"
assert store.completion_update_attempts == 2
@pytest.mark.anyio
async def test_completion_persistence_warns_when_recreated_row_still_missing(caplog):
"""A second zero-row completion update after recreation should not be silent."""
store = AlwaysMissingCompletionRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.success)
caplog.set_level(logging.WARNING, logger="deerflow.runtime.runs.manager")
await manager.update_run_completion(record.run_id, status="success", total_tokens=42)
assert store.completion_update_attempts == 2
assert "affected no rows after row recreation" in caplog.text
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_marks_stale_rows_error():
"""Startup recovery should turn persisted active rows into explicit errors."""
store = MemoryRunStore()
await store.put("pending-run", thread_id="thread-1", status="pending", created_at="2026-01-01T00:00:00+00:00")
await store.put("running-run", thread_id="thread-1", status="running", created_at="2026-01-01T00:00:01+00:00")
await store.put("success-run", thread_id="thread-1", status="success", created_at="2026-01-01T00:00:02+00:00")
manager = RunManager(store=store)
recovered = await manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before="2026-01-01T00:00:02+00:00",
)
assert {record.run_id for record in recovered} == {"pending-run", "running-run"}
assert await _stored_statuses(store, "pending-run", "running-run", "success-run") == {
"pending-run": "error",
"running-run": "error",
"success-run": "success",
}
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_skips_live_local_run():
"""Startup recovery should not mark an active row orphaned when this worker owns it."""
store = MemoryRunStore()
manager = RunManager(store=store)
record = await manager.create("thread-1")
await manager.set_status(record.run_id, RunStatus.running)
recovered = await manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
)
stored = await store.get(record.run_id)
assert recovered == []
assert stored["status"] == "running"
@pytest.mark.anyio
async def test_reconcile_orphaned_inflight_runs_skips_rows_when_error_status_is_not_persisted():
"""Startup recovery must not report a row as recovered if the error update failed."""
store = FailingStatusRunStore()
await store.put("running-run", thread_id="thread-1", status="running", created_at="2026-01-01T00:00:00+00:00")
manager = RunManager(
store=store,
persistence_retry_policy=PersistenceRetryPolicy(max_attempts=2, initial_delay=0),
)
recovered = await manager.reconcile_orphaned_inflight_runs(
error="Gateway restarted before this run reached a durable final state.",
before="2026-01-01T00:00:01+00:00",
)
stored = await store.get("running-run")
assert recovered == []
assert stored["status"] == "running"
assert store.status_update_attempts == 2
@pytest.mark.anyio
async def test_cancel_not_inflight(manager: RunManager):
"""Cancelling a completed run should return False."""
+2 -47
View File
@@ -52,9 +52,6 @@ class _CustomRunStoreWithoutProgress(RunStore):
async def list_pending(self, *args, **kwargs):
return []
async def list_inflight(self, *args, **kwargs):
return []
async def aggregate_tokens_by_thread(self, *args, **kwargs):
return {}
@@ -78,19 +75,6 @@ class TestRunRepository:
assert row["status"] == "pending"
await _cleanup()
@pytest.mark.anyio
async def test_put_is_idempotent_for_retried_writes(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1", assistant_id="old-agent", status="pending")
await repo.put("r1", thread_id="t1", assistant_id="new-agent", status="running", error="retry")
row = await repo.get("r1")
assert row["assistant_id"] == "new-agent"
assert row["status"] == "running"
assert row["error"] == "retry"
await _cleanup()
@pytest.mark.anyio
async def test_get_missing_returns_none(self, tmp_path):
repo = await _make_repo(tmp_path)
@@ -101,19 +85,11 @@ class TestRunRepository:
async def test_update_status(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1")
updated = await repo.update_status("r1", "running")
await repo.update_status("r1", "running")
row = await repo.get("r1")
assert updated is True
assert row["status"] == "running"
await _cleanup()
@pytest.mark.anyio
async def test_update_status_returns_false_for_missing_row(self, tmp_path):
repo = await _make_repo(tmp_path)
updated = await repo.update_status("missing", "error", error="lost")
assert updated is False
await _cleanup()
@pytest.mark.anyio
async def test_update_status_with_error(self, tmp_path):
repo = await _make_repo(tmp_path)
@@ -170,24 +146,11 @@ class TestRunRepository:
assert all(r["status"] == "pending" for r in pending)
await _cleanup()
@pytest.mark.anyio
async def test_list_inflight_returns_pending_and_running_before_cutoff(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("pending-old", thread_id="t1", status="pending", created_at="2026-01-01T00:00:00+00:00")
await repo.put("running-old", thread_id="t1", status="running", created_at="2026-01-01T00:00:01+00:00")
await repo.put("success-old", thread_id="t1", status="success", created_at="2026-01-01T00:00:02+00:00")
await repo.put("pending-new", thread_id="t1", status="pending", created_at="2026-01-01T00:00:03+00:00")
inflight = await repo.list_inflight(before="2026-01-01T00:00:02+00:00")
assert [row["run_id"] for row in inflight] == ["pending-old", "running-old"]
await _cleanup()
@pytest.mark.anyio
async def test_update_run_completion(self, tmp_path):
repo = await _make_repo(tmp_path)
await repo.put("r1", thread_id="t1", status="running")
updated = await repo.update_run_completion(
await repo.update_run_completion(
"r1",
status="success",
total_input_tokens=100,
@@ -202,7 +165,6 @@ class TestRunRepository:
first_human_message="What is the meaning?",
)
row = await repo.get("r1")
assert updated is True
assert row["status"] == "success"
assert row["total_tokens"] == 150
assert row["llm_call_count"] == 2
@@ -212,13 +174,6 @@ class TestRunRepository:
assert row["first_human_message"] == "What is the meaning?"
await _cleanup()
@pytest.mark.anyio
async def test_update_run_completion_returns_false_for_missing_row(self, tmp_path):
repo = await _make_repo(tmp_path)
updated = await repo.update_run_completion("missing", status="error", total_tokens=1)
assert updated is False
await _cleanup()
@pytest.mark.anyio
async def test_metadata_preserved(self, tmp_path):
repo = await _make_repo(tmp_path)
+56
View File
@@ -219,6 +219,7 @@ def test_upload_files_does_not_adjust_permissions_for_local_sandbox(tmp_path):
provider = MagicMock()
provider.uses_thread_data_mounts = True
provider.needs_upload_permission_adjustment = False
provider.acquire.return_value = "local"
sandbox = MagicMock()
provider.get.return_value = sandbox
@@ -228,12 +229,14 @@ def test_upload_files_does_not_adjust_permissions_for_local_sandbox(tmp_path):
patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir),
patch.object(uploads, "get_sandbox_provider", return_value=provider),
patch.object(uploads, "_make_file_sandbox_writable") as make_writable,
patch.object(uploads, "_make_file_sandbox_readable") as make_readable,
):
file = UploadFile(filename="notes.txt", file=BytesIO(b"hello uploads"))
result = asyncio.run(call_unwrapped(uploads.upload_files, "thread-local", request=MagicMock(), files=[file], config=SimpleNamespace()))
assert result.success is True
make_writable.assert_not_called()
make_readable.assert_not_called()
def test_upload_files_acquires_non_local_sandbox_before_writing(tmp_path):
@@ -431,6 +434,59 @@ def test_make_file_sandbox_writable_skips_symlinks(tmp_path):
chmod.assert_not_called()
def test_make_file_sandbox_readable_adds_read_bits_for_regular_files(tmp_path):
file_path = tmp_path / "data.csv"
file_path.write_bytes(b"csv-data")
# Simulate the 0o600 permissions set by open_upload_file_no_symlink
file_path.chmod(0o600)
uploads._make_file_sandbox_readable(file_path)
updated_mode = stat.S_IMODE(file_path.stat().st_mode)
assert updated_mode & stat.S_IRUSR
assert updated_mode & stat.S_IRGRP
assert updated_mode & stat.S_IROTH
def test_make_file_sandbox_readable_skips_symlinks(tmp_path):
file_path = tmp_path / "target-link.txt"
file_path.write_text("hello", encoding="utf-8")
symlink_stat = MagicMock(st_mode=stat.S_IFLNK)
with (
patch.object(uploads.os, "lstat", return_value=symlink_stat),
patch.object(uploads.os, "chmod") as chmod,
):
uploads._make_file_sandbox_readable(file_path)
chmod.assert_not_called()
def test_upload_files_adjusts_read_permissions_for_mounted_non_local_sandbox(tmp_path):
thread_uploads_dir = tmp_path / "uploads"
thread_uploads_dir.mkdir(parents=True)
# AIO sandbox with LocalContainerBackend: uses_thread_data_mounts=True
# but needs_upload_permission_adjustment=True (default)
provider = MagicMock()
provider.uses_thread_data_mounts = True
provider.needs_upload_permission_adjustment = True
with (
patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir),
patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir),
patch.object(uploads, "get_sandbox_provider", return_value=provider),
patch.object(uploads, "_make_file_sandbox_readable") as make_readable,
):
file = UploadFile(filename="notes.txt", file=BytesIO(b"hello uploads"))
result = asyncio.run(call_unwrapped(uploads.upload_files, "thread-aio", request=MagicMock(), files=[file], config=SimpleNamespace()))
assert result.success is True
make_readable.assert_called_once()
called_path = make_readable.call_args[0][0]
assert called_path.name == "notes.txt"
def test_upload_files_rejects_dotdot_and_dot_filenames(tmp_path):
thread_uploads_dir = tmp_path / "uploads"
thread_uploads_dir.mkdir(parents=True)
-4
View File
@@ -18,7 +18,3 @@ lint:
format:
pnpm format:write
build-static:
NEXT_CONFIG_BUILD_OUTPUT=standalone SKIP_ENV_VALIDATION=1 NEXT_PUBLIC_STATIC_WEBSITE_ONLY=true pnpm build
@if [ -d .next/static ]; then mkdir -p .next/standalone/.next && cp -R .next/static .next/standalone/.next/static; fi
-4
View File
@@ -16,10 +16,6 @@ const withNextra = nextra({});
/** @type {import("next").NextConfig} */
const config = {
output:
process.env.NEXT_CONFIG_BUILD_OUTPUT === "standalone"
? "standalone"
: undefined,
i18n: {
locales: ["en", "zh"],
defaultLocale: "en",
@@ -32,7 +32,7 @@ Even with digital Leicas, photographers often emulate film characteristics: natu
### Image 1: Parisian Decisive Moment
![Paris Decisive Moment](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-paris-decisive-moment.jpg)
![Paris Decisive Moment](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-paris-decisive-moment.jpg)
This image captures the essence of Cartier-Bresson's philosophy. A woman in a red coat leaps over a puddle while a cyclist passes in perfect synchrony. The composition follows the rule of thirds, with the subject positioned at the intersection of grid lines. Shot with a simulated Leica M11 and 35mm Summicron lens at f/2.8, the image features shallow depth of field, natural film grain, and the warm, muted color palette characteristic of Leica photography.
@@ -40,7 +40,7 @@ The "decisive moment" here isn't just about timing—it's about the alignment of
### Image 2: Tokyo Night Reflections
![Tokyo Night Scene](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-tokyo-night.jpg)
![Tokyo Night Scene](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-tokyo-night.jpg)
Moving to Shinjuku, Tokyo, this image explores the atmospheric possibilities of Leica's legendary Noctilux lens. Simulating a Leica M10-P with a 50mm f/0.95 Noctilux wide open, the image creates extremely shallow depth of field with beautiful bokeh balls from neon signs reflected in wet pavement.
@@ -48,7 +48,7 @@ A salaryman waits under glowing kanji signs, steam rising from a nearby ramen sh
### Image 3: New York City Candid
![NYC Candid Scene](/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-nyc-candid.jpg)
![NYC Candid Scene](/frontend/public/demo/threads/7f9dc56c-e49c-4671-a3d2-c492ff4dce0c/user-data/outputs/leica-nyc-candid.jpg)
This Chinatown scene demonstrates the documentary power of Leica's Q2 camera with its fixed 28mm Summilux lens. The wide angle captures environmental context while maintaining intimate proximity to the subjects. A fishmonger hands a live fish to a customer while tourists photograph the scene—a moment of cultural contrast and authentic urban life.
@@ -1,19 +1,19 @@
import { isStaticWebsiteOnly } from "@/core/static-mode";
import { DEMO_THREAD_IDS } from "@/core/threads/static-demo";
"use client";
import { ChatProviders } from "./providers";
export function generateStaticParams() {
if (!isStaticWebsiteOnly()) {
return [];
}
return DEMO_THREAD_IDS.map((thread_id) => ({ thread_id }));
}
import { PromptInputProvider } from "@/components/ai-elements/prompt-input";
import { ArtifactsProvider } from "@/components/workspace/artifacts";
import { SubtasksProvider } from "@/core/tasks/context";
export default function ChatLayout({
children,
}: {
children: React.ReactNode;
}) {
return <ChatProviders>{children}</ChatProviders>;
return (
<SubtasksProvider>
<ArtifactsProvider>
<PromptInputProvider>{children}</PromptInputProvider>
</ArtifactsProvider>
</SubtasksProvider>
);
}
@@ -227,7 +227,6 @@ export default function ChatPage() {
isWelcomeMode && <Welcome mode={settings.context.mode} />
}
disabled={
isMock ||
env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY === "true" ||
isUploading
}
@@ -1,15 +0,0 @@
"use client";
import { PromptInputProvider } from "@/components/ai-elements/prompt-input";
import { ArtifactsProvider } from "@/components/workspace/artifacts";
import { SubtasksProvider } from "@/core/tasks/context";
export function ChatProviders({ children }: { children: React.ReactNode }) {
return (
<SubtasksProvider>
<ArtifactsProvider>
<PromptInputProvider>{children}</PromptInputProvider>
</ArtifactsProvider>
</SubtasksProvider>
);
}
+6 -8
View File
@@ -43,14 +43,12 @@ export default async function WorkspaceLayout({
>
Retry
</Link>
<form action="/api/v1/auth/logout" method="post">
<button
type="submit"
className="text-muted-foreground hover:bg-muted rounded-md border px-4 py-2 text-sm"
>
Logout &amp; Reset
</button>
</form>
<Link
href="/api/v1/auth/logout"
className="text-muted-foreground hover:bg-muted rounded-md border px-4 py-2 text-sm"
>
Logout &amp; Reset
</Link>
</div>
</div>
);
@@ -83,7 +83,7 @@ export function ArtifactFileDetail({
const isSupportPreview = useMemo(() => {
return language === "html" || language === "markdown";
}, [language]);
const { content, url } = useArtifactContent({
const { content } = useArtifactContent({
threadId,
filepath: filepathFromProps,
enabled: isCodeFile && !isWriteFile,
@@ -254,9 +254,7 @@ export function ArtifactFileDetail({
(language === "markdown" || language === "html") && (
<ArtifactFilePreview
content={displayContent}
isWriteFile={isWriteFile}
language={language ?? "text"}
url={url}
/>
)}
{isCodeFile && viewMode === "code" && (
@@ -279,33 +277,27 @@ export function ArtifactFileDetail({
export function ArtifactFilePreview({
content,
isWriteFile,
language,
url,
}: {
content: string;
isWriteFile: boolean;
language: string;
url?: string;
}) {
const [htmlPreviewUrl, setHtmlPreviewUrl] = useState<string>();
useEffect(() => {
if (language !== "html" || isWriteFile) {
if (language !== "html") {
setHtmlPreviewUrl(undefined);
return;
}
const blob = new Blob([htmlWithBaseHref(content ?? "", url)], {
type: "text/html",
});
const objectUrl = URL.createObjectURL(blob);
setHtmlPreviewUrl(objectUrl);
const blob = new Blob([content ?? ""], { type: "text/html" });
const url = URL.createObjectURL(blob);
setHtmlPreviewUrl(url);
return () => {
URL.revokeObjectURL(objectUrl);
URL.revokeObjectURL(url);
};
}, [content, isWriteFile, language, url]);
}, [content, language]);
if (language === "markdown") {
return (
@@ -326,35 +318,9 @@ export function ArtifactFilePreview({
className="size-full"
title="Artifact preview"
sandbox="allow-scripts allow-forms"
src={isWriteFile ? undefined : htmlPreviewUrl}
srcDoc={isWriteFile ? content : undefined}
src={htmlPreviewUrl}
/>
);
}
return null;
}
function htmlWithBaseHref(content: string, url?: string) {
if (!url || /<base\s/i.exec(content)) {
return content;
}
const baseHref = htmlBaseHref(url);
const baseElement = `<base href="${escapeHtmlAttribute(baseHref)}">`;
if (/<head[^>]*>/i.exec(content)) {
return content.replace(/<head([^>]*)>/i, `<head$1>${baseElement}`);
}
return `${baseElement}${content}`;
}
function htmlBaseHref(url: string) {
const baseUrl = new URL(url, window.location.href);
baseUrl.pathname = baseUrl.pathname.replace(/\/[^/]*$/, "/");
baseUrl.search = "";
baseUrl.hash = "";
return baseUrl.toString();
}
function escapeHtmlAttribute(value: string) {
return value.replaceAll("&", "&amp;").replaceAll('"', "&quot;");
}
+17 -17
View File
@@ -20,27 +20,27 @@ If you want to understand how DeerFlow works, start with the Introduction. If yo
Start with the conceptual overview first.
- [Introduction](./docs/introduction)
- [Why DeerFlow](./docs/introduction/why-deerflow)
- [Harness vs App](./docs/introduction/harness-vs-app)
- [Introduction](/docs/introduction)
- [Why DeerFlow](/docs/introduction/why-deerflow)
- [Harness vs App](/docs/introduction/harness-vs-app)
### If you want to build with DeerFlow
Start with the Harness section. This path is for teams who want to integrate DeerFlow capabilities into their own system or build a custom agent product on top of the DeerFlow runtime.
- [DeerFlow Harness](./docs/harness)
- [Quick Start](./docs/harness/quick-start)
- [Configuration](./docs/harness/configuration)
- [Customization](./docs/harness/customization)
- [DeerFlow Harness](/docs/harness)
- [Quick Start](/docs/harness/quick-start)
- [Configuration](/docs/harness/configuration)
- [Customization](/docs/harness/customization)
### If you want to deploy and use DeerFlow
Start with the App section. This path is for teams who want to run DeerFlow as a complete application and understand how to configure, operate, and use it in practice.
- [DeerFlow App](./docs/app)
- [Quick Start](./docs/app/quick-start)
- [Deployment Guide](./docs/app/deployment-guide)
- [Workspace Usage](./docs/app/workspace-usage)
- [DeerFlow App](/docs/app)
- [Quick Start](/docs/app/quick-start)
- [Deployment Guide](/docs/app/deployment-guide)
- [Workspace Usage](/docs/app/workspace-usage)
## Documentation structure
@@ -79,17 +79,17 @@ The App section is written for teams who want to deploy DeerFlow as a usable pro
The Tutorials section is for hands-on, task-oriented learning.
- [Tutorials](./docs/tutorials)
- [Tutorials](/docs/tutorials)
### Reference
The Reference section is for detailed lookup material, including configuration, runtime modes, APIs, and source-oriented mapping.
- [Reference](./docs/reference)
- [Reference](/docs/reference)
## Choose the right path
- If you are **evaluating the project**, start with [Introduction](./docs/introduction).
- If you are **building your own agent system**, start with [DeerFlow Harness](./docs/harness).
- If you are **deploying DeerFlow for users**, start with [DeerFlow App](./docs/app).
- If you want to **learn by doing**, go to [Tutorials](./docs/tutorials).
- If you are **evaluating the project**, start with [Introduction](/docs/introduction).
- If you are **building your own agent system**, start with [DeerFlow Harness](/docs/harness).
- If you are **deploying DeerFlow for users**, start with [DeerFlow App](/docs/app).
- If you want to **learn by doing**, go to [Tutorials](/docs/tutorials).
@@ -1,9 +0,0 @@
---
title: DeerFlow 2.0 M1
description: DeerFlow 2.0 M1 is officially in RC. Here's what you need to know.
date: 2026-05-30
tags:
- Release
---
## DeerFlow 2.0 M1 Release
+11 -11
View File
@@ -20,27 +20,27 @@ DeerFlow 是一个用于构建和运行 Agent 系统的框架。它提供了一
先从概念概述开始。
- [简介](./docs/introduction)
- [为什么选择 DeerFlow](./docs/introduction/why-deerflow)
- [Harness 与应用的区别](./docs/introduction/harness-vs-app)
- [简介](/docs/introduction)
- [为什么选择 DeerFlow](/docs/introduction/why-deerflow)
- [Harness 与应用的区别](/docs/introduction/harness-vs-app)
### 如果你想基于 DeerFlow 进行开发
从 Harness 章节开始。这条路径适合想将 DeerFlow 功能集成到自己系统中,或基于 DeerFlow 运行时构建自定义 Agent 产品的团队。
- [DeerFlow Harness](./docs/harness)
- [快速上手](./docs/harness/quick-start)
- [配置](./docs/harness/configuration)
- [自定义与扩展](./docs/harness/customization)
- [DeerFlow Harness](/docs/harness)
- [快速上手](/docs/harness/quick-start)
- [配置](/docs/harness/configuration)
- [自定义与扩展](/docs/harness/customization)
### 如果你想部署和使用 DeerFlow
从应用章节开始。这条路径适合想将 DeerFlow 作为完整应用运行,并了解如何配置、运维和实际使用的团队。
- [DeerFlow 应用](./docs/application)
- [快速上手](./docs/application/quick-start)
- [部署指南](./docs/application/deployment-guide)
- [工作区使用](./docs/application/workspace-usage)
- [DeerFlow 应用](/docs/application)
- [快速上手](/docs/application/quick-start)
- [部署指南](/docs/application/deployment-guide)
- [工作区使用](/docs/application/workspace-usage)
## 文档结构
-49
View File
@@ -3,13 +3,6 @@
import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client";
import { getLangGraphBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import {
loadStaticDemoThread,
loadStaticDemoThreads,
staticDemoThreadState,
} from "../threads/static-demo";
import type { AgentThreadState } from "../threads/types";
import { isStateChangingMethod, readCsrfCookie } from "./fetcher";
import { sanitizeRunStreamOptions } from "./stream-mode";
@@ -39,10 +32,6 @@ function injectCsrfHeader(_url: URL, init: RequestInit): RequestInit {
}
function createCompatibleClient(isMock?: boolean): LangGraphClient {
if (isStaticWebsiteOnly() && !isMock) {
return createStaticClient();
}
const apiUrl = getLangGraphBaseURL(isMock);
console.log(`Creating API client with base URL: ${apiUrl}`);
const client = new LangGraphClient({
@@ -69,44 +58,6 @@ function createCompatibleClient(isMock?: boolean): LangGraphClient {
return client;
}
function createStaticClient(): LangGraphClient {
const apiUrl =
typeof window === "undefined"
? "http://localhost:3000"
: window.location.origin;
const client = new LangGraphClient({ apiUrl });
client.threads.search = (async (query) => {
return loadStaticDemoThreads(query);
}) as typeof client.threads.search;
client.threads.get = (async (threadId) => {
return loadStaticDemoThread(threadId);
}) as typeof client.threads.get;
client.threads.getState = (async (threadId) => {
return staticDemoThreadState(await loadStaticDemoThread(threadId));
}) as typeof client.threads.getState;
client.threads.getHistory = (async (threadId) => {
return [staticDemoThreadState(await loadStaticDemoThread(threadId))];
}) as typeof client.threads.getHistory;
client.threads.update = (async (threadId) => {
return loadStaticDemoThread(threadId);
}) as typeof client.threads.update;
client.runs.list = (async () => []) as typeof client.runs.list;
client.runs.stream = async function* () {
/* empty */
} as typeof client.runs.stream;
client.runs.joinStream = async function* () {
/* empty */
} as typeof client.runs.joinStream;
return client as LangGraphClient<AgentThreadState>;
}
const _clients = new Map<string, LangGraphClient>();
export function getAPIClient(isMock?: boolean): LangGraphClient {
const cacheKey = isMock ? "mock" : "default";
-20
View File
@@ -1,5 +1,4 @@
import { getBackendBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import type { AgentThread } from "../threads";
export function urlOfArtifact({
@@ -13,9 +12,6 @@ export function urlOfArtifact({
download?: boolean;
isMock?: boolean;
}) {
if (isStaticWebsiteOnly()) {
return staticDemoArtifactURL({ filepath, threadId, download });
}
if (isMock) {
return `${getBackendBaseURL()}/mock/api/threads/${threadId}/artifacts${filepath}${download ? "?download=true" : ""}`;
}
@@ -27,21 +23,5 @@ export function extractArtifactsFromThread(thread: AgentThread) {
}
export function resolveArtifactURL(absolutePath: string, threadId: string) {
if (isStaticWebsiteOnly()) {
return staticDemoArtifactURL({ filepath: absolutePath, threadId });
}
return `${getBackendBaseURL()}/api/threads/${threadId}/artifacts${absolutePath}`;
}
function staticDemoArtifactURL({
filepath,
threadId,
download = false,
}: {
filepath: string;
threadId: string;
download?: boolean;
}) {
const demoPath = filepath.replace(/^\/mnt\//, "/");
return `${getBackendBaseURL()}/demo/threads/${threadId}${demoPath}${download ? "?download=true" : ""}`;
}
+3 -17
View File
@@ -10,8 +10,6 @@ import React, {
type ReactNode,
} from "react";
import { isStaticWebsiteOnly } from "../static-mode";
import { type User, buildLoginUrl } from "./types";
// Re-export for consumers
@@ -48,7 +46,6 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const [isLoading, setIsLoading] = useState(false);
const router = useRouter();
const pathname = usePathname();
const staticMode = isStaticWebsiteOnly();
const isAuthenticated = user !== null;
@@ -57,8 +54,6 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
* Used when initialUser might be stale (e.g., after tab was inactive)
*/
const refreshUser = useCallback(async () => {
if (staticMode) return;
try {
setIsLoading(true);
const res = await fetch("/api/v1/auth/me", {
@@ -82,7 +77,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
} finally {
setIsLoading(false);
}
}, [staticMode, pathname, router]);
}, [pathname, router]);
/**
* Logout - call FastAPI logout endpoint and clear local state
@@ -92,11 +87,6 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
// Immediately clear local state to prevent UI flicker
setUser(null);
if (staticMode) {
router.push("/");
return;
}
try {
await fetch("/api/v1/auth/logout", {
method: "POST",
@@ -109,7 +99,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
// Redirect to home page
router.push("/");
}, [staticMode, router]);
}, [router]);
/**
* Handle visibility change - refresh user when tab becomes visible again.
@@ -118,8 +108,6 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const lastCheckRef = React.useRef(0);
useEffect(() => {
if (staticMode) return;
const handleVisibilityChange = () => {
if (document.visibilityState !== "visible" || user === null) return;
const now = Date.now();
@@ -132,7 +120,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
return () => {
document.removeEventListener("visibilitychange", handleVisibilityChange);
};
}, [staticMode, user, refreshUser]);
}, [user, refreshUser]);
const value: AuthContextType = {
user,
@@ -167,8 +155,6 @@ export function useRequireAuth(): AuthContextType {
const pathname = usePathname();
useEffect(() => {
if (isStaticWebsiteOnly()) return;
// Only redirect if we're sure user is not authenticated (not just loading)
if (!auth.isLoading && !auth.isAuthenticated) {
router.push(buildLoginUrl(pathname || "/workspace"));
-10
View File
@@ -1,9 +1,6 @@
import { cookies } from "next/headers";
import { isStaticWebsiteOnly } from "../static-mode";
import { getGatewayConfig } from "./gateway-config";
import { STATIC_WEBSITE_USER } from "./static-user";
import { type AuthResult, userSchema } from "./types";
const SSR_AUTH_TIMEOUT_MS = 5_000;
@@ -13,13 +10,6 @@ const SSR_AUTH_TIMEOUT_MS = 5_000;
* Returns a tagged AuthResult — callers use exhaustive switch, no try/catch.
*/
export async function getServerSideUser(): Promise<AuthResult> {
if (isStaticWebsiteOnly()) {
return {
tag: "authenticated",
user: STATIC_WEBSITE_USER,
};
}
if (process.env.DEER_FLOW_AUTH_DISABLED === "1") {
return {
tag: "authenticated",
-8
View File
@@ -1,8 +0,0 @@
import type { User } from "./types";
export const STATIC_WEBSITE_USER: User = {
id: "static-website-user",
email: "static@example.local",
system_role: "admin",
needs_setup: false,
};
-10
View File
@@ -1,18 +1,8 @@
import { getBackendBaseURL } from "../config";
import { isStaticWebsiteOnly } from "../static-mode";
import type { ModelsResponse } from "./types";
const STATIC_MODELS_RESPONSE: ModelsResponse = {
models: [],
token_usage: { enabled: false },
};
export async function loadModels(): Promise<ModelsResponse> {
if (isStaticWebsiteOnly()) {
return STATIC_MODELS_RESPONSE;
}
const res = await fetch(`${getBackendBaseURL()}/api/models`);
const data = (await res.json()) as Partial<ModelsResponse>;
return {
-5
View File
@@ -1,5 +0,0 @@
import { env } from "@/env";
export function isStaticWebsiteOnly() {
return env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY === "true";
}
-87
View File
@@ -1,87 +0,0 @@
import type { ThreadState } from "@langchain/langgraph-sdk";
import type { ThreadsClient } from "@langchain/langgraph-sdk/client";
import type { AgentThread, AgentThreadState } from "./types";
export const DEMO_THREAD_IDS = [
"21cfea46-34bd-4aa6-9e1f-3009452fbeb9",
"3823e443-4e2b-4679-b496-a9506eae462b",
"4f3e55ee-f853-43db-bfb3-7d1a411f03cb",
"5aa47db1-d0cb-4eb9-aea5-3dac1b371c5a",
"7cfa5f8f-a2f8-47ad-acbd-da7137baf990",
"7f9dc56c-e49c-4671-a3d2-c492ff4dce0c",
"90040b36-7eba-4b97-ba89-02c3ad47a8b9",
"ad76c455-5bf9-4335-8517-fc03834ab828",
"b83fbb2a-4e36-4d82-9de0-7b2a02c2092a",
"c02bb4d5-4202-490e-ae8f-ff4864fc0d2e",
"d3e5adaf-084c-4dd5-9d29-94f1d6bccd98",
"f4125791-0128-402a-8ca9-50e0947557e4",
"fe3f7974-1bcb-4a01-a950-79673baafefd",
] as const;
export type ThreadSearchParams = NonNullable<
Parameters<ThreadsClient["search"]>[0]
>;
export async function loadStaticDemoThreads(
params: ThreadSearchParams = {},
): Promise<AgentThread[]> {
const threads = await Promise.all(
DEMO_THREAD_IDS.map((threadId) => loadStaticDemoThread(threadId)),
);
const sortBy = params.sortBy ?? "updated_at";
const sortOrder = params.sortOrder ?? "desc";
const sortedThreads = [...threads].sort((a, b) => {
const aTimestamp = (a as unknown as Record<string, unknown>)[sortBy];
const bTimestamp = (b as unknown as Record<string, unknown>)[sortBy];
const aParsed = typeof aTimestamp === "string" ? Date.parse(aTimestamp) : 0;
const bParsed = typeof bTimestamp === "string" ? Date.parse(bTimestamp) : 0;
const aValue = Number.isNaN(aParsed) ? 0 : aParsed;
const bValue = Number.isNaN(bParsed) ? 0 : bParsed;
return sortOrder === "asc" ? aValue - bValue : bValue - aValue;
});
const offset = Math.max(0, Math.floor(params.offset ?? 0));
const limit =
typeof params.limit === "number"
? Math.max(0, Math.floor(params.limit))
: sortedThreads.length;
return sortedThreads.slice(offset, offset + limit);
}
export async function loadStaticDemoThread(
threadId: string,
): Promise<AgentThread> {
const response = await globalThis.fetch(
`/demo/threads/${encodeURIComponent(threadId)}/thread.json`,
);
if (!response.ok) {
throw new Error(`Failed to load demo thread ${threadId}`);
}
const thread = (await response.json()) as AgentThread;
return {
...thread,
thread_id: threadId,
updated_at: thread.updated_at ?? thread.created_at,
};
}
export function staticDemoThreadState(
thread: AgentThread,
): ThreadState<AgentThreadState> {
return {
values: thread.values,
next: [],
checkpoint: {
thread_id: thread.thread_id,
checkpoint_ns: "",
checkpoint_id: null,
checkpoint_map: null,
},
metadata: thread.metadata ?? null,
created_at: thread.updated_at ?? thread.created_at ?? null,
parent_checkpoint: null,
tasks: [],
};
}
@@ -1,69 +0,0 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
const ENV_KEYS = [
"NEXT_PUBLIC_BACKEND_BASE_URL",
"NEXT_PUBLIC_STATIC_WEBSITE_ONLY",
] as const;
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
function snapshotEnv(): EnvSnapshot {
const snapshot: EnvSnapshot = {};
for (const key of ENV_KEYS) {
snapshot[key] = process.env[key];
}
return snapshot;
}
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
const env = process.env as Record<string, string | undefined>;
if (value === undefined) {
delete env[key];
} else {
env[key] = value;
}
}
function restoreEnv(snapshot: EnvSnapshot) {
for (const key of ENV_KEYS) {
setEnv(key, snapshot[key]);
}
}
async function loadFreshArtifactUtils() {
vi.resetModules();
return await import("@/core/artifacts/utils");
}
describe("artifact URL helpers", () => {
let saved: EnvSnapshot;
beforeEach(() => {
saved = snapshotEnv();
setEnv("NEXT_PUBLIC_BACKEND_BASE_URL", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
afterEach(() => {
restoreEnv(saved);
});
test("maps static demo artifact paths to bundled public files", async () => {
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", "true");
const { resolveArtifactURL, urlOfArtifact } =
await loadFreshArtifactUtils();
expect(
urlOfArtifact({
filepath: "/mnt/user-data/outputs/index.html",
threadId: "thread-1",
}),
).toBe("/demo/threads/thread-1/user-data/outputs/index.html");
expect(
resolveArtifactURL("/mnt/user-data/outputs/style.css", "thread-1"),
).toBe("/demo/threads/thread-1/user-data/outputs/style.css");
});
});
@@ -1,77 +0,0 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { STATIC_WEBSITE_USER } from "@/core/auth/static-user";
vi.mock("next/headers", () => ({
cookies: vi.fn(() => {
throw new Error("cookies should not be read in static website mode");
}),
}));
const ENV_KEYS = [
"DEER_FLOW_AUTH_DISABLED",
"NEXT_PUBLIC_STATIC_WEBSITE_ONLY",
] as const;
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
function snapshotEnv(): EnvSnapshot {
const snapshot: EnvSnapshot = {};
for (const key of ENV_KEYS) {
snapshot[key] = process.env[key];
}
return snapshot;
}
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
const env = process.env as Record<string, string | undefined>;
if (value === undefined) {
delete env[key];
} else {
env[key] = value;
}
}
function restoreEnv(snapshot: EnvSnapshot) {
for (const key of ENV_KEYS) {
setEnv(key, snapshot[key]);
}
}
async function loadFreshServerAuth() {
vi.resetModules();
return await import("@/core/auth/server");
}
describe("getServerSideUser", () => {
let saved: EnvSnapshot;
beforeEach(() => {
saved = snapshotEnv();
setEnv("DEER_FLOW_AUTH_DISABLED", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
afterEach(() => {
restoreEnv(saved);
vi.unstubAllGlobals();
});
test("bypasses gateway auth in static website mode", async () => {
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", "true");
const fetchSpy = vi.fn(() => {
throw new Error("fetch should not be called in static website mode");
});
vi.stubGlobal("fetch", fetchSpy);
const { getServerSideUser } = await loadFreshServerAuth();
await expect(getServerSideUser()).resolves.toEqual({
tag: "authenticated",
user: STATIC_WEBSITE_USER,
});
expect(fetchSpy).not.toHaveBeenCalled();
});
});