fix(persistence): stream hang when run_events.backend=db

DbRunEventStore._user_id_from_context() returned user.id without
coercing it to str. User.id is a Pydantic UUID, and aiosqlite cannot
bind a raw UUID object to a VARCHAR column, so the INSERT for the
initial human_message event silently rolled back and raised out of
the worker task. Because that put() sat outside the worker's try
block, the finally-clause that publishes end-of-stream never ran
and the SSE stream hung forever.

jsonl mode was unaffected because json.dumps(default=str) coerces
UUID objects transparently.

Fixes:
- db.py: coerce user.id to str at the context-read boundary (matches
  what resolve_user_id already does for the other repositories)
- worker.py: move RunJournal init + human_message put inside the try
  block so any failure flows through the finally/publish_end path
  instead of hanging the subscriber

Defense-in-depth:
- engine.py: add PRAGMA busy_timeout=5000 so checkpointer and event
  store wait for each other on the shared deerflow.db file instead
  of failing immediately under write-lock contention
- journal.py: skip fire-and-forget _flush_sync when a previous flush
  task is still in flight, to avoid piling up concurrent put_batch
  writes on the same SQLAlchemy engine during streaming; flush() now
  waits for pending tasks before draining the buffer
- database_config.py: doc-only update clarifying WAL + busy_timeout
  keep the unified deerflow.db safe for both workloads

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-11 11:16:22 +08:00
parent 20f64bbf4f
commit 10cc651578
5 changed files with 76 additions and 44 deletions
@@ -7,7 +7,10 @@ configures one backend; the system handles physical separation details.
SQLite mode: checkpointer and app share a single .db file SQLite mode: checkpointer and app share a single .db file
({sqlite_dir}/deerflow.db) with WAL journal mode enabled on every ({sqlite_dir}/deerflow.db) with WAL journal mode enabled on every
connection. WAL allows concurrent readers and a single writer without connection. WAL allows concurrent readers and a single writer without
blocking, making a unified file safe for both workloads. blocking, making a unified file safe for both workloads. The
``busy_timeout`` PRAGMA (set in ``engine.py``) ensures writers wait
for each other instead of failing immediately when they contend for
the write lock.
Postgres mode: both use the same database URL but maintain independent Postgres mode: both use the same database URL but maintain independent
connection pools with different lifecycles. connection pools with different lifecycles.
@@ -105,6 +105,7 @@ async def init_engine(
cursor.execute("PRAGMA journal_mode=WAL;") cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA synchronous=NORMAL;") cursor.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA foreign_keys=ON;") cursor.execute("PRAGMA foreign_keys=ON;")
cursor.execute("PRAGMA busy_timeout=5000;")
finally: finally:
cursor.close() cursor.close()
elif backend == "postgres": elif backend == "postgres":
@@ -62,9 +62,15 @@ class DbRunEventStore(RunEventStore):
which is the expected case for background worker writes. HTTP which is the expected case for background worker writes. HTTP
request writes will have the contextvar set by auth middleware request writes will have the contextvar set by auth middleware
and get their user_id stamped automatically. and get their user_id stamped automatically.
Coerces ``user.id`` to ``str`` at the boundary: ``User.id`` is
typed as ``UUID`` by the auth layer, but ``run_events.user_id``
is ``VARCHAR(64)`` and aiosqlite cannot bind a raw UUID object
to a VARCHAR column ("type 'UUID' is not supported") — the
INSERT would silently roll back and the worker would hang.
""" """
user = get_current_user() user = get_current_user()
return user.id if user is not None else None return str(user.id) if user is not None else None
async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None): # noqa: D401 async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None): # noqa: D401
"""Write a single event — low-frequency path only. """Write a single event — low-frequency path only.
@@ -50,6 +50,7 @@ class RunJournal(BaseCallbackHandler):
# Write buffer # Write buffer
self._buffer: list[dict] = [] self._buffer: list[dict] = []
self._pending_flush_tasks: set[asyncio.Task[None]] = set()
# Token accumulators # Token accumulators
self._total_input_tokens = 0 self._total_input_tokens = 0
@@ -381,6 +382,10 @@ class RunJournal(BaseCallbackHandler):
""" """
if not self._buffer: if not self._buffer:
return return
# Skip if a flush is already in flight — avoids concurrent writes
# to the same SQLite file from multiple fire-and-forget tasks.
if self._pending_flush_tasks:
return
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
except RuntimeError: except RuntimeError:
@@ -389,6 +394,7 @@ class RunJournal(BaseCallbackHandler):
batch = self._buffer.copy() batch = self._buffer.copy()
self._buffer.clear() self._buffer.clear()
task = loop.create_task(self._flush_async(batch)) task = loop.create_task(self._flush_async(batch))
self._pending_flush_tasks.add(task)
task.add_done_callback(self._on_flush_done) task.add_done_callback(self._on_flush_done)
async def _flush_async(self, batch: list[dict]) -> None: async def _flush_async(self, batch: list[dict]) -> None:
@@ -404,8 +410,8 @@ class RunJournal(BaseCallbackHandler):
# Return failed events to buffer for retry on next flush # Return failed events to buffer for retry on next flush
self._buffer = batch + self._buffer self._buffer = batch + self._buffer
@staticmethod def _on_flush_done(self, task: asyncio.Task) -> None:
def _on_flush_done(task: asyncio.Task) -> None: self._pending_flush_tasks.discard(task)
if task.cancelled(): if task.cancelled():
return return
exc = task.exception() exc = task.exception()
@@ -450,10 +456,17 @@ class RunJournal(BaseCallbackHandler):
async def flush(self) -> None: async def flush(self) -> None:
"""Force flush remaining buffer. Called in worker's finally block.""" """Force flush remaining buffer. Called in worker's finally block."""
if self._buffer: if self._pending_flush_tasks:
batch = self._buffer.copy() await asyncio.gather(*tuple(self._pending_flush_tasks), return_exceptions=True)
self._buffer.clear()
await self._store.put_batch(batch) while self._buffer:
batch = self._buffer[: self._flush_threshold]
del self._buffer[: self._flush_threshold]
try:
await self._store.put_batch(batch)
except Exception:
self._buffer = batch + self._buffer
raise
def get_completion_data(self) -> dict: def get_completion_data(self) -> dict:
"""Return accumulated token and message data for run completion.""" """Return accumulated token and message data for run completion."""
@@ -143,34 +143,7 @@ async def run_agent(
content = human_msg.content content = human_msg.content
journal.set_first_human_message(content if isinstance(content, str) else str(content)) journal.set_first_human_message(content if isinstance(content, str) else str(content))
# Initialize RunJournal for event capture
journal = None journal = None
if event_store is not None:
from deerflow.runtime.journal import RunJournal
journal = RunJournal(
run_id=run_id,
thread_id=thread_id,
event_store=event_store,
track_token_usage=getattr(run_events_config, "track_token_usage", True),
)
# Write human_message event (model_dump format, aligned with checkpoint)
human_msg = _extract_human_message(graph_input)
if human_msg is not None:
msg_metadata = {}
if follow_up_to_run_id:
msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
await event_store.put(
thread_id=thread_id,
run_id=run_id,
event_type="human_message",
category="message",
content=human_msg.model_dump(),
metadata=msg_metadata or None,
)
content = human_msg.content
journal.set_first_human_message(content if isinstance(content, str) else str(content))
# Track whether "events" was requested but skipped # Track whether "events" was requested but skipped
if "events" in requested_modes: if "events" in requested_modes:
@@ -180,6 +153,38 @@ async def run_agent(
) )
try: try:
# Initialize RunJournal + write human_message event.
# These are inside the try block so any exception (e.g. a DB
# error writing the event) flows through the except/finally
# path that publishes an "end" event to the SSE bridge —
# otherwise a failure here would leave the stream hanging
# with no terminator.
if event_store is not None:
from deerflow.runtime.journal import RunJournal
journal = RunJournal(
run_id=run_id,
thread_id=thread_id,
event_store=event_store,
track_token_usage=getattr(run_events_config, "track_token_usage", True),
)
human_msg = _extract_human_message(graph_input)
if human_msg is not None:
msg_metadata = {}
if follow_up_to_run_id:
msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
await event_store.put(
thread_id=thread_id,
run_id=run_id,
event_type="human_message",
category="message",
content=human_msg.model_dump(),
metadata=msg_metadata or None,
)
content = human_msg.content
journal.set_first_human_message(content if isinstance(content, str) else str(content))
# 1. Mark running # 1. Mark running
await run_manager.set_status(run_id, RunStatus.running) await run_manager.set_status(run_id, RunStatus.running)
@@ -363,12 +368,15 @@ async def run_agent(
except Exception: except Exception:
logger.warning("Failed to flush journal for run %s", run_id, exc_info=True) logger.warning("Failed to flush journal for run %s", run_id, exc_info=True)
# Persist token usage + convenience fields to RunStore try:
completion = journal.get_completion_data() # Persist token usage + convenience fields to RunStore
await run_manager.update_run_completion(run_id, status=record.status.value, **completion) completion = journal.get_completion_data()
await run_manager.update_run_completion(run_id, status=record.status.value, **completion)
except Exception:
logger.warning("Failed to persist run completion for %s (non-fatal)", run_id, exc_info=True)
# Sync title from checkpoint to threads_meta.display_name # Sync title from checkpoint to threads_meta.display_name
if checkpointer is not None: if checkpointer is not None and thread_store is not None:
try: try:
ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}} ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
ckpt_tuple = await checkpointer.aget_tuple(ckpt_config) ckpt_tuple = await checkpointer.aget_tuple(ckpt_config)
@@ -381,11 +389,12 @@ async def run_agent(
logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id) logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id)
# Update threads_meta status based on run outcome # Update threads_meta status based on run outcome
try: if thread_store is not None:
final_status = "idle" if record.status == RunStatus.success else record.status.value try:
await thread_store.update_status(thread_id, final_status) final_status = "idle" if record.status == RunStatus.success else record.status.value
except Exception: await thread_store.update_status(thread_id, final_status)
logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id) except Exception:
logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id)
await bridge.publish_end(run_id) await bridge.publish_end(run_id)
asyncio.create_task(bridge.cleanup(run_id, delay=60)) asyncio.create_task(bridge.cleanup(run_id, delay=60))