From 9b19cca91c7d33dee2d39607edf19be3ef2e9558 Mon Sep 17 00:00:00 2001
From: john lee <64lamei@gmail.com>
Date: Wed, 20 May 2026 16:37:36 +0800
Subject: [PATCH] fix(runtime): make RunManager.cancel() idempotent for already-interrupted runs (#3055) (#3058)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A second cancel() call on an interrupted run returned False, causing the
cancel and stream_existing_run router endpoints to raise 409 on double-stop.

Fix: return True inside the lock when record.status == RunStatus.interrupted.
This covers both the POST /cancel and POST /join endpoints without any
re-fetch or extra get() call — the idempotency lives at the source.

Also fixes stream_existing_run (the LangGraph SDK stop-button path), which
had the identical cancel() → 409 pattern and was not covered by the original
PR. Both endpoints share the fix automatically.

Co-authored-by: Claude Sonnet 4.6
---
Review notes (commentary only; placed after the three-dash separator so they
are not part of the commit message and do not alter any hunk):

* NOTE(review): the line structure and indentation of this patch were
  reconstructed from a whitespace-collapsed copy. Added-line counts match the
  hunk headers (6 added / 1 removed, and 142 added), but the indentation of
  the context lines in the manager.py hunk is inferred — verify it against
  the target file before applying.
* NOTE(review): the new early return for RunStatus.interrupted sits before
  `record.abort_action = action`, so a second cancel() with a different
  action (e.g. "rollback" after "interrupt") reports True without recording
  the new action. Confirm that this is the intended idempotency contract.
* NOTE(review): test_stream_cancel_already_interrupted_returns_not_409 only
  asserts `status_code != 409`, and the client is built with
  raise_server_exceptions=False, so any other status (404, 405, 5xx) also
  satisfies it. Consider pinning the expected status once it is confirmed.
* NOTE(review): the commit message and the test section header refer to
  POST .../join for stream_existing_run, while the test module docstring says
  "POST stream endpoint" — confirm the route path and method against
  thread_runs.router.
* NOTE(review): the new cancel() docstring names terminal states
  "completed, failed", whereas the tests use RunStatus.success; presumably
  informal wording — confirm against the RunStatus enum.
* NOTE(review): the Co-authored-by trailer carries no <email> address in this
  copy; presumably lost in extraction — TODO confirm.

 .../harness/deerflow/runtime/runs/manager.py  |   7 +-
 backend/tests/test_cancel_run_idempotent.py   | 142 ++++++++++++++++++
 2 files changed, 148 insertions(+), 1 deletion(-)
 create mode 100644 backend/tests/test_cancel_run_idempotent.py

diff --git a/backend/packages/harness/deerflow/runtime/runs/manager.py b/backend/packages/harness/deerflow/runtime/runs/manager.py
index 06731eb91..ea78f89c9 100644
--- a/backend/packages/harness/deerflow/runtime/runs/manager.py
+++ b/backend/packages/harness/deerflow/runtime/runs/manager.py
@@ -258,12 +258,17 @@ class RunManager:
             action: "interrupt" keeps checkpoint, "rollback" reverts to pre-run state.
 
         Sets the abort event with the action reason and cancels the asyncio task.
-        Returns ``True`` if the run was in-flight and cancellation was initiated.
+        Returns ``True`` if cancellation was initiated **or** the run was already
+        interrupted (idempotent — a second cancel is a no-op success).
+        Returns ``False`` only when the run is unknown to this worker or has
+        reached a terminal state other than interrupted (completed, failed, etc.).
         """
         async with self._lock:
             record = self._runs.get(run_id)
             if record is None:
                 return False
+            if record.status == RunStatus.interrupted:
+                return True  # idempotent — already cancelled on this worker
             if record.status not in (RunStatus.pending, RunStatus.running):
                 return False
             record.abort_action = action
diff --git a/backend/tests/test_cancel_run_idempotent.py b/backend/tests/test_cancel_run_idempotent.py
new file mode 100644
index 000000000..0bf2548d1
--- /dev/null
+++ b/backend/tests/test_cancel_run_idempotent.py
@@ -0,0 +1,142 @@
+"""Tests for idempotent run cancellation (issue #3055).
+
+RunManager.cancel() returns True when a run is already interrupted so that
+a second cancel request from the same worker is treated as a no-op success
+(202) rather than a conflict (409). Both the POST cancel endpoint and the
+POST stream endpoint share this behaviour through the same cancel() call.
+"""
+
+from __future__ import annotations
+
+import asyncio
+
+from _router_auth_helpers import make_authed_test_app
+from fastapi.testclient import TestClient
+
+from app.gateway.routers import thread_runs
+from deerflow.runtime import RunManager, RunStatus
+
+THREAD_ID = "thread-cancel-test"
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_app(mgr: RunManager) -> TestClient:
+    app = make_authed_test_app()
+    app.include_router(thread_runs.router)
+    app.state.run_manager = mgr
+    return TestClient(app, raise_server_exceptions=False)
+
+
+def _create_interrupted_run(mgr: RunManager) -> str:
+    """Create a run and cancel it, returning its run_id."""
+
+    async def _setup():
+        record = await mgr.create(THREAD_ID)
+        await mgr.set_status(record.run_id, RunStatus.running)
+        await mgr.cancel(record.run_id)
+        return record.run_id
+
+    return asyncio.run(_setup())
+
+
+# ---------------------------------------------------------------------------
+# RunManager.cancel() unit tests
+# ---------------------------------------------------------------------------
+
+
+class TestRunManagerCancelIdempotency:
+    def test_cancel_returns_true_for_already_interrupted_run(self):
+        """cancel() must return True when the run is already interrupted."""
+
+        async def run():
+            mgr = RunManager()
+            record = await mgr.create(THREAD_ID)
+            await mgr.set_status(record.run_id, RunStatus.running)
+            first = await mgr.cancel(record.run_id)
+            assert first is True
+            second = await mgr.cancel(record.run_id)
+            assert second is True  # idempotent
+
+        asyncio.run(run())
+
+    def test_cancel_returns_false_for_successful_run(self):
+        """cancel() must still return False for runs that completed successfully."""
+
+        async def run():
+            mgr = RunManager()
+            record = await mgr.create(THREAD_ID)
+            await mgr.set_status(record.run_id, RunStatus.running)
+            await mgr.set_status(record.run_id, RunStatus.success)
+            result = await mgr.cancel(record.run_id)
+            assert result is False
+
+        asyncio.run(run())
+
+    def test_cancel_returns_false_for_unknown_run(self):
+        async def run():
+            mgr = RunManager()
+            result = await mgr.cancel("nonexistent-run-id")
+            assert result is False
+
+        asyncio.run(run())
+
+
+# ---------------------------------------------------------------------------
+# POST /cancel endpoint — idempotent 202
+# ---------------------------------------------------------------------------
+
+
+class TestCancelRunEndpointIdempotency:
+    def test_double_cancel_returns_202_not_409(self):
+        """Second cancel on an already-interrupted run must return 202, not 409."""
+        mgr = RunManager()
+        run_id = _create_interrupted_run(mgr)
+        client = _make_app(mgr)
+
+        resp = client.post(f"/api/threads/{THREAD_ID}/runs/{run_id}/cancel")
+        assert resp.status_code == 202, f"Expected 202, got {resp.status_code}: {resp.text}"
+
+    def test_cancel_unknown_run_returns_404(self):
+        mgr = RunManager()
+        client = _make_app(mgr)
+        resp = client.post(f"/api/threads/{THREAD_ID}/runs/no-such-run/cancel")
+        assert resp.status_code == 404
+
+    def test_cancel_successful_run_returns_409(self):
+        """Successfully-completed runs cannot be cancelled — must return 409."""
+
+        async def _setup():
+            mgr = RunManager()
+            record = await mgr.create(THREAD_ID)
+            await mgr.set_status(record.run_id, RunStatus.running)
+            await mgr.set_status(record.run_id, RunStatus.success)
+            return mgr, record.run_id
+
+        mgr, run_id = asyncio.run(_setup())
+        client = _make_app(mgr)
+        resp = client.post(f"/api/threads/{THREAD_ID}/runs/{run_id}/cancel")
+        assert resp.status_code == 409
+
+
+# ---------------------------------------------------------------------------
+# POST /{thread_id}/runs/{run_id}/join (stream_existing_run) — idempotent cancel
+# ---------------------------------------------------------------------------
+
+
+class TestStreamExistingRunIdempotentCancel:
+    def test_stream_cancel_already_interrupted_returns_not_409(self):
+        """stream_existing_run with action=interrupt on an already-interrupted run
+        must not raise 409 — the idempotent cancel path returns 202/SSE."""
+        mgr = RunManager()
+        run_id = _create_interrupted_run(mgr)
+        client = _make_app(mgr)
+
+        resp = client.post(
+            f"/api/threads/{THREAD_ID}/runs/{run_id}/join",
+            params={"action": "interrupt"},
+        )
+        assert resp.status_code != 409, f"Should not 409 on idempotent cancel, got {resp.status_code}"