diff --git a/backend/packages/harness/deerflow/tools/builtins/task_tool.py b/backend/packages/harness/deerflow/tools/builtins/task_tool.py index cf9281ff4..a45bff787 100644 --- a/backend/packages/harness/deerflow/tools/builtins/task_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/task_tool.py @@ -383,9 +383,6 @@ async def task_tool( # Polling timeout as a safety net (in case thread pool timeout doesn't work) # Set to execution timeout + 60s buffer, in 5s poll intervals # This catches edge cases where the background task gets stuck - # Note: We don't call cleanup_background_task here because the task may - # still be running in the background. The cleanup will happen when the - # executor completes and sets a terminal status. if poll_count > max_poll_count: timeout_minutes = config.timeout_seconds // 60 logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)") @@ -393,6 +390,11 @@ async def task_tool( usage = _summarize_usage(getattr(result, "token_usage_records", None)) _cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage) writer({"type": "task_timed_out", "task_id": task_id, "usage": usage}) + # The task may still be running in the background. Signal cooperative + # cancellation and schedule deferred cleanup to remove the entry from + # _background_tasks once the background thread reaches a terminal state. + request_cancel_background_task(task_id) + _schedule_deferred_subagent_cleanup(task_id, trace_id, max_poll_count) return f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}" except asyncio.CancelledError: # Signal the background subagent thread to stop cooperatively. diff --git a/backend/tests/test_task_tool_core_logic.py b/backend/tests/test_task_tool_core_logic.py index 658968d65..dc0f844d3 100644 --- a/backend/tests/test_task_tool_core_logic.py +++ b/backend/tests/test_task_tool_core_logic.py @@ -732,17 +732,27 @@ def test_cleanup_called_on_timed_out(monkeypatch): def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch): - """Verify cleanup_background_task is NOT called on polling safety timeout. + """Verify cleanup_background_task is NOT called directly on polling safety timeout. - This prevents race conditions where the background task is still running - but the polling loop gives up. The cleanup should happen later when the - executor completes and sets a terminal status. + The task is still RUNNING so it cannot be safely removed yet. Instead, + cooperative cancellation is requested and a deferred cleanup is scheduled. """ config = _make_subagent_config() # Keep max_poll_count small for test speed: (1 + 60) // 5 = 12 config.timeout_seconds = 1 events = [] cleanup_calls = [] + cancel_requests = [] + scheduled_cleanups = [] + + class DummyCleanupTask: + def add_done_callback(self, _callback): + return None + + def fake_create_task(coro): + scheduled_cleanups.append(coro) + coro.close() + return DummyCleanupTask() monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) monkeypatch.setattr( @@ -759,12 +769,18 @@ def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch): ) monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) monkeypatch.setattr(task_tool_module.asyncio, "sleep", _no_sleep) + monkeypatch.setattr(task_tool_module.asyncio, "create_task", fake_create_task) monkeypatch.setattr("deerflow.tools.get_available_tools", lambda **kwargs: []) monkeypatch.setattr( task_tool_module, "cleanup_background_task", lambda task_id: cleanup_calls.append(task_id), ) + monkeypatch.setattr( + task_tool_module, + "request_cancel_background_task", + lambda task_id: cancel_requests.append(task_id), + ) output = _run_task_tool( runtime=_make_runtime(), @@ -775,8 +791,12 @@ def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch): ) assert output.startswith("Task polling timed out after 0 minutes") - # cleanup should NOT be called because the task is still RUNNING + # cleanup_background_task must NOT be called directly (task is still RUNNING) assert cleanup_calls == [] + # cooperative cancellation must be requested + assert cancel_requests == ["tc-no-cleanup-safety-timeout"] + # a deferred cleanup coroutine must be scheduled + assert len(scheduled_cleanups) == 1 def test_cleanup_scheduled_on_cancellation(monkeypatch):