diff --git a/backend/app/gateway/routers/thread_runs.py b/backend/app/gateway/routers/thread_runs.py
index 30365fb7d..3d429fc03 100644
--- a/backend/app/gateway/routers/thread_runs.py
+++ b/backend/app/gateway/routers/thread_runs.py
@@ -180,7 +180,8 @@ async def wait_run(thread_id: str, body: RunCreateRequest, request: Request) ->
 async def list_runs(thread_id: str, request: Request) -> list[RunResponse]:
     """List all runs for a thread."""
     run_mgr = get_run_manager(request)
-    records = await run_mgr.list_by_thread(thread_id)
+    user_id = await get_current_user(request)
+    records = await run_mgr.list_by_thread(thread_id, user_id=user_id)
     return [_record_to_response(r) for r in records]
 
 
@@ -189,7 +190,8 @@ async def list_runs(thread_id: str, request: Request) -> list[RunResponse]:
 async def get_run(thread_id: str, run_id: str, request: Request) -> RunResponse:
     """Get details of a specific run."""
     run_mgr = get_run_manager(request)
-    record = run_mgr.get(run_id)
+    user_id = await get_current_user(request)
+    record = await run_mgr.aget(run_id, user_id=user_id)
     if record is None or record.thread_id != thread_id:
         raise HTTPException(status_code=404, detail=f"Run {run_id} not found")
     return _record_to_response(record)
diff --git a/backend/packages/harness/deerflow/runtime/runs/manager.py b/backend/packages/harness/deerflow/runtime/runs/manager.py
index 50dc594ab..11d6b478e 100644
--- a/backend/packages/harness/deerflow/runtime/runs/manager.py
+++ b/backend/packages/harness/deerflow/runtime/runs/manager.py
@@ -111,15 +111,76 @@ class RunManager:
         return record
 
     def get(self, run_id: str) -> RunRecord | None:
-        """Return a run record by ID, or ``None``."""
+        """Return an in-memory run record by ID, or ``None``."""
         return self._runs.get(run_id)
 
-    async def list_by_thread(self, thread_id: str) -> list[RunRecord]:
-        """Return all runs for a given thread, newest first."""
+    async def aget(self, run_id: str, *, user_id: str | None = None) -> RunRecord | None:
+        """Return a run record by ID, falling back to the persistent store.
+
+        ``user_id`` is only forwarded to the store lookup; a record that is
+        still in memory is returned without any ownership check.
+        Store failures are logged and reported as ``None``.
+        """
+        # NOTE(review): in-memory hits bypass the user_id filter -- confirm that
+        # callers enforce ownership for runs that are still live.
+        record = self._runs.get(run_id)
+        if record is not None:
+            return record
+        if self._store is not None:
+            try:
+                d = await self._store.get(run_id, user_id=user_id)
+                if d is not None:
+                    return self._store_dict_to_record(d)
+            except Exception:
+                logger.warning("Failed to query store for run %s", run_id, exc_info=True)
+        return None
+
+    def _store_dict_to_record(self, d: dict) -> RunRecord:
+        """Convert a store dict back to a RunRecord for read-only use.
+
+        Optional keys that are missing or ``None`` fall back to neutral
+        defaults, and ``on_disconnect`` is always ``DisconnectMode.cancel``
+        because it is not read from the stored dict.
+        """
+        return RunRecord(
+            run_id=d["run_id"],
+            thread_id=d["thread_id"],
+            assistant_id=d.get("assistant_id"),
+            status=RunStatus(d.get("status") or RunStatus.error.value),
+            on_disconnect=DisconnectMode.cancel,
+            multitask_strategy=d.get("multitask_strategy") or "reject",
+            metadata=d.get("metadata") or {},
+            kwargs=d.get("kwargs") or {},
+            created_at=d.get("created_at") or "",
+            updated_at=d.get("updated_at") or "",
+            model_name=d.get("model_name"),
+            error=d.get("error"),
+        )
+
+    async def list_by_thread(self, thread_id: str, *, user_id: str | None = None) -> list[RunRecord]:
+        """Return all runs for a given thread, oldest first.
+
+        In-memory records take precedence over store rows with the same
+        ``run_id``; ``user_id`` only filters the rows read from the store.
+        """
         async with self._lock:
-            # Dict insertion order matches creation order, so reversing it gives
-            # us deterministic newest-first results even when timestamps tie.
-            return [r for r in self._runs.values() if r.thread_id == thread_id]
+            in_memory = [r for r in self._runs.values() if r.thread_id == thread_id]
+            in_memory_ids = {r.run_id for r in in_memory}
+
+        store_records: list[RunRecord] = []
+        if self._store is not None:
+            try:
+                store_dicts = await self._store.list_by_thread(thread_id, user_id=user_id)
+                for d in store_dicts:
+                    if d["run_id"] not in in_memory_ids:
+                        store_records.append(self._store_dict_to_record(d))
+            except Exception:
+                logger.warning("Failed to query store for thread %s runs", thread_id, exc_info=True)
+
+        return sorted(
+            in_memory + store_records,
+            key=lambda record: record.created_at or "",
+        )
 
     async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
         """Transition a run to a new status."""
diff --git a/backend/packages/harness/deerflow/runtime/runs/store/base.py b/backend/packages/harness/deerflow/runtime/runs/store/base.py
index d3c10eba6..a742d89ca 100644
--- a/backend/packages/harness/deerflow/runtime/runs/store/base.py
+++ b/backend/packages/harness/deerflow/runtime/runs/store/base.py
@@ -34,7 +34,12 @@ class RunStore(abc.ABC):
         pass
 
     @abc.abstractmethod
-    async def get(self, run_id: str) -> dict[str, Any] | None:
+    async def get(
+        self,
+        run_id: str,
+        *,
+        user_id: str | None = None,
+    ) -> dict[str, Any] | None:
         pass
 
     @abc.abstractmethod
diff --git a/backend/packages/harness/deerflow/runtime/runs/store/memory.py b/backend/packages/harness/deerflow/runtime/runs/store/memory.py
index e41147e3e..9db27cacc 100644
--- a/backend/packages/harness/deerflow/runtime/runs/store/memory.py
+++ b/backend/packages/harness/deerflow/runtime/runs/store/memory.py
@@ -46,8 +46,13 @@ class MemoryRunStore(RunStore):
             "updated_at": now,
         }
 
-    async def get(self, run_id):
-        return self._runs.get(run_id)
+    async def get(self, run_id, *, user_id=None):
+        run = self._runs.get(run_id)
+        if run is None:
+            return None
+        if user_id is not None and run.get("user_id") != user_id:
+            return None
+        return run
 
     async def list_by_thread(self, thread_id, *, user_id=None, limit=100):
         results = [r for r in self._runs.values() if r["thread_id"] == thread_id and (user_id is None or r.get("user_id") == user_id)]
diff --git a/backend/tests/test_run_manager.py b/backend/tests/test_run_manager.py
index 98cd58264..de8f66319 100644
--- a/backend/tests/test_run_manager.py
+++ b/backend/tests/test_run_manager.py
@@ -83,6 +83,7 @@ async def test_list_by_thread(manager: RunManager):
 
     runs = await manager.list_by_thread("thread-1")
     assert len(runs) == 2
+    # list_by_thread returns oldest-first (ascending created_at).
     assert runs[0].run_id == r1.run_id
     assert runs[1].run_id == r2.run_id
 
@@ -192,3 +193,160 @@ async def test_model_name_default_is_none():
 
     stored = await store.get(record.run_id)
     assert stored["model_name"] is None
+
+
+# ---------------------------------------------------------------------------
+# Store fallback tests (simulates gateway restart scenario)
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def manager_with_store() -> RunManager:
+    """RunManager backed by a MemoryRunStore."""
+    return RunManager(store=MemoryRunStore())
+
+
+@pytest.mark.anyio
+async def test_list_by_thread_returns_store_records_after_restart(manager_with_store: RunManager):
+    """After in-memory state is cleared (simulating restart), list_by_thread
+    should still return runs from the persistent store."""
+    mgr = manager_with_store
+    r1 = await mgr.create("thread-1", "agent-1")
+    await mgr.set_status(r1.run_id, RunStatus.success)
+    r2 = await mgr.create("thread-1", "agent-2")
+    await mgr.set_status(r2.run_id, RunStatus.error, error="boom")
+
+    # Clear in-memory dict to simulate a restart
+    mgr._runs.clear()
+
+    runs = await mgr.list_by_thread("thread-1")
+    assert len(runs) == 2
+    statuses = {r.run_id: r.status for r in runs}
+    assert statuses[r1.run_id] == RunStatus.success
+    assert statuses[r2.run_id] == RunStatus.error
+    # Verify other fields survive the round-trip
+    for r in runs:
+        assert r.thread_id == "thread-1"
+        assert ISO_RE.match(r.created_at)
+
+
+@pytest.mark.anyio
+async def test_list_by_thread_merges_in_memory_and_store(manager_with_store: RunManager):
+    """In-memory runs should be included alongside store-only records."""
+    mgr = manager_with_store
+
+    # Create a run and let it complete (will be in both memory and store)
+    r1 = await mgr.create("thread-1")
+    await mgr.set_status(r1.run_id, RunStatus.success)
+
+    # Simulate restart: clear memory, then create a new in-memory run
+    mgr._runs.clear()
+    r2 = await mgr.create("thread-1")
+
+    runs = await mgr.list_by_thread("thread-1")
+    assert len(runs) == 2
+    run_ids = {r.run_id for r in runs}
+    assert r1.run_id in run_ids
+    assert r2.run_id in run_ids
+
+    # r2 should be the in-memory record (has live state)
+    r2_record = next(r for r in runs if r.run_id == r2.run_id)
+    assert r2_record is r2  # same object reference
+
+
+@pytest.mark.anyio
+async def test_list_by_thread_no_store():
+    """Without a store, list_by_thread should only return in-memory runs."""
+    mgr = RunManager()
+    await mgr.create("thread-1")
+
+    mgr._runs.clear()
+    runs = await mgr.list_by_thread("thread-1")
+    assert runs == []
+
+
+@pytest.mark.anyio
+async def test_aget_returns_in_memory_record(manager_with_store: RunManager):
+    """aget should return the in-memory record when available."""
+    mgr = manager_with_store
+    r1 = await mgr.create("thread-1", "agent-1")
+
+    result = await mgr.aget(r1.run_id)
+    assert result is r1  # same object
+
+
+@pytest.mark.anyio
+async def test_aget_falls_back_to_store(manager_with_store: RunManager):
+    """aget should return a record from the store when not in memory."""
+    mgr = manager_with_store
+    r1 = await mgr.create("thread-1", "agent-1")
+    await mgr.set_status(r1.run_id, RunStatus.success)
+
+    mgr._runs.clear()
+
+    result = await mgr.aget(r1.run_id)
+    assert result is not None
+    assert result.run_id == r1.run_id
+    assert result.status == RunStatus.success
+    assert result.thread_id == "thread-1"
+    assert result.assistant_id == "agent-1"
+
+
+@pytest.mark.anyio
+async def test_aget_falls_back_to_store_with_user_filter():
+    """aget should honor user_id when reading store-only records."""
+    store = MemoryRunStore()
+    await store.put("run-1", thread_id="thread-1", user_id="user-1", status="success")
+    mgr = RunManager(store=store)
+
+    allowed = await mgr.aget("run-1", user_id="user-1")
+    denied = await mgr.aget("run-1", user_id="user-2")
+    assert allowed is not None
+    assert denied is None
+
+
+@pytest.mark.anyio
+async def test_aget_returns_none_for_unknown(manager_with_store: RunManager):
+    """aget should return None for a run ID that doesn't exist anywhere."""
+    result = await manager_with_store.aget("nonexistent-run-id")
+    assert result is None
+
+
+@pytest.mark.anyio
+async def test_aget_store_failure_is_graceful():
+    """If the store raises, aget should return None instead of propagating."""
+    from unittest.mock import AsyncMock
+
+    store = MemoryRunStore()
+    store.get = AsyncMock(side_effect=RuntimeError("db down"))
+    mgr = RunManager(store=store)
+
+    result = await mgr.aget("some-id")
+    assert result is None
+
+
+@pytest.mark.anyio
+async def test_list_by_thread_store_failure_is_graceful():
+    """If the store raises, list_by_thread should return only in-memory runs."""
+    from unittest.mock import AsyncMock
+
+    store = MemoryRunStore()
+    store.list_by_thread = AsyncMock(side_effect=RuntimeError("db down"))
+    mgr = RunManager(store=store)
+
+    r1 = await mgr.create("thread-1")
+    runs = await mgr.list_by_thread("thread-1")
+    assert len(runs) == 1
+    assert runs[0].run_id == r1.run_id
+
+
+@pytest.mark.anyio
+async def test_list_by_thread_falls_back_to_store_with_user_filter():
+    """list_by_thread should return only the requesting user's store records."""
+    store = MemoryRunStore()
+    await store.put("run-1", thread_id="thread-1", user_id="user-1", status="success")
+    await store.put("run-2", thread_id="thread-1", user_id="user-2", status="success")
+    mgr = RunManager(store=store)
+
+    runs = await mgr.list_by_thread("thread-1", user_id="user-1")
+    assert [r.run_id for r in runs] == ["run-1"]