feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli (#1403)

* feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli

Implement all core LangGraph Platform API endpoints in the Gateway,
allowing it to fully replace the langgraph-cli dev server for local
development. This eliminates a heavyweight dependency and simplifies
the development stack.

Changes:
- Add runs lifecycle endpoints (create, stream, wait, cancel, join)
- Add threads CRUD and search endpoints
- Add assistants compatibility endpoints (search, get, graph, schemas)
- Add StreamBridge (in-memory pub/sub for SSE) and async provider
- Add RunManager with atomic create_or_reject (eliminates TOCTOU race)
- Add worker with interrupt/rollback cancel actions and runtime context injection
- Route /api/langgraph/* to Gateway in nginx config
- Skip langgraph-cli startup by default (SKIP_LANGGRAPH_SERVER=0 to restore)
- Add unit tests for RunManager, SSE format, and StreamBridge

* fix: drain bridge queue on client disconnect to prevent backpressure

When on_disconnect=continue, keep consuming events from the bridge
without yielding, so the worker is not blocked by a full queue.
Only on_disconnect=cancel breaks out immediately.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: remove pytest import

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: Fix default stream_mode to ["values", "messages-tuple"]

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: Remove unused if_exists field from ThreadCreateRequest

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: address review comments on gateway LangGraph API

- Mount runs.py router in app.py (missing include_router)
- Normalize interrupt_before/after "*" to node list before run_agent()
- Use entry.id for SSE event ID instead of counter
- Drain bridge queue on disconnect when on_disconnect=continue
- Reuse serialization helper in wait_run() for consistent wire format
- Reject unsupported multitask_strategy with 400
- Remove SKIP_LANGGRAPH_SERVER fallback, always use Gateway

* feat: extract app.state access into deps.py

Encapsulate read/write operations for singleton objects (RunManager,
StreamBridge, checkpointer) held in app.state into a shared utility,
reducing repeated access patterns across router modules.

* feat: extract deerflow.runtime.serialization module with tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: replace duplicated serialization with deerflow.runtime.serialization

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: extract app/gateway/services.py with run lifecycle logic

Create a service layer that centralizes SSE formatting, input/config
normalization, and run lifecycle management. Router modules will delegate
to these functions instead of using private cross-imported helpers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: wire routers to use services layer, remove cross-module private imports

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: apply ruff formatting to refactored files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(runtime): support LangGraph dev server and add compat route

- Enable official LangGraph dev server for local development workflow
- Decouple runtime components from agents package for better separation
- Provide gateway-backed fallback route when dev server is skipped
- Simplify lifecycle management using context manager in gateway

* feat(runtime): add Store providers with auto-backend selection

- Add async_provider.py and provider.py under deerflow/runtime/store/
- Support memory, sqlite, postgres backends matching checkpointer config
- Integrate into FastAPI lifespan via AsyncExitStack in deps.py
- Replace hardcoded InMemoryStore with config-driven factory

* refactor(gateway): migrate thread management from checkpointer to Store and resolve multiple endpoint failures

- Add Store-backed CRUD helpers (_store_get, _store_put, _store_upsert)
- Replace checkpoint-scanning search with two-phase strategy:
  phase 1 reads Store (O(threads)), phase 2 backfills from checkpointer
  for legacy/LangGraph Server threads with lazy migration
- Extend Store record schema with values field for title persistence
- Sync thread title from checkpoint to Store after run completion
- Fix /threads/{id}/runs/{run_id}/stream 405 by accepting both
  GET and POST methods; POST handles interrupt/rollback actions
- Fix /threads/{id}/state 500 by separating read_config and
  write_config, adding checkpoint_ns to configurable, and
  shallow-copying checkpoint/metadata before mutation
- Sync title to Store on state update for immediate search reflection
- Move _upsert_thread_in_store into services.py, remove duplicate logic
- Add _sync_thread_title_after_run: await run task, read final
  checkpoint title, write back to Store record
- Spawn title sync as background task from start_run when Store exists

* refactor(runtime): deduplicate store and checkpointer provider logic

Extract _ensure_sqlite_parent_dir() helper into checkpointer/provider.py
and use it in all three places that previously inlined the same mkdir logic.
Consolidate duplicate error constants in store/async_provider.py by importing
from store/provider.py instead of redefining them.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(runtime): move SQLite helpers to runtime/store, checkpointer imports from store

_resolve_sqlite_conn_str and _ensure_sqlite_parent_dir now live in
runtime/store/provider.py. agents/checkpointer/provider and
agents/checkpointer/async_provider import from there, reversing the
previous dependency direction (store → checkpointer becomes
checkpointer → store).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(runtime): extract SQLite helpers into runtime/store/_sqlite_utils.py

Move resolve_sqlite_conn_str and ensure_sqlite_parent_dir out of
checkpointer/provider.py into a dedicated _sqlite_utils module.
Functions are now public (no underscore prefix), making cross-module
imports semantically correct. All four provider files import from
the single shared location.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gateway): use adelete_thread to fully remove thread checkpoints on delete

AsyncSqliteSaver has no adelete method — the previous hasattr check
always evaluated to False, silently leaving all checkpoint rows in the
database. Switch to adelete_thread(thread_id) which deletes every
checkpoint and pending-write row for the thread across all namespaces
(including sub-graph checkpoints).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gateway): remove dead bridge_cm/ckpt_cm code and fix StrEnum lint

app.py had unreachable code after the async-with lifespan refactor:
bridge_cm and ckpt_cm were referenced but never defined (F821), and
the channel service startup/shutdown was outside the langgraph_runtime
block so it never ran. Move channel service lifecycle inside the
async-with block where it belongs.

Replace str+Enum inheritance in RunStatus and DisconnectMode with
StrEnum as suggested by UP042.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* style: format with ruff

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: JeffJiang <for-eleven@hotmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
rayhpeng
2026-03-30 16:02:23 +08:00
committed by GitHub
parent 7db95926b0
commit 34e835bc33
35 changed files with 3492 additions and 66 deletions
@@ -27,9 +27,9 @@ from deerflow.agents.checkpointer.provider import (
POSTGRES_CONN_REQUIRED,
POSTGRES_INSTALL,
SQLITE_INSTALL,
_resolve_sqlite_conn_str,
)
from deerflow.config.app_config import get_app_config
from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir, resolve_sqlite_conn_str
logger = logging.getLogger(__name__)
@@ -53,12 +53,8 @@ async def _async_checkpointer(config) -> AsyncIterator[Checkpointer]:
except ImportError as exc:
raise ImportError(SQLITE_INSTALL) from exc
import pathlib
conn_str = _resolve_sqlite_conn_str(config.connection_string or "store.db")
# Only create parent directories for real filesystem paths
if conn_str != ":memory:" and not conn_str.startswith("file:"):
pathlib.Path(conn_str).parent.mkdir(parents=True, exist_ok=True)
conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db")
ensure_sqlite_parent_dir(conn_str)
async with AsyncSqliteSaver.from_conn_string(conn_str) as saver:
await saver.setup()
yield saver
@@ -27,7 +27,7 @@ from langgraph.types import Checkpointer
from deerflow.config.app_config import get_app_config
from deerflow.config.checkpointer_config import CheckpointerConfig
from deerflow.config.paths import resolve_path
from deerflow.runtime.store._sqlite_utils import resolve_sqlite_conn_str
logger = logging.getLogger(__name__)
@@ -44,18 +44,6 @@ POSTGRES_CONN_REQUIRED = "checkpointer.connection_string is required for the pos
# ---------------------------------------------------------------------------
def _resolve_sqlite_conn_str(raw: str) -> str:
"""Return a SQLite connection string ready for use with ``SqliteSaver``.
SQLite special strings (``":memory:"`` and ``file:`` URIs) are returned
unchanged. Plain filesystem paths — relative or absolute — are resolved
to an absolute string via :func:`resolve_path`.
"""
if raw == ":memory:" or raw.startswith("file:"):
return raw
return str(resolve_path(raw))
@contextlib.contextmanager
def _sync_checkpointer_cm(config: CheckpointerConfig) -> Iterator[Checkpointer]:
"""Context manager that creates and tears down a sync checkpointer.
@@ -78,7 +66,7 @@ def _sync_checkpointer_cm(config: CheckpointerConfig) -> Iterator[Checkpointer]:
except ImportError as exc:
raise ImportError(SQLITE_INSTALL) from exc
conn_str = _resolve_sqlite_conn_str(config.connection_string or "store.db")
conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db")
with SqliteSaver.from_conn_string(conn_str) as saver:
saver.setup()
logger.info("Checkpointer: using SqliteSaver (%s)", conn_str)
@@ -15,6 +15,7 @@ from deerflow.config.memory_config import load_memory_config_from_dict
from deerflow.config.model_config import ModelConfig
from deerflow.config.sandbox_config import SandboxConfig
from deerflow.config.skills_config import SkillsConfig
from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict
from deerflow.config.subagents_config import load_subagents_config_from_dict
from deerflow.config.summarization_config import load_summarization_config_from_dict
from deerflow.config.title_config import load_title_config_from_dict
@@ -41,6 +42,7 @@ class AppConfig(BaseModel):
tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="Tool search / deferred loading configuration")
model_config = ConfigDict(extra="allow", frozen=False)
checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration")
stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration")
@classmethod
def resolve_config_path(cls, config_path: str | None = None) -> Path:
@@ -120,6 +122,10 @@ class AppConfig(BaseModel):
if "checkpointer" in config_data:
load_checkpointer_config_from_dict(config_data["checkpointer"])
# Load stream bridge config if present
if "stream_bridge" in config_data:
load_stream_bridge_config_from_dict(config_data["stream_bridge"])
# Always refresh ACP agent config so removed entries do not linger across reloads.
load_acp_config_from_dict(config_data.get("acp_agents", {}))
@@ -0,0 +1,46 @@
"""Configuration for stream bridge."""
from typing import Literal
from pydantic import BaseModel, Field
StreamBridgeType = Literal["memory", "redis"]
class StreamBridgeConfig(BaseModel):
"""Configuration for the stream bridge that connects agent workers to SSE endpoints."""
type: StreamBridgeType = Field(
default="memory",
description="Stream bridge backend type. 'memory' uses in-process asyncio.Queue (single-process only). 'redis' uses Redis Streams (planned for Phase 2, not yet implemented).",
)
redis_url: str | None = Field(
default=None,
description="Redis URL for the redis stream bridge type. Example: 'redis://localhost:6379/0'.",
)
queue_maxsize: int = Field(
default=256,
description="Maximum number of events buffered per run in the memory bridge.",
)
# Global configuration instance — None means no stream bridge is configured
# (falls back to memory with defaults).
_stream_bridge_config: StreamBridgeConfig | None = None
def get_stream_bridge_config() -> StreamBridgeConfig | None:
"""Get the current stream bridge configuration, or None if not configured."""
return _stream_bridge_config
def set_stream_bridge_config(config: StreamBridgeConfig | None) -> None:
"""Set the stream bridge configuration."""
global _stream_bridge_config
_stream_bridge_config = config
def load_stream_bridge_config_from_dict(config_dict: dict) -> None:
"""Load stream bridge configuration from a dictionary."""
global _stream_bridge_config
_stream_bridge_config = StreamBridgeConfig(**config_dict)
@@ -0,0 +1,39 @@
"""LangGraph-compatible runtime — runs, streaming, and lifecycle management.
Re-exports the public API of :mod:`~deerflow.runtime.runs` and
:mod:`~deerflow.runtime.stream_bridge` so that consumers can import
directly from ``deerflow.runtime``.
"""
from .runs import ConflictError, DisconnectMode, RunManager, RunRecord, RunStatus, UnsupportedStrategyError, run_agent
from .serialization import serialize, serialize_channel_values, serialize_lc_object, serialize_messages_tuple
from .store import get_store, make_store, reset_store, store_context
from .stream_bridge import END_SENTINEL, HEARTBEAT_SENTINEL, MemoryStreamBridge, StreamBridge, StreamEvent, make_stream_bridge
__all__ = [
# runs
"ConflictError",
"DisconnectMode",
"RunManager",
"RunRecord",
"RunStatus",
"UnsupportedStrategyError",
"run_agent",
# serialization
"serialize",
"serialize_channel_values",
"serialize_lc_object",
"serialize_messages_tuple",
# store
"get_store",
"make_store",
"reset_store",
"store_context",
# stream_bridge
"END_SENTINEL",
"HEARTBEAT_SENTINEL",
"MemoryStreamBridge",
"StreamBridge",
"StreamEvent",
"make_stream_bridge",
]
@@ -0,0 +1,15 @@
"""Run lifecycle management for LangGraph Platform API compatibility."""
from .manager import ConflictError, RunManager, RunRecord, UnsupportedStrategyError
from .schemas import DisconnectMode, RunStatus
from .worker import run_agent
__all__ = [
"ConflictError",
"DisconnectMode",
"RunManager",
"RunRecord",
"RunStatus",
"UnsupportedStrategyError",
"run_agent",
]
@@ -0,0 +1,212 @@
"""In-memory run registry."""
from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from .schemas import DisconnectMode, RunStatus
logger = logging.getLogger(__name__)
def _now_iso() -> str:
return datetime.now(UTC).isoformat()
@dataclass
class RunRecord:
"""Mutable record for a single run."""
run_id: str
thread_id: str
assistant_id: str | None
status: RunStatus
on_disconnect: DisconnectMode
multitask_strategy: str = "reject"
metadata: dict = field(default_factory=dict)
kwargs: dict = field(default_factory=dict)
created_at: str = ""
updated_at: str = ""
task: asyncio.Task | None = field(default=None, repr=False)
abort_event: asyncio.Event = field(default_factory=asyncio.Event, repr=False)
abort_action: str = "interrupt"
error: str | None = None
class RunManager:
"""In-memory run registry. All mutations are protected by an asyncio lock."""
def __init__(self) -> None:
self._runs: dict[str, RunRecord] = {}
self._lock = asyncio.Lock()
async def create(
self,
thread_id: str,
assistant_id: str | None = None,
*,
on_disconnect: DisconnectMode = DisconnectMode.cancel,
metadata: dict | None = None,
kwargs: dict | None = None,
multitask_strategy: str = "reject",
) -> RunRecord:
"""Create a new pending run and register it."""
run_id = str(uuid.uuid4())
now = _now_iso()
record = RunRecord(
run_id=run_id,
thread_id=thread_id,
assistant_id=assistant_id,
status=RunStatus.pending,
on_disconnect=on_disconnect,
multitask_strategy=multitask_strategy,
metadata=metadata or {},
kwargs=kwargs or {},
created_at=now,
updated_at=now,
)
async with self._lock:
self._runs[run_id] = record
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
def get(self, run_id: str) -> RunRecord | None:
"""Return a run record by ID, or ``None``."""
return self._runs.get(run_id)
async def list_by_thread(self, thread_id: str) -> list[RunRecord]:
"""Return all runs for a given thread, newest first."""
async with self._lock:
return sorted(
(r for r in self._runs.values() if r.thread_id == thread_id),
key=lambda r: r.created_at,
reverse=True,
)
async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
"""Transition a run to a new status."""
async with self._lock:
record = self._runs.get(run_id)
if record is None:
logger.warning("set_status called for unknown run %s", run_id)
return
record.status = status
record.updated_at = _now_iso()
if error is not None:
record.error = error
logger.info("Run %s -> %s", run_id, status.value)
async def cancel(self, run_id: str, *, action: str = "interrupt") -> bool:
"""Request cancellation of a run.
Args:
run_id: The run ID to cancel.
action: "interrupt" keeps checkpoint, "rollback" reverts to pre-run state.
Sets the abort event with the action reason and cancels the asyncio task.
Returns ``True`` if the run was in-flight and cancellation was initiated.
"""
async with self._lock:
record = self._runs.get(run_id)
if record is None:
return False
if record.status not in (RunStatus.pending, RunStatus.running):
return False
record.abort_action = action
record.abort_event.set()
if record.task is not None and not record.task.done():
record.task.cancel()
record.status = RunStatus.interrupted
record.updated_at = _now_iso()
logger.info("Run %s cancelled (action=%s)", run_id, action)
return True
async def create_or_reject(
self,
thread_id: str,
assistant_id: str | None = None,
*,
on_disconnect: DisconnectMode = DisconnectMode.cancel,
metadata: dict | None = None,
kwargs: dict | None = None,
multitask_strategy: str = "reject",
) -> RunRecord:
"""Atomically check for inflight runs and create a new one.
For ``reject`` strategy, raises ``ConflictError`` if thread
already has a pending/running run. For ``interrupt``/``rollback``,
cancels inflight runs before creating.
This method holds the lock across both the check and the insert,
eliminating the TOCTOU race in separate ``has_inflight`` + ``create``.
"""
run_id = str(uuid.uuid4())
now = _now_iso()
_supported_strategies = ("reject", "interrupt", "rollback")
async with self._lock:
if multitask_strategy not in _supported_strategies:
raise UnsupportedStrategyError(f"Multitask strategy '{multitask_strategy}' is not yet supported. Supported strategies: {', '.join(_supported_strategies)}")
inflight = [r for r in self._runs.values() if r.thread_id == thread_id and r.status in (RunStatus.pending, RunStatus.running)]
if multitask_strategy == "reject" and inflight:
raise ConflictError(f"Thread {thread_id} already has an active run")
if multitask_strategy in ("interrupt", "rollback") and inflight:
for r in inflight:
r.abort_action = multitask_strategy
r.abort_event.set()
if r.task is not None and not r.task.done():
r.task.cancel()
r.status = RunStatus.interrupted
r.updated_at = now
logger.info(
"Cancelled %d inflight run(s) on thread %s (strategy=%s)",
len(inflight),
thread_id,
multitask_strategy,
)
record = RunRecord(
run_id=run_id,
thread_id=thread_id,
assistant_id=assistant_id,
status=RunStatus.pending,
on_disconnect=on_disconnect,
multitask_strategy=multitask_strategy,
metadata=metadata or {},
kwargs=kwargs or {},
created_at=now,
updated_at=now,
)
self._runs[run_id] = record
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
async def has_inflight(self, thread_id: str) -> bool:
"""Return ``True`` if *thread_id* has a pending or running run."""
async with self._lock:
return any(r.thread_id == thread_id and r.status in (RunStatus.pending, RunStatus.running) for r in self._runs.values())
async def cleanup(self, run_id: str, *, delay: float = 300) -> None:
"""Remove a run record after an optional delay."""
if delay > 0:
await asyncio.sleep(delay)
async with self._lock:
self._runs.pop(run_id, None)
logger.debug("Run record %s cleaned up", run_id)
class ConflictError(Exception):
"""Raised when multitask_strategy=reject and thread has inflight runs."""
class UnsupportedStrategyError(Exception):
"""Raised when a multitask_strategy value is not yet implemented."""
@@ -0,0 +1,21 @@
"""Run status and disconnect mode enums."""
from enum import StrEnum
class RunStatus(StrEnum):
"""Lifecycle status of a single run."""
pending = "pending"
running = "running"
success = "success"
error = "error"
timeout = "timeout"
interrupted = "interrupted"
class DisconnectMode(StrEnum):
"""Behaviour when the SSE consumer disconnects."""
cancel = "cancel"
continue_ = "continue"
@@ -0,0 +1,253 @@
"""Background agent execution.
Runs an agent graph inside an ``asyncio.Task``, publishing events to
a :class:`StreamBridge` as they are produced.
Uses ``graph.astream(stream_mode=[...])`` which gives correct full-state
snapshots for ``values`` mode, proper ``{node: writes}`` for ``updates``,
and ``(chunk, metadata)`` tuples for ``messages`` mode.
Note: ``events`` mode is not supported through the gateway — it requires
``graph.astream_events()`` which cannot simultaneously produce ``values``
snapshots. The JS open-source LangGraph API server works around this via
internal checkpoint callbacks that are not exposed in the Python public API.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Literal
from deerflow.runtime.serialization import serialize
from deerflow.runtime.stream_bridge import StreamBridge
from .manager import RunManager, RunRecord
from .schemas import RunStatus
logger = logging.getLogger(__name__)
# Valid stream_mode values for LangGraph's graph.astream()
_VALID_LG_MODES = {"values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"}
async def run_agent(
bridge: StreamBridge,
run_manager: RunManager,
record: RunRecord,
*,
checkpointer: Any,
store: Any | None = None,
agent_factory: Any,
graph_input: dict,
config: dict,
stream_modes: list[str] | None = None,
stream_subgraphs: bool = False,
interrupt_before: list[str] | Literal["*"] | None = None,
interrupt_after: list[str] | Literal["*"] | None = None,
) -> None:
"""Execute an agent in the background, publishing events to *bridge*."""
run_id = record.run_id
thread_id = record.thread_id
requested_modes: set[str] = set(stream_modes or ["values"])
# Track whether "events" was requested but skipped
if "events" in requested_modes:
logger.info(
"Run %s: 'events' stream_mode not supported in gateway (requires astream_events + checkpoint callbacks). Skipping.",
run_id,
)
try:
# 1. Mark running
await run_manager.set_status(run_id, RunStatus.running)
# Record pre-run checkpoint_id to support rollback (Phase 2).
pre_run_checkpoint_id = None
try:
config_for_check = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
ckpt_tuple = await checkpointer.aget_tuple(config_for_check)
if ckpt_tuple is not None:
pre_run_checkpoint_id = getattr(ckpt_tuple, "config", {}).get("configurable", {}).get("checkpoint_id")
except Exception:
logger.debug("Could not get pre-run checkpoint_id for run %s", run_id)
# 2. Publish metadata — useStream needs both run_id AND thread_id
await bridge.publish(
run_id,
"metadata",
{
"run_id": run_id,
"thread_id": thread_id,
},
)
# 3. Build the agent
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
# Inject runtime context so middlewares can access thread_id
# (langgraph-cli does this automatically; we must do it manually)
runtime = Runtime(context={"thread_id": thread_id}, store=store)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
runnable_config = RunnableConfig(**config)
agent = agent_factory(config=runnable_config)
# 4. Attach checkpointer and store
if checkpointer is not None:
agent.checkpointer = checkpointer
if store is not None:
agent.store = store
# 5. Set interrupt nodes
if interrupt_before:
agent.interrupt_before_nodes = interrupt_before
if interrupt_after:
agent.interrupt_after_nodes = interrupt_after
# 6. Build LangGraph stream_mode list
# "events" is NOT a valid astream mode — skip it
# "messages-tuple" maps to LangGraph's "messages" mode
lg_modes: list[str] = []
for m in requested_modes:
if m == "messages-tuple":
lg_modes.append("messages")
elif m == "events":
# Skipped — see log above
continue
elif m in _VALID_LG_MODES:
lg_modes.append(m)
if not lg_modes:
lg_modes = ["values"]
# Deduplicate while preserving order
seen: set[str] = set()
deduped: list[str] = []
for m in lg_modes:
if m not in seen:
seen.add(m)
deduped.append(m)
lg_modes = deduped
logger.info("Run %s: streaming with modes %s (requested: %s)", run_id, lg_modes, requested_modes)
# 7. Stream using graph.astream
if len(lg_modes) == 1 and not stream_subgraphs:
# Single mode, no subgraphs: astream yields raw chunks
single_mode = lg_modes[0]
async for chunk in agent.astream(graph_input, config=runnable_config, stream_mode=single_mode):
if record.abort_event.is_set():
logger.info("Run %s abort requested — stopping", run_id)
break
sse_event = _lg_mode_to_sse_event(single_mode)
await bridge.publish(run_id, sse_event, serialize(chunk, mode=single_mode))
else:
# Multiple modes or subgraphs: astream yields tuples
async for item in agent.astream(
graph_input,
config=runnable_config,
stream_mode=lg_modes,
subgraphs=stream_subgraphs,
):
if record.abort_event.is_set():
logger.info("Run %s abort requested — stopping", run_id)
break
mode, chunk = _unpack_stream_item(item, lg_modes, stream_subgraphs)
if mode is None:
continue
sse_event = _lg_mode_to_sse_event(mode)
await bridge.publish(run_id, sse_event, serialize(chunk, mode=mode))
# 8. Final status
if record.abort_event.is_set():
action = record.abort_action
if action == "rollback":
await run_manager.set_status(run_id, RunStatus.error, error="Rolled back by user")
# TODO(Phase 2): Implement full checkpoint rollback.
# Use pre_run_checkpoint_id to revert the thread's checkpoint
# to the state before this run started. Requires a
# checkpointer.adelete() or equivalent API.
try:
if checkpointer is not None and pre_run_checkpoint_id is not None:
# Phase 2: roll back to pre_run_checkpoint_id
pass
logger.info("Run %s rolled back", run_id)
except Exception:
logger.warning("Failed to rollback checkpoint for run %s", run_id)
else:
await run_manager.set_status(run_id, RunStatus.interrupted)
else:
await run_manager.set_status(run_id, RunStatus.success)
except asyncio.CancelledError:
action = record.abort_action
if action == "rollback":
await run_manager.set_status(run_id, RunStatus.error, error="Rolled back by user")
logger.info("Run %s was cancelled (rollback)", run_id)
else:
await run_manager.set_status(run_id, RunStatus.interrupted)
logger.info("Run %s was cancelled", run_id)
except Exception as exc:
error_msg = f"{exc}"
logger.exception("Run %s failed: %s", run_id, error_msg)
await run_manager.set_status(run_id, RunStatus.error, error=error_msg)
await bridge.publish(
run_id,
"error",
{
"message": error_msg,
"name": type(exc).__name__,
},
)
finally:
await bridge.publish_end(run_id)
asyncio.create_task(bridge.cleanup(run_id, delay=60))
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _lg_mode_to_sse_event(mode: str) -> str:
"""Map LangGraph internal stream_mode name to SSE event name.
LangGraph's ``astream(stream_mode="messages")`` produces message
tuples. The SSE protocol calls this ``messages-tuple`` when the
client explicitly requests it, but the default SSE event name used
by LangGraph Platform is simply ``"messages"``.
"""
# All LG modes map 1:1 to SSE event names — "messages" stays "messages"
return mode
def _unpack_stream_item(
item: Any,
lg_modes: list[str],
stream_subgraphs: bool,
) -> tuple[str | None, Any]:
"""Unpack a multi-mode or subgraph stream item into (mode, chunk).
Returns ``(None, None)`` if the item cannot be parsed.
"""
if stream_subgraphs:
if isinstance(item, tuple) and len(item) == 3:
_ns, mode, chunk = item
return str(mode), chunk
if isinstance(item, tuple) and len(item) == 2:
mode, chunk = item
return str(mode), chunk
return None, None
if isinstance(item, tuple) and len(item) == 2:
mode, chunk = item
return str(mode), chunk
# Fallback: single-element output from first mode
return lg_modes[0] if lg_modes else None, item
@@ -0,0 +1,78 @@
"""Canonical serialization for LangChain / LangGraph objects.
Provides a single source of truth for converting LangChain message
objects, Pydantic models, and LangGraph state dicts into plain
JSON-serialisable Python structures.
Consumers: ``deerflow.runtime.runs.worker`` (SSE publishing) and
``app.gateway.routers.threads`` (REST responses).
"""
from __future__ import annotations
from typing import Any
def serialize_lc_object(obj: Any) -> Any:
"""Recursively serialize a LangChain object to a JSON-serialisable dict."""
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, dict):
return {k: serialize_lc_object(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [serialize_lc_object(item) for item in obj]
# Pydantic v2
if hasattr(obj, "model_dump"):
try:
return obj.model_dump()
except Exception:
pass
# Pydantic v1 / older objects
if hasattr(obj, "dict"):
try:
return obj.dict()
except Exception:
pass
# Last resort
try:
return str(obj)
except Exception:
return repr(obj)
def serialize_channel_values(channel_values: dict[str, Any]) -> dict[str, Any]:
"""Serialize channel values, stripping internal LangGraph keys.
Internal keys like ``__pregel_*`` and ``__interrupt__`` are removed
to match what the LangGraph Platform API returns.
"""
result: dict[str, Any] = {}
for key, value in channel_values.items():
if key.startswith("__pregel_") or key == "__interrupt__":
continue
result[key] = serialize_lc_object(value)
return result
def serialize_messages_tuple(obj: Any) -> Any:
"""Serialize a messages-mode tuple ``(chunk, metadata)``."""
if isinstance(obj, tuple) and len(obj) == 2:
chunk, metadata = obj
return [serialize_lc_object(chunk), metadata if isinstance(metadata, dict) else {}]
return serialize_lc_object(obj)
def serialize(obj: Any, *, mode: str = "") -> Any:
"""Serialize LangChain objects with mode-specific handling.
* ``messages`` — obj is ``(message_chunk, metadata_dict)``
* ``values`` — obj is the full state dict; ``__pregel_*`` keys stripped
* everything else — recursive ``model_dump()`` / ``dict()`` fallback
"""
if mode == "messages":
return serialize_messages_tuple(obj)
if mode == "values":
return serialize_channel_values(obj) if isinstance(obj, dict) else serialize_lc_object(obj)
return serialize_lc_object(obj)
@@ -0,0 +1,31 @@
"""Store provider for the DeerFlow runtime.
Re-exports the public API of both the async provider (for long-running
servers) and the sync provider (for CLI tools and the embedded client).
Async usage (FastAPI lifespan)::
from deerflow.runtime.store import make_store
async with make_store() as store:
app.state.store = store
Sync usage (CLI / DeerFlowClient)::
from deerflow.runtime.store import get_store, store_context
store = get_store() # singleton
with store_context() as store: ... # one-shot
"""
from .async_provider import make_store
from .provider import get_store, reset_store, store_context
__all__ = [
# async
"make_store",
# sync
"get_store",
"reset_store",
"store_context",
]
@@ -0,0 +1,28 @@
"""Shared SQLite connection utilities for store and checkpointer providers."""
from __future__ import annotations
import pathlib
from deerflow.config.paths import resolve_path
def resolve_sqlite_conn_str(raw: str) -> str:
"""Return a SQLite connection string ready for use with store/checkpointer backends.
SQLite special strings (``":memory:"`` and ``file:`` URIs) are returned
unchanged. Plain filesystem paths — relative or absolute — are resolved
to an absolute string via :func:`resolve_path`.
"""
if raw == ":memory:" or raw.startswith("file:"):
return raw
return str(resolve_path(raw))
def ensure_sqlite_parent_dir(conn_str: str) -> None:
"""Create parent directory for a SQLite filesystem path.
No-op for in-memory databases (``":memory:"``) and ``file:`` URIs.
"""
if conn_str != ":memory:" and not conn_str.startswith("file:"):
pathlib.Path(conn_str).parent.mkdir(parents=True, exist_ok=True)
@@ -0,0 +1,113 @@
"""Async Store factory — backend mirrors the configured checkpointer.
The store and checkpointer share the same ``checkpointer`` section in
*config.yaml* so they always use the same persistence backend:
- ``type: memory`` → :class:`langgraph.store.memory.InMemoryStore`
- ``type: sqlite`` → :class:`langgraph.store.sqlite.aio.AsyncSqliteStore`
- ``type: postgres`` → :class:`langgraph.store.postgres.aio.AsyncPostgresStore`
Usage (e.g. FastAPI lifespan)::
from deerflow.runtime.store import make_store
async with make_store() as store:
app.state.store = store
"""
from __future__ import annotations
import contextlib
import logging
from collections.abc import AsyncIterator
from langgraph.store.base import BaseStore
from deerflow.config.app_config import get_app_config
from deerflow.runtime.store.provider import POSTGRES_CONN_REQUIRED, POSTGRES_STORE_INSTALL, SQLITE_STORE_INSTALL, ensure_sqlite_parent_dir, resolve_sqlite_conn_str
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Internal backend factory
# ---------------------------------------------------------------------------
@contextlib.asynccontextmanager
async def _async_store(config) -> AsyncIterator[BaseStore]:
"""Async context manager that constructs and tears down a Store.
The ``config`` argument is a :class:`deerflow.config.checkpointer_config.CheckpointerConfig`
instance — the same object used by the checkpointer factory.
"""
if config.type == "memory":
from langgraph.store.memory import InMemoryStore
logger.info("Store: using InMemoryStore (in-process, not persistent)")
yield InMemoryStore()
return
if config.type == "sqlite":
try:
from langgraph.store.sqlite.aio import AsyncSqliteStore
except ImportError as exc:
raise ImportError(SQLITE_STORE_INSTALL) from exc
conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db")
ensure_sqlite_parent_dir(conn_str)
async with AsyncSqliteStore.from_conn_string(conn_str) as store:
await store.setup()
logger.info("Store: using AsyncSqliteStore (%s)", conn_str)
yield store
return
if config.type == "postgres":
try:
from langgraph.store.postgres.aio import AsyncPostgresStore # type: ignore[import]
except ImportError as exc:
raise ImportError(POSTGRES_STORE_INSTALL) from exc
if not config.connection_string:
raise ValueError(POSTGRES_CONN_REQUIRED)
async with AsyncPostgresStore.from_conn_string(config.connection_string) as store:
await store.setup()
logger.info("Store: using AsyncPostgresStore")
yield store
return
raise ValueError(f"Unknown store backend type: {config.type!r}")
# ---------------------------------------------------------------------------
# Public async context manager
# ---------------------------------------------------------------------------
@contextlib.asynccontextmanager
async def make_store() -> AsyncIterator[BaseStore]:
"""Async context manager that yields a Store whose backend matches the
configured checkpointer.
Reads from the same ``checkpointer`` section of *config.yaml* used by
:func:`deerflow.agents.checkpointer.async_provider.make_checkpointer` so
that both singletons always use the same persistence technology::
async with make_store() as store:
app.state.store = store
Yields an :class:`~langgraph.store.memory.InMemoryStore` when no
``checkpointer`` section is configured (emits a WARNING in that case).
"""
config = get_app_config()
if config.checkpointer is None:
from langgraph.store.memory import InMemoryStore
logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.")
yield InMemoryStore()
return
async with _async_store(config.checkpointer) as store:
yield store
@@ -0,0 +1,188 @@
"""Sync Store factory.
Provides a **sync singleton** and a **sync context manager** for CLI tools
and the embedded :class:`~deerflow.client.DeerFlowClient`.
The backend mirrors the configured checkpointer so that both always use the
same persistence technology. Supported backends: memory, sqlite, postgres.
Usage::
from deerflow.runtime.store.provider import get_store, store_context
# Singleton — reused across calls, closed on process exit
store = get_store()
# One-shot — fresh connection, closed on block exit
with store_context() as store:
store.put(("ns",), "key", {"value": 1})
"""
from __future__ import annotations
import contextlib
import logging
from collections.abc import Iterator
from langgraph.store.base import BaseStore
from deerflow.config.app_config import get_app_config
from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir, resolve_sqlite_conn_str
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Error message constants
# ---------------------------------------------------------------------------
SQLITE_STORE_INSTALL = "langgraph-checkpoint-sqlite is required for the SQLite store. Install it with: uv add langgraph-checkpoint-sqlite"
POSTGRES_STORE_INSTALL = "langgraph-checkpoint-postgres is required for the PostgreSQL store. Install it with: uv add langgraph-checkpoint-postgres psycopg[binary] psycopg-pool"
POSTGRES_CONN_REQUIRED = "checkpointer.connection_string is required for the postgres backend"
# ---------------------------------------------------------------------------
# Sync factory
# ---------------------------------------------------------------------------
@contextlib.contextmanager
def _sync_store_cm(config) -> Iterator[BaseStore]:
"""Context manager that creates and tears down a sync Store.
The ``config`` argument is a
:class:`~deerflow.config.checkpointer_config.CheckpointerConfig` instance —
the same object used by the checkpointer factory.
"""
if config.type == "memory":
from langgraph.store.memory import InMemoryStore
logger.info("Store: using InMemoryStore (in-process, not persistent)")
yield InMemoryStore()
return
if config.type == "sqlite":
try:
from langgraph.store.sqlite import SqliteStore
except ImportError as exc:
raise ImportError(SQLITE_STORE_INSTALL) from exc
conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db")
ensure_sqlite_parent_dir(conn_str)
with SqliteStore.from_conn_string(conn_str) as store:
store.setup()
logger.info("Store: using SqliteStore (%s)", conn_str)
yield store
return
if config.type == "postgres":
try:
from langgraph.store.postgres import PostgresStore # type: ignore[import]
except ImportError as exc:
raise ImportError(POSTGRES_STORE_INSTALL) from exc
if not config.connection_string:
raise ValueError(POSTGRES_CONN_REQUIRED)
with PostgresStore.from_conn_string(config.connection_string) as store:
store.setup()
logger.info("Store: using PostgresStore")
yield store
return
raise ValueError(f"Unknown store backend type: {config.type!r}")
# ---------------------------------------------------------------------------
# Sync singleton
# ---------------------------------------------------------------------------
_store: BaseStore | None = None
_store_ctx = None # open context manager keeping the connection alive
def get_store() -> BaseStore:
"""Return the global sync Store singleton, creating it on first call.
Returns an :class:`~langgraph.store.memory.InMemoryStore` when no
checkpointer is configured in *config.yaml* (emits a WARNING in that case).
Raises:
ImportError: If the required package for the configured backend is not installed.
ValueError: If ``connection_string`` is missing for a backend that requires it.
"""
global _store, _store_ctx
if _store is not None:
return _store
# Lazily load app config, mirroring the checkpointer singleton pattern so
# that tests that set the global checkpointer config explicitly remain isolated.
from deerflow.config.app_config import _app_config
from deerflow.config.checkpointer_config import get_checkpointer_config
config = get_checkpointer_config()
if config is None and _app_config is None:
try:
get_app_config()
except FileNotFoundError:
pass
config = get_checkpointer_config()
if config is None:
from langgraph.store.memory import InMemoryStore
logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.")
_store = InMemoryStore()
return _store
_store_ctx = _sync_store_cm(config)
_store = _store_ctx.__enter__()
return _store
def reset_store() -> None:
"""Reset the sync singleton, forcing recreation on the next call.
Closes any open backend connections and clears the cached instance.
Useful in tests or after a configuration change.
"""
global _store, _store_ctx
if _store_ctx is not None:
try:
_store_ctx.__exit__(None, None, None)
except Exception:
logger.warning("Error during store cleanup", exc_info=True)
_store_ctx = None
_store = None
# ---------------------------------------------------------------------------
# Sync context manager
# ---------------------------------------------------------------------------
@contextlib.contextmanager
def store_context() -> Iterator[BaseStore]:
"""Sync context manager that yields a Store and cleans up on exit.
Unlike :func:`get_store`, this does **not** cache the instance — each
``with`` block creates and destroys its own connection. Use it in CLI
scripts or tests where you want deterministic cleanup::
with store_context() as store:
store.put(("threads",), thread_id, {...})
Yields an :class:`~langgraph.store.memory.InMemoryStore` when no
checkpointer is configured in *config.yaml*.
"""
config = get_app_config()
if config.checkpointer is None:
from langgraph.store.memory import InMemoryStore
logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.")
yield InMemoryStore()
return
with _sync_store_cm(config.checkpointer) as store:
yield store
@@ -0,0 +1,21 @@
"""Stream bridge — decouples agent workers from SSE endpoints.
A ``StreamBridge`` sits between the background task that runs an agent
(producer) and the HTTP endpoint that pushes Server-Sent Events to
the client (consumer). This package provides an abstract protocol
(:class:`StreamBridge`) plus a default in-memory implementation backed
by :mod:`asyncio.Queue`.
"""
from .async_provider import make_stream_bridge
from .base import END_SENTINEL, HEARTBEAT_SENTINEL, StreamBridge, StreamEvent
from .memory import MemoryStreamBridge
__all__ = [
"END_SENTINEL",
"HEARTBEAT_SENTINEL",
"MemoryStreamBridge",
"StreamBridge",
"StreamEvent",
"make_stream_bridge",
]
@@ -0,0 +1,52 @@
"""Async stream bridge factory.
Provides an **async context manager** aligned with
:func:`deerflow.agents.checkpointer.async_provider.make_checkpointer`.
Usage (e.g. FastAPI lifespan)::
from deerflow.agents.stream_bridge import make_stream_bridge
async with make_stream_bridge() as bridge:
app.state.stream_bridge = bridge
"""
from __future__ import annotations
import contextlib
import logging
from collections.abc import AsyncIterator
from deerflow.config.stream_bridge_config import get_stream_bridge_config
from .base import StreamBridge
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def make_stream_bridge(config=None) -> AsyncIterator[StreamBridge]:
"""Async context manager that yields a :class:`StreamBridge`.
Falls back to :class:`MemoryStreamBridge` when no configuration is
provided and nothing is set globally.
"""
if config is None:
config = get_stream_bridge_config()
if config is None or config.type == "memory":
from deerflow.runtime.stream_bridge.memory import MemoryStreamBridge
maxsize = config.queue_maxsize if config is not None else 256
bridge = MemoryStreamBridge(queue_maxsize=maxsize)
logger.info("Stream bridge initialised: memory (queue_maxsize=%d)", maxsize)
try:
yield bridge
finally:
await bridge.close()
return
if config.type == "redis":
raise NotImplementedError("Redis stream bridge planned for Phase 2")
raise ValueError(f"Unknown stream bridge type: {config.type!r}")
@@ -0,0 +1,72 @@
"""Abstract stream bridge protocol.
StreamBridge decouples agent workers (producers) from SSE endpoints
(consumers), aligning with LangGraph Platform's Queue + StreamManager
architecture.
"""
from __future__ import annotations
import abc
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class StreamEvent:
"""Single stream event.
Attributes:
id: Monotonically increasing event ID (used as SSE ``id:`` field,
supports ``Last-Event-ID`` reconnection).
event: SSE event name, e.g. ``"metadata"``, ``"updates"``,
``"events"``, ``"error"``, ``"end"``.
data: JSON-serialisable payload.
"""
id: str
event: str
data: Any
HEARTBEAT_SENTINEL = StreamEvent(id="", event="__heartbeat__", data=None)
END_SENTINEL = StreamEvent(id="", event="__end__", data=None)
class StreamBridge(abc.ABC):
"""Abstract base for stream bridges."""
@abc.abstractmethod
async def publish(self, run_id: str, event: str, data: Any) -> None:
"""Enqueue a single event for *run_id* (producer side)."""
@abc.abstractmethod
async def publish_end(self, run_id: str) -> None:
"""Signal that no more events will be produced for *run_id*."""
@abc.abstractmethod
def subscribe(
self,
run_id: str,
*,
last_event_id: str | None = None,
heartbeat_interval: float = 15.0,
) -> AsyncIterator[StreamEvent]:
"""Async iterator that yields events for *run_id* (consumer side).
Yields :data:`HEARTBEAT_SENTINEL` when no event arrives within
*heartbeat_interval* seconds. Yields :data:`END_SENTINEL` once
the producer calls :meth:`publish_end`.
"""
@abc.abstractmethod
async def cleanup(self, run_id: str, *, delay: float = 0) -> None:
"""Release resources associated with *run_id*.
If *delay* > 0 the implementation should wait before releasing,
giving late subscribers a chance to drain remaining events.
"""
async def close(self) -> None:
"""Release backend resources. Default is a no-op."""
@@ -0,0 +1,90 @@
"""In-memory stream bridge backed by :class:`asyncio.Queue`."""
from __future__ import annotations
import asyncio
import logging
import time
from collections.abc import AsyncIterator
from typing import Any
from .base import END_SENTINEL, HEARTBEAT_SENTINEL, StreamBridge, StreamEvent
logger = logging.getLogger(__name__)
_PUBLISH_TIMEOUT = 30.0 # seconds to wait when queue is full
class MemoryStreamBridge(StreamBridge):
"""Per-run ``asyncio.Queue`` implementation.
Each *run_id* gets its own queue on first :meth:`publish` call.
"""
def __init__(self, *, queue_maxsize: int = 256) -> None:
self._maxsize = queue_maxsize
self._queues: dict[str, asyncio.Queue[StreamEvent]] = {}
self._counters: dict[str, int] = {}
# -- helpers ---------------------------------------------------------------
def _get_or_create_queue(self, run_id: str) -> asyncio.Queue[StreamEvent]:
if run_id not in self._queues:
self._queues[run_id] = asyncio.Queue(maxsize=self._maxsize)
self._counters[run_id] = 0
return self._queues[run_id]
def _next_id(self, run_id: str) -> str:
self._counters[run_id] = self._counters.get(run_id, 0) + 1
ts = int(time.time() * 1000)
seq = self._counters[run_id] - 1
return f"{ts}-{seq}"
# -- StreamBridge API ------------------------------------------------------
async def publish(self, run_id: str, event: str, data: Any) -> None:
queue = self._get_or_create_queue(run_id)
entry = StreamEvent(id=self._next_id(run_id), event=event, data=data)
try:
await asyncio.wait_for(queue.put(entry), timeout=_PUBLISH_TIMEOUT)
except TimeoutError:
logger.warning("Stream bridge queue full for run %s — dropping event %s", run_id, event)
async def publish_end(self, run_id: str) -> None:
queue = self._get_or_create_queue(run_id)
try:
await asyncio.wait_for(queue.put(END_SENTINEL), timeout=_PUBLISH_TIMEOUT)
except TimeoutError:
logger.warning("Stream bridge queue full for run %s — dropping END sentinel", run_id)
async def subscribe(
self,
run_id: str,
*,
last_event_id: str | None = None,
heartbeat_interval: float = 15.0,
) -> AsyncIterator[StreamEvent]:
if last_event_id is not None:
logger.debug("last_event_id=%s accepted but ignored (memory bridge has no replay)", last_event_id)
queue = self._get_or_create_queue(run_id)
while True:
try:
entry = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
except TimeoutError:
yield HEARTBEAT_SENTINEL
continue
if entry is END_SENTINEL:
yield END_SENTINEL
return
yield entry
async def cleanup(self, run_id: str, *, delay: float = 0) -> None:
if delay > 0:
await asyncio.sleep(delay)
self._queues.pop(run_id, None)
self._counters.pop(run_id, None)
async def close(self) -> None:
self._queues.clear()
self._counters.clear()