mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-21 23:46:50 +00:00
* fix: repair stream resume run metadata # Conflicts: # backend/packages/harness/deerflow/runtime/stream_bridge/memory.py # frontend/src/core/threads/hooks.ts * fix(stream): repair resumable replay validation --------- Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""In-memory stream bridge backed by :class:`asyncio.Queue`."""
|
||||
"""In-memory stream bridge backed by an in-process event log."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -6,35 +6,41 @@ import asyncio
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import AsyncIterator
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from .base import END_SENTINEL, HEARTBEAT_SENTINEL, StreamBridge, StreamEvent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_PUBLISH_TIMEOUT = 30.0 # seconds to wait when queue is full
|
||||
|
||||
@dataclass
|
||||
class _RunStream:
|
||||
events: list[StreamEvent] = field(default_factory=list)
|
||||
condition: asyncio.Condition = field(default_factory=asyncio.Condition)
|
||||
ended: bool = False
|
||||
start_offset: int = 0
|
||||
|
||||
|
||||
class MemoryStreamBridge(StreamBridge):
|
||||
"""Per-run ``asyncio.Queue`` implementation.
|
||||
"""Per-run in-memory event log implementation.
|
||||
|
||||
Each *run_id* gets its own queue on first :meth:`publish` call.
|
||||
Events are retained for a bounded time window per run so late subscribers
|
||||
and reconnecting clients can replay buffered events from ``Last-Event-ID``.
|
||||
"""
|
||||
|
||||
def __init__(self, *, queue_maxsize: int = 256) -> None:
|
||||
self._maxsize = queue_maxsize
|
||||
self._queues: dict[str, asyncio.Queue[StreamEvent]] = {}
|
||||
self._streams: dict[str, _RunStream] = {}
|
||||
self._counters: dict[str, int] = {}
|
||||
self._dropped_counts: dict[str, int] = {}
|
||||
|
||||
# -- helpers ---------------------------------------------------------------
|
||||
|
||||
def _get_or_create_queue(self, run_id: str) -> asyncio.Queue[StreamEvent]:
|
||||
if run_id not in self._queues:
|
||||
self._queues[run_id] = asyncio.Queue(maxsize=self._maxsize)
|
||||
def _get_or_create_stream(self, run_id: str) -> _RunStream:
|
||||
if run_id not in self._streams:
|
||||
self._streams[run_id] = _RunStream()
|
||||
self._counters[run_id] = 0
|
||||
self._dropped_counts[run_id] = 0
|
||||
return self._queues[run_id]
|
||||
return self._streams[run_id]
|
||||
|
||||
def _next_id(self, run_id: str) -> str:
|
||||
self._counters[run_id] = self._counters.get(run_id, 0) + 1
|
||||
@@ -42,49 +48,39 @@ class MemoryStreamBridge(StreamBridge):
|
||||
seq = self._counters[run_id] - 1
|
||||
return f"{ts}-{seq}"
|
||||
|
||||
def _resolve_start_offset(self, stream: _RunStream, last_event_id: str | None) -> int:
|
||||
if last_event_id is None:
|
||||
return stream.start_offset
|
||||
|
||||
for index, entry in enumerate(stream.events):
|
||||
if entry.id == last_event_id:
|
||||
return stream.start_offset + index + 1
|
||||
|
||||
if stream.events:
|
||||
logger.warning(
|
||||
"last_event_id=%s not found in retained buffer; replaying from earliest retained event",
|
||||
last_event_id,
|
||||
)
|
||||
return stream.start_offset
|
||||
|
||||
# -- StreamBridge API ------------------------------------------------------
|
||||
|
||||
async def publish(self, run_id: str, event: str, data: Any) -> None:
|
||||
queue = self._get_or_create_queue(run_id)
|
||||
stream = self._get_or_create_stream(run_id)
|
||||
entry = StreamEvent(id=self._next_id(run_id), event=event, data=data)
|
||||
try:
|
||||
await asyncio.wait_for(queue.put(entry), timeout=_PUBLISH_TIMEOUT)
|
||||
except TimeoutError:
|
||||
self._dropped_counts[run_id] = self._dropped_counts.get(run_id, 0) + 1
|
||||
logger.warning(
|
||||
"Stream bridge queue full for run %s — dropping event %s (total dropped: %d)",
|
||||
run_id,
|
||||
event,
|
||||
self._dropped_counts[run_id],
|
||||
)
|
||||
async with stream.condition:
|
||||
stream.events.append(entry)
|
||||
if len(stream.events) > self._maxsize:
|
||||
overflow = len(stream.events) - self._maxsize
|
||||
del stream.events[:overflow]
|
||||
stream.start_offset += overflow
|
||||
stream.condition.notify_all()
|
||||
|
||||
async def publish_end(self, run_id: str) -> None:
|
||||
queue = self._get_or_create_queue(run_id)
|
||||
|
||||
# END sentinel is critical — it is the only signal that allows
|
||||
# subscribers to terminate. If the queue is full we evict the
|
||||
# oldest *regular* events to make room rather than dropping END,
|
||||
# which would cause the SSE connection to hang forever and leak
|
||||
# the queue/counter resources for this run_id.
|
||||
if queue.full():
|
||||
evicted = 0
|
||||
while queue.full():
|
||||
try:
|
||||
queue.get_nowait()
|
||||
evicted += 1
|
||||
except asyncio.QueueEmpty:
|
||||
break # pragma: no cover – defensive
|
||||
if evicted:
|
||||
logger.warning(
|
||||
"Stream bridge queue full for run %s — evicted %d event(s) to guarantee END sentinel delivery",
|
||||
run_id,
|
||||
evicted,
|
||||
)
|
||||
|
||||
# After eviction the queue is guaranteed to have space, so a
|
||||
# simple non-blocking put is safe. We still use put() (which
|
||||
# blocks until space is available) as a defensive measure.
|
||||
await queue.put(END_SENTINEL)
|
||||
stream = self._get_or_create_stream(run_id)
|
||||
async with stream.condition:
|
||||
stream.ended = True
|
||||
stream.condition.notify_all()
|
||||
|
||||
async def subscribe(
|
||||
self,
|
||||
@@ -93,16 +89,34 @@ class MemoryStreamBridge(StreamBridge):
|
||||
last_event_id: str | None = None,
|
||||
heartbeat_interval: float = 15.0,
|
||||
) -> AsyncIterator[StreamEvent]:
|
||||
if last_event_id is not None:
|
||||
logger.debug("last_event_id=%s accepted but ignored (memory bridge has no replay)", last_event_id)
|
||||
stream = self._get_or_create_stream(run_id)
|
||||
async with stream.condition:
|
||||
next_offset = self._resolve_start_offset(stream, last_event_id)
|
||||
|
||||
queue = self._get_or_create_queue(run_id)
|
||||
while True:
|
||||
try:
|
||||
entry = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
|
||||
except TimeoutError:
|
||||
yield HEARTBEAT_SENTINEL
|
||||
continue
|
||||
async with stream.condition:
|
||||
if next_offset < stream.start_offset:
|
||||
logger.warning(
|
||||
"subscriber for run %s fell behind retained buffer; resuming from offset %s",
|
||||
run_id,
|
||||
stream.start_offset,
|
||||
)
|
||||
next_offset = stream.start_offset
|
||||
|
||||
local_index = next_offset - stream.start_offset
|
||||
if 0 <= local_index < len(stream.events):
|
||||
entry = stream.events[local_index]
|
||||
next_offset += 1
|
||||
elif stream.ended:
|
||||
entry = END_SENTINEL
|
||||
else:
|
||||
try:
|
||||
await asyncio.wait_for(stream.condition.wait(), timeout=heartbeat_interval)
|
||||
except TimeoutError:
|
||||
entry = HEARTBEAT_SENTINEL
|
||||
else:
|
||||
continue
|
||||
|
||||
if entry is END_SENTINEL:
|
||||
yield END_SENTINEL
|
||||
return
|
||||
@@ -111,20 +125,9 @@ class MemoryStreamBridge(StreamBridge):
|
||||
async def cleanup(self, run_id: str, *, delay: float = 0) -> None:
|
||||
if delay > 0:
|
||||
await asyncio.sleep(delay)
|
||||
self._queues.pop(run_id, None)
|
||||
self._streams.pop(run_id, None)
|
||||
self._counters.pop(run_id, None)
|
||||
self._dropped_counts.pop(run_id, None)
|
||||
|
||||
async def close(self) -> None:
|
||||
self._queues.clear()
|
||||
self._streams.clear()
|
||||
self._counters.clear()
|
||||
self._dropped_counts.clear()
|
||||
|
||||
def dropped_count(self, run_id: str) -> int:
|
||||
"""Return the number of events dropped for *run_id*."""
|
||||
return self._dropped_counts.get(run_id, 0)
|
||||
|
||||
@property
|
||||
def dropped_total(self) -> int:
|
||||
"""Return the total number of events dropped across all runs."""
|
||||
return sum(self._dropped_counts.values())
|
||||
|
||||
Reference in New Issue
Block a user