diff --git a/backend/packages/harness/deerflow/runtime/runs/manager.py b/backend/packages/harness/deerflow/runtime/runs/manager.py index ea78f89c9..cce8e3bf5 100644 --- a/backend/packages/harness/deerflow/runtime/runs/manager.py +++ b/backend/packages/harness/deerflow/runtime/runs/manager.py @@ -53,24 +53,27 @@ class RunManager: self._lock = asyncio.Lock() self._store = store - async def _persist_to_store(self, record: RunRecord) -> None: - """Best-effort persist run record to backing store.""" + async def _persist_new_run_to_store(self, record: RunRecord) -> None: + """Persist a newly created run record to the backing store. + + Initial run creation is part of the run visibility boundary: callers + should not observe a run in memory unless its backing store row exists. + Unlike follow-up status/model updates, failures are propagated so the + caller can treat creation as failed. + """ if self._store is None: return - try: - await self._store.put( - record.run_id, - thread_id=record.thread_id, - assistant_id=record.assistant_id, - status=record.status.value, - multitask_strategy=record.multitask_strategy, - metadata=record.metadata or {}, - kwargs=record.kwargs or {}, - created_at=record.created_at, - model_name=record.model_name, - ) - except Exception: - logger.warning("Failed to persist run %s to store", record.run_id, exc_info=True) + await self._store.put( + record.run_id, + thread_id=record.thread_id, + assistant_id=record.assistant_id, + status=record.status.value, + multitask_strategy=record.multitask_strategy, + metadata=record.metadata or {}, + kwargs=record.kwargs or {}, + created_at=record.created_at, + model_name=record.model_name, + ) async def _persist_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None: """Best-effort persist a status transition to the backing store.""" @@ -139,7 +142,12 @@ class RunManager: ) async with self._lock: self._runs[run_id] = record - await self._persist_to_store(record) + try: + await self._persist_new_run_to_store(record) + except Exception: + self._runs.pop(run_id, None) + logger.warning("Failed to persist run %s; rolled back in-memory record", run_id, exc_info=True) + raise logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id) return record @@ -317,16 +325,8 @@ class RunManager: raise ConflictError(f"Thread {thread_id} already has an active run") if multitask_strategy in ("interrupt", "rollback") and inflight: - for r in inflight: - r.abort_action = multitask_strategy - r.abort_event.set() - if r.task is not None and not r.task.done(): - r.task.cancel() - r.status = RunStatus.interrupted - r.updated_at = now - interrupted_run_ids.append(r.run_id) logger.info( - "Cancelled %d inflight run(s) on thread %s (strategy=%s)", + "Preparing to cancel %d inflight run(s) on thread %s (strategy=%s)", len(inflight), thread_id, multitask_strategy, @@ -346,10 +346,25 @@ class RunManager: model_name=model_name, ) self._runs[run_id] = record + try: + await self._persist_new_run_to_store(record) + except Exception: + self._runs.pop(run_id, None) + logger.warning("Failed to persist run %s; rolled back in-memory record", run_id, exc_info=True) + raise + + if multitask_strategy in ("interrupt", "rollback") and inflight: + for r in inflight: + r.abort_action = multitask_strategy + r.abort_event.set() + if r.task is not None and not r.task.done(): + r.task.cancel() + r.status = RunStatus.interrupted + r.updated_at = now + interrupted_run_ids.append(r.run_id) for interrupted_run_id in interrupted_run_ids: await self._persist_status(interrupted_run_id, RunStatus.interrupted) - await self._persist_to_store(record) logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id) return record diff --git a/backend/tests/test_run_manager.py b/backend/tests/test_run_manager.py index e7b5f06f5..8dbd277b6 100644 --- a/backend/tests/test_run_manager.py +++ b/backend/tests/test_run_manager.py @@ -1,5 +1,6 @@ """Tests for RunManager.""" +import asyncio import re import pytest @@ -231,6 +232,56 @@ async def test_create_record_is_not_store_only(manager: RunManager): assert record.store_only is False +@pytest.mark.anyio +async def test_create_rolls_back_in_memory_record_on_store_failure(): + """create() must fail and hide the run when the initial store write fails.""" + from unittest.mock import AsyncMock + + store = MemoryRunStore() + store.put = AsyncMock(side_effect=RuntimeError("db down")) + manager = RunManager(store=store) + + with pytest.raises(RuntimeError, match="db down"): + await manager.create("thread-1") + + assert manager._runs == {} + assert await manager.list_by_thread("thread-1") == [] + + +@pytest.mark.anyio +async def test_create_does_not_expose_run_until_store_persist_completes(): + """Concurrent readers must wait until the new run has been persisted.""" + store = MemoryRunStore() + manager = RunManager(store=store) + original_put = store.put + put_started = asyncio.Event() + allow_put = asyncio.Event() + + async def blocking_put(run_id, **kwargs): + put_started.set() + await allow_put.wait() + return await original_put(run_id, **kwargs) + + store.put = blocking_put + create_task = asyncio.create_task(manager.create("thread-1")) + + try: + await put_started.wait() + list_task = asyncio.create_task(manager.list_by_thread("thread-1")) + await asyncio.sleep(0) + assert not list_task.done() + + allow_put.set() + record = await create_task + runs = await list_task + + assert [run.run_id for run in runs] == [record.run_id] + finally: + allow_put.set() + if not create_task.done(): + create_task.cancel() + + @pytest.mark.anyio async def test_get_prefers_in_memory_record_over_store(): """In-memory records retain task/control state when store has same run.""" @@ -318,6 +369,28 @@ async def test_create_or_reject_interrupt_persists_interrupted_status_to_store() assert stored_old["status"] == "interrupted" +@pytest.mark.anyio +async def test_create_or_reject_does_not_interrupt_old_run_when_new_run_store_write_fails(): + """A failed new-run persist must not cancel the existing inflight run.""" + from unittest.mock import AsyncMock + + store = MemoryRunStore() + manager = RunManager(store=store) + old = await manager.create("thread-1") + await manager.set_status(old.run_id, RunStatus.running) + store.put = AsyncMock(side_effect=RuntimeError("db down")) + + with pytest.raises(RuntimeError, match="db down"): + await manager.create_or_reject("thread-1", multitask_strategy="interrupt") + + stored_old = await store.get(old.run_id) + assert list(manager._runs) == [old.run_id] + assert old.status == RunStatus.running + assert old.abort_event.is_set() is False + assert stored_old is not None + assert stored_old["status"] == "running" + + @pytest.mark.anyio async def test_create_or_reject_rollback_persists_interrupted_status_to_store(): """rollback strategy should persist interrupted status for old runs."""