mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-21 07:26:50 +00:00
34e835bc33
* feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli Implement all core LangGraph Platform API endpoints in the Gateway, allowing it to fully replace the langgraph-cli dev server for local development. This eliminates a heavyweight dependency and simplifies the development stack. Changes: - Add runs lifecycle endpoints (create, stream, wait, cancel, join) - Add threads CRUD and search endpoints - Add assistants compatibility endpoints (search, get, graph, schemas) - Add StreamBridge (in-memory pub/sub for SSE) and async provider - Add RunManager with atomic create_or_reject (eliminates TOCTOU race) - Add worker with interrupt/rollback cancel actions and runtime context injection - Route /api/langgraph/* to Gateway in nginx config - Skip langgraph-cli startup by default (SKIP_LANGGRAPH_SERVER=0 to restore) - Add unit tests for RunManager, SSE format, and StreamBridge * fix: drain bridge queue on client disconnect to prevent backpressure When on_disconnect=continue, keep consuming events from the bridge without yielding, so the worker is not blocked by a full queue. Only on_disconnect=cancel breaks out immediately. 
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: remove pytest import Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: Fix default stream_mode to ["values", "messages-tuple"] Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: Remove unused if_exists field from ThreadCreateRequest Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: address review comments on gateway LangGraph API - Mount runs.py router in app.py (missing include_router) - Normalize interrupt_before/after "*" to node list before run_agent() - Use entry.id for SSE event ID instead of counter - Drain bridge queue on disconnect when on_disconnect=continue - Reuse serialization helper in wait_run() for consistent wire format - Reject unsupported multitask_strategy with 400 - Remove SKIP_LANGGRAPH_SERVER fallback, always use Gateway * feat: extract app.state access into deps.py Encapsulate read/write operations for singleton objects (RunManager, StreamBridge, checkpointer) held in app.state into a shared utility, reducing repeated access patterns across router modules. * feat: extract deerflow.runtime.serialization module with tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor: replace duplicated serialization with deerflow.runtime.serialization Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: extract app/gateway/services.py with run lifecycle logic Create a service layer that centralizes SSE formatting, input/config normalization, and run lifecycle management. Router modules will delegate to these functions instead of using private cross-imported helpers. 
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor: wire routers to use services layer, remove cross-module private imports Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * style: apply ruff formatting to refactored files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(runtime): support LangGraph dev server and add compat route - Enable official LangGraph dev server for local development workflow - Decouple runtime components from agents package for better separation - Provide gateway-backed fallback route when dev server is skipped - Simplify lifecycle management using context manager in gateway * feat(runtime): add Store providers with auto-backend selection - Add async_provider.py and provider.py under deerflow/runtime/store/ - Support memory, sqlite, postgres backends matching checkpointer config - Integrate into FastAPI lifespan via AsyncExitStack in deps.py - Replace hardcoded InMemoryStore with config-driven factory * refactor(gateway): migrate thread management from checkpointer to Store and resolve multiple endpoint failures - Add Store-backed CRUD helpers (_store_get, _store_put, _store_upsert) - Replace checkpoint-scanning search with two-phase strategy: phase 1 reads Store (O(threads)), phase 2 backfills from checkpointer for legacy/LangGraph Server threads with lazy migration - Extend Store record schema with values field for title persistence - Sync thread title from checkpoint to Store after run completion - Fix /threads/{id}/runs/{run_id}/stream 405 by accepting both GET and POST methods; POST handles interrupt/rollback actions - Fix /threads/{id}/state 500 by separating read_config and write_config, adding checkpoint_ns to configurable, and shallow-copying checkpoint/metadata before mutation - Sync title to Store on state update for immediate search reflection - Move _upsert_thread_in_store into services.py, remove duplicate logic - Add _sync_thread_title_after_run: 
await run task, read final checkpoint title, write back to Store record - Spawn title sync as background task from start_run when Store exists * refactor(runtime): deduplicate store and checkpointer provider logic Extract _ensure_sqlite_parent_dir() helper into checkpointer/provider.py and use it in all three places that previously inlined the same mkdir logic. Consolidate duplicate error constants in store/async_provider.py by importing from store/provider.py instead of redefining them. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(runtime): move SQLite helpers to runtime/store, checkpointer imports from store _resolve_sqlite_conn_str and _ensure_sqlite_parent_dir now live in runtime/store/provider.py. agents/checkpointer/provider and agents/checkpointer/async_provider import from there, reversing the previous dependency direction (store → checkpointer becomes checkpointer → store). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(runtime): extract SQLite helpers into runtime/store/_sqlite_utils.py Move resolve_sqlite_conn_str and ensure_sqlite_parent_dir out of checkpointer/provider.py into a dedicated _sqlite_utils module. Functions are now public (no underscore prefix), making cross-module imports semantically correct. All four provider files import from the single shared location. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(gateway): use adelete_thread to fully remove thread checkpoints on delete AsyncSqliteSaver has no adelete method — the previous hasattr check always evaluated to False, silently leaving all checkpoint rows in the database. Switch to adelete_thread(thread_id) which deletes every checkpoint and pending-write row for the thread across all namespaces (including sub-graph checkpoints). 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(gateway): remove dead bridge_cm/ckpt_cm code and fix StrEnum lint app.py had unreachable code after the async-with lifespan refactor: bridge_cm and ckpt_cm were referenced but never defined (F821), and the channel service startup/shutdown was outside the langgraph_runtime block so it never ran. Move channel service lifecycle inside the async-with block where it belongs. Replace str+Enum inheritance in RunStatus and DisconnectMode with StrEnum as suggested by UP042. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * style: format with ruff --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: JeffJiang <for-eleven@hotmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
213 lines
7.6 KiB
Python
213 lines
7.6 KiB
Python
"""In-memory run registry."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
|
|
from .schemas import DisconnectMode, RunStatus
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(UTC).isoformat()
|
|
|
|
|
|
@dataclass
class RunRecord:
    """State kept for one run; instances are mutable and updated in place."""

    # --- identity ---
    run_id: str
    thread_id: str
    assistant_id: str | None

    # --- lifecycle / policy ---
    status: RunStatus
    on_disconnect: DisconnectMode
    multitask_strategy: str = "reject"

    # --- caller-supplied payloads (a fresh dict per instance by default) ---
    metadata: dict = field(default_factory=dict)
    kwargs: dict = field(default_factory=dict)

    # --- timestamps (strings; empty until the creator fills them in) ---
    created_at: str = ""
    updated_at: str = ""

    # --- execution handles, left out of repr() ---
    task: asyncio.Task | None = field(repr=False, default=None)
    abort_event: asyncio.Event = field(repr=False, default_factory=asyncio.Event)

    # --- cancellation / failure details ---
    abort_action: str = "interrupt"
    error: str | None = None
|
|
|
|
|
|
class RunManager:
    """In-memory run registry.

    Runs are kept in a plain ``dict`` keyed by ``run_id``. Every method that
    mutates the registry or a record does so while holding ``self._lock``
    (an ``asyncio.Lock``); :meth:`get` is a lock-free dictionary read.
    """

    # Multitask strategies that ``create_or_reject`` knows how to apply.
    _SUPPORTED_STRATEGIES = ("reject", "interrupt", "rollback")

    def __init__(self) -> None:
        self._runs: dict[str, RunRecord] = {}
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Private helpers (shared by the public methods below)
    # ------------------------------------------------------------------

    @staticmethod
    def _is_inflight(record: RunRecord) -> bool:
        """Return ``True`` while *record* is still pending or running."""
        return record.status in (RunStatus.pending, RunStatus.running)

    @staticmethod
    def _new_record(
        thread_id: str,
        assistant_id: str | None,
        *,
        on_disconnect: DisconnectMode,
        metadata: dict | None,
        kwargs: dict | None,
        multitask_strategy: str,
    ) -> RunRecord:
        """Build an unregistered pending record with a fresh UUID and timestamps."""
        now = _now_iso()
        return RunRecord(
            run_id=str(uuid.uuid4()),
            thread_id=thread_id,
            assistant_id=assistant_id,
            status=RunStatus.pending,
            on_disconnect=on_disconnect,
            multitask_strategy=multitask_strategy,
            metadata=metadata or {},
            kwargs=kwargs or {},
            created_at=now,
            updated_at=now,
        )

    @staticmethod
    def _abort(record: RunRecord, action: str, timestamp: str) -> None:
        """Signal *record* to stop and mark it interrupted.

        Must be called with ``self._lock`` held. Stores *action* on the
        record, sets its abort event, cancels its task if one is attached and
        not yet done, then sets status to ``interrupted`` (regardless of
        *action*) and stamps ``updated_at`` with *timestamp*.
        """
        record.abort_action = action
        record.abort_event.set()
        if record.task is not None and not record.task.done():
            record.task.cancel()
        record.status = RunStatus.interrupted
        record.updated_at = timestamp

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def create(
        self,
        thread_id: str,
        assistant_id: str | None = None,
        *,
        on_disconnect: DisconnectMode = DisconnectMode.cancel,
        metadata: dict | None = None,
        kwargs: dict | None = None,
        multitask_strategy: str = "reject",
    ) -> RunRecord:
        """Create a new pending run and register it.

        No check for existing in-flight runs is made here and
        *multitask_strategy* is stored without validation; use
        :meth:`create_or_reject` when those are needed.

        Returns:
            The newly registered :class:`RunRecord`.
        """
        record = self._new_record(
            thread_id,
            assistant_id,
            on_disconnect=on_disconnect,
            metadata=metadata,
            kwargs=kwargs,
            multitask_strategy=multitask_strategy,
        )
        async with self._lock:
            self._runs[record.run_id] = record
        logger.info("Run created: run_id=%s thread_id=%s", record.run_id, thread_id)
        return record

    def get(self, run_id: str) -> RunRecord | None:
        """Return a run record by ID, or ``None``."""
        return self._runs.get(run_id)

    async def list_by_thread(self, thread_id: str) -> list[RunRecord]:
        """Return all runs for a given thread, newest first.

        Ordering is by the ``created_at`` string, descending.
        """
        async with self._lock:
            return sorted(
                (r for r in self._runs.values() if r.thread_id == thread_id),
                key=lambda r: r.created_at,
                reverse=True,
            )

    async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
        """Transition a run to a new status.

        Args:
            run_id: The run to update. Unknown IDs are logged at WARNING
                level and otherwise ignored.
            status: The new status.
            error: Optional error text; stored only when not ``None``, so an
                existing error is never cleared by this call.
        """
        async with self._lock:
            record = self._runs.get(run_id)
            if record is None:
                logger.warning("set_status called for unknown run %s", run_id)
                return
            record.status = status
            record.updated_at = _now_iso()
            if error is not None:
                record.error = error
        logger.info("Run %s -> %s", run_id, status.value)

    async def cancel(self, run_id: str, *, action: str = "interrupt") -> bool:
        """Request cancellation of a run.

        Args:
            run_id: The run ID to cancel.
            action: Stored on the record as ``abort_action`` for whoever
                observes the abort event. Intended meaning: "interrupt" keeps
                the checkpoint, "rollback" reverts to pre-run state (that
                handling is not implemented in this class).

        Sets the abort event, cancels the asyncio task if one is attached and
        still running, and marks the run ``interrupted``.

        Returns:
            ``True`` if the run was pending/running and cancellation was
            initiated; ``False`` if the run is unknown or already finished.
        """
        async with self._lock:
            record = self._runs.get(run_id)
            if record is None or not self._is_inflight(record):
                return False
            self._abort(record, action, _now_iso())
        logger.info("Run %s cancelled (action=%s)", run_id, action)
        return True

    async def create_or_reject(
        self,
        thread_id: str,
        assistant_id: str | None = None,
        *,
        on_disconnect: DisconnectMode = DisconnectMode.cancel,
        metadata: dict | None = None,
        kwargs: dict | None = None,
        multitask_strategy: str = "reject",
    ) -> RunRecord:
        """Atomically check for inflight runs and create a new one.

        For ``reject`` strategy, raises ``ConflictError`` if thread
        already has a pending/running run. For ``interrupt``/``rollback``,
        cancels inflight runs before creating.

        The lock is held across both the check and the insert, which
        eliminates the TOCTOU race of a separate ``has_inflight`` + ``create``.

        Returns:
            The newly registered :class:`RunRecord`.

        Raises:
            UnsupportedStrategyError: *multitask_strategy* is not one of
                ``reject``, ``interrupt`` or ``rollback``.
            ConflictError: strategy is ``reject`` and the thread already has
                a pending/running run.
        """
        # Pure argument validation: touches no shared state, so it does not
        # need to run under the lock.
        if multitask_strategy not in self._SUPPORTED_STRATEGIES:
            raise UnsupportedStrategyError(f"Multitask strategy '{multitask_strategy}' is not yet supported. Supported strategies: {', '.join(self._SUPPORTED_STRATEGIES)}")

        record = self._new_record(
            thread_id,
            assistant_id,
            on_disconnect=on_disconnect,
            metadata=metadata,
            kwargs=kwargs,
            multitask_strategy=multitask_strategy,
        )

        async with self._lock:
            inflight = [r for r in self._runs.values() if r.thread_id == thread_id and self._is_inflight(r)]

            if inflight:
                if multitask_strategy == "reject":
                    raise ConflictError(f"Thread {thread_id} already has an active run")

                # Strategy is "interrupt" or "rollback": abort every in-flight
                # run, stamping them with the new run's creation time.
                for stale in inflight:
                    self._abort(stale, multitask_strategy, record.created_at)
                logger.info(
                    "Cancelled %d inflight run(s) on thread %s (strategy=%s)",
                    len(inflight),
                    thread_id,
                    multitask_strategy,
                )

            self._runs[record.run_id] = record

        logger.info("Run created: run_id=%s thread_id=%s", record.run_id, thread_id)
        return record

    async def has_inflight(self, thread_id: str) -> bool:
        """Return ``True`` if *thread_id* has a pending or running run."""
        async with self._lock:
            return any(r.thread_id == thread_id and self._is_inflight(r) for r in self._runs.values())

    async def cleanup(self, run_id: str, *, delay: float = 300) -> None:
        """Remove a run record after an optional delay.

        Args:
            run_id: The run to drop from the registry; unknown IDs are ignored.
            delay: Seconds to sleep before removal. Values ``<= 0`` remove
                the record immediately.
        """
        if delay > 0:
            await asyncio.sleep(delay)
        async with self._lock:
            self._runs.pop(run_id, None)
        logger.debug("Run record %s cleaned up", run_id)
|
|
|
|
|
|
class ConflictError(Exception):
    """Signals that a thread already has an inflight run while ``multitask_strategy`` is ``reject``."""
|
|
|
|
|
|
class UnsupportedStrategyError(Exception):
    """Signals that the requested ``multitask_strategy`` value has no implementation yet."""
|