fix(task-tool): cancel and schedule deferred cleanup on polling safety timeout (#3097)

When the poll loop's safety-net timeout fires (poll_count > max_poll_count),
the background subagent task was abandoned without cancellation or cleanup,
leaving a stale entry in _background_tasks indefinitely.

The original code had a comment promising "the cleanup will happen when the
executor completes", but run_task() in executor.py never calls
cleanup_background_task after reaching a terminal state -- the promise was
never implemented.

This change mirrors the asyncio.CancelledError path: signal cooperative
cancellation via request_cancel_background_task and schedule
_deferred_cleanup_subagent_task to remove the entry once the background
thread reaches a terminal state.

Direct cleanup at poll-timeout time would introduce a race: run_task() could
remove the entry while the poll loop is still mid-iteration, causing a
spurious "Task disappeared" error. The deferred approach avoids this by
waiting for terminal state before removal.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
InitBoy
2026-05-21 07:47:19 +08:00
committed by GitHub
parent 9afeaf66bc
commit e19bec1422
2 changed files with 30 additions and 8 deletions
@@ -383,9 +383,6 @@ async def task_tool(
# Polling timeout as a safety net (in case thread pool timeout doesn't work)
# Set to execution timeout + 60s buffer, in 5s poll intervals
# This catches edge cases where the background task gets stuck
# Note: We don't call cleanup_background_task here because the task may
# still be running in the background. The cleanup will happen when the
# executor completes and sets a terminal status.
if poll_count > max_poll_count:
timeout_minutes = config.timeout_seconds // 60
logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)")
@@ -393,6 +390,11 @@ async def task_tool(
usage = _summarize_usage(getattr(result, "token_usage_records", None))
_cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage)
writer({"type": "task_timed_out", "task_id": task_id, "usage": usage})
# The task may still be running in the background. Signal cooperative
# cancellation and schedule deferred cleanup to remove the entry from
# _background_tasks once the background thread reaches a terminal state.
request_cancel_background_task(task_id)
_schedule_deferred_subagent_cleanup(task_id, trace_id, max_poll_count)
return f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}"
except asyncio.CancelledError:
# Signal the background subagent thread to stop cooperatively.
+25 -5
View File
@@ -732,17 +732,27 @@ def test_cleanup_called_on_timed_out(monkeypatch):
def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch):
"""Verify cleanup_background_task is NOT called on polling safety timeout.
"""Verify cleanup_background_task is NOT called directly on polling safety timeout.
This prevents race conditions where the background task is still running
but the polling loop gives up. The cleanup should happen later when the
executor completes and sets a terminal status.
The task is still RUNNING so it cannot be safely removed yet. Instead,
cooperative cancellation is requested and a deferred cleanup is scheduled.
"""
config = _make_subagent_config()
# Keep max_poll_count small for test speed: (1 + 60) // 5 = 12
config.timeout_seconds = 1
events = []
cleanup_calls = []
cancel_requests = []
scheduled_cleanups = []
class DummyCleanupTask:
def add_done_callback(self, _callback):
return None
def fake_create_task(coro):
scheduled_cleanups.append(coro)
coro.close()
return DummyCleanupTask()
monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus)
monkeypatch.setattr(
@@ -759,12 +769,18 @@ def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch):
)
monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append)
monkeypatch.setattr(task_tool_module.asyncio, "sleep", _no_sleep)
monkeypatch.setattr(task_tool_module.asyncio, "create_task", fake_create_task)
monkeypatch.setattr("deerflow.tools.get_available_tools", lambda **kwargs: [])
monkeypatch.setattr(
task_tool_module,
"cleanup_background_task",
lambda task_id: cleanup_calls.append(task_id),
)
monkeypatch.setattr(
task_tool_module,
"request_cancel_background_task",
lambda task_id: cancel_requests.append(task_id),
)
output = _run_task_tool(
runtime=_make_runtime(),
@@ -775,8 +791,12 @@ def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch):
)
assert output.startswith("Task polling timed out after 0 minutes")
# cleanup should NOT be called because the task is still RUNNING
# cleanup_background_task must NOT be called directly (task is still RUNNING)
assert cleanup_calls == []
# cooperative cancellation must be requested
assert cancel_requests == ["tc-no-cleanup-safety-timeout"]
# a deferred cleanup coroutine must be scheduled
assert len(scheduled_cleanups) == 1
def test_cleanup_scheduled_on_cancellation(monkeypatch):