mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 08:25:57 +00:00
fix: bucket subagent token usage into parent run totals (#2838)
* fix: bucket subagent token usage into RunRow.subagent_tokens Add caller-bucketed token tracking to RunJournal so subagent and middleware LLM calls are written to the correct RunRow columns instead of all falling into lead_agent_tokens (default 0). - RunJournal: accumulate _lead_agent_tokens / _subagent_tokens / _middleware_tokens in on_llm_end, deduped by langchain run_id. Add record_external_llm_usage_records() for external sources (respects track_token_usage flag). Return caller buckets from get_completion_data(). - SubagentTokenCollector: new lightweight callback handler that collects LLM usage within subagent execution. - SubagentExecutor: wire collector into subagent run_config and sync records to SubagentResult on every chunk (timeout/cancel safe). - SubagentResult: add token_usage_records and usage_reported fields. - task_tool: report subagent usage to parent RunJournal on every terminal status (COMPLETED/FAILED/CANCELLED/TIMED_OUT), including the CancelledError path, guarded against double-reporting. No DB migration needed — RunRow columns already exist. * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * fix: address token usage review feedback * Address review follow-ups --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -27,6 +27,92 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _is_subagent_terminal(result: Any) -> bool:
|
||||
"""Return whether a background subagent result is safe to clean up."""
|
||||
return result.status in {SubagentStatus.COMPLETED, SubagentStatus.FAILED, SubagentStatus.CANCELLED, SubagentStatus.TIMED_OUT} or getattr(result, "completed_at", None) is not None
|
||||
|
||||
|
||||
async def _await_subagent_terminal(task_id: str, max_polls: int) -> Any | None:
|
||||
"""Poll until the background subagent reaches a terminal status or we run out of polls."""
|
||||
for _ in range(max_polls):
|
||||
result = get_background_task_result(task_id)
|
||||
if result is None:
|
||||
return None
|
||||
if _is_subagent_terminal(result):
|
||||
return result
|
||||
await asyncio.sleep(5)
|
||||
return None
|
||||
|
||||
|
||||
async def _deferred_cleanup_subagent_task(task_id: str, trace_id: str, max_polls: int) -> None:
|
||||
"""Keep polling a cancelled subagent until it can be safely removed."""
|
||||
cleanup_poll_count = 0
|
||||
while True:
|
||||
result = get_background_task_result(task_id)
|
||||
if result is None:
|
||||
return
|
||||
if _is_subagent_terminal(result):
|
||||
cleanup_background_task(task_id)
|
||||
return
|
||||
if cleanup_poll_count >= max_polls:
|
||||
logger.warning(f"[trace={trace_id}] Deferred cleanup for task {task_id} timed out after {cleanup_poll_count} polls")
|
||||
return
|
||||
await asyncio.sleep(5)
|
||||
cleanup_poll_count += 1
|
||||
|
||||
|
||||
def _log_cleanup_failure(cleanup_task: asyncio.Task[None], *, trace_id: str, task_id: str) -> None:
|
||||
if cleanup_task.cancelled():
|
||||
return
|
||||
|
||||
exc = cleanup_task.exception()
|
||||
if exc is not None:
|
||||
logger.error(f"[trace={trace_id}] Deferred cleanup failed for task {task_id}: {exc}")
|
||||
|
||||
|
||||
def _schedule_deferred_subagent_cleanup(task_id: str, trace_id: str, max_polls: int) -> None:
|
||||
logger.debug(f"[trace={trace_id}] Scheduling deferred cleanup for cancelled task {task_id}")
|
||||
cleanup_task = asyncio.create_task(_deferred_cleanup_subagent_task(task_id, trace_id, max_polls))
|
||||
cleanup_task.add_done_callback(lambda task: _log_cleanup_failure(task, trace_id=trace_id, task_id=task_id))
|
||||
|
||||
|
||||
def _find_usage_recorder(runtime: Any) -> Any | None:
|
||||
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config."""
|
||||
if runtime is None:
|
||||
return None
|
||||
config = getattr(runtime, "config", None)
|
||||
if not isinstance(config, dict):
|
||||
return None
|
||||
callbacks = config.get("callbacks", [])
|
||||
if not callbacks:
|
||||
return None
|
||||
for cb in callbacks:
|
||||
if hasattr(cb, "record_external_llm_usage_records"):
|
||||
return cb
|
||||
return None
|
||||
|
||||
|
||||
def _report_subagent_usage(runtime: Any, result: Any) -> None:
|
||||
"""Report subagent token usage to the parent RunJournal, if available.
|
||||
|
||||
Each subagent task must be reported only once (guarded by usage_reported).
|
||||
"""
|
||||
if getattr(result, "usage_reported", True):
|
||||
return
|
||||
records = getattr(result, "token_usage_records", None) or []
|
||||
if not records:
|
||||
return
|
||||
journal = _find_usage_recorder(runtime)
|
||||
if journal is None:
|
||||
logger.debug("No usage recorder found in runtime callbacks — subagent token usage not recorded")
|
||||
return
|
||||
try:
|
||||
journal.record_external_llm_usage_records(records)
|
||||
result.usage_reported = True
|
||||
except Exception:
|
||||
logger.warning("Failed to report subagent token usage", exc_info=True)
|
||||
|
||||
|
||||
def _get_runtime_app_config(runtime: Any) -> "AppConfig | None":
|
||||
context = getattr(runtime, "context", None)
|
||||
if isinstance(context, dict):
|
||||
@@ -227,21 +313,25 @@ async def task_tool(
|
||||
|
||||
# Check if task completed, failed, or timed out
|
||||
if result.status == SubagentStatus.COMPLETED:
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_completed", "task_id": task_id, "result": result.result})
|
||||
logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
|
||||
cleanup_background_task(task_id)
|
||||
return f"Task Succeeded. Result: {result.result}"
|
||||
elif result.status == SubagentStatus.FAILED:
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_failed", "task_id": task_id, "error": result.error})
|
||||
logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
|
||||
cleanup_background_task(task_id)
|
||||
return f"Task failed. Error: {result.error}"
|
||||
elif result.status == SubagentStatus.CANCELLED:
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_cancelled", "task_id": task_id, "error": result.error})
|
||||
logger.info(f"[trace={trace_id}] Task {task_id} cancelled: {result.error}")
|
||||
cleanup_background_task(task_id)
|
||||
return "Task cancelled by user."
|
||||
elif result.status == SubagentStatus.TIMED_OUT:
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_timed_out", "task_id": task_id, "error": result.error})
|
||||
logger.warning(f"[trace={trace_id}] Task {task_id} timed out: {result.error}")
|
||||
cleanup_background_task(task_id)
|
||||
@@ -260,43 +350,28 @@ async def task_tool(
|
||||
if poll_count > max_poll_count:
|
||||
timeout_minutes = config.timeout_seconds // 60
|
||||
logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)")
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_timed_out", "task_id": task_id})
|
||||
return f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}"
|
||||
except asyncio.CancelledError:
|
||||
# Signal the background subagent thread to stop cooperatively.
|
||||
# Without this, the thread (running in ThreadPoolExecutor with its
|
||||
# own event loop via asyncio.run) would continue executing even
|
||||
# after the parent task is cancelled.
|
||||
request_cancel_background_task(task_id)
|
||||
|
||||
async def cleanup_when_done() -> None:
|
||||
max_cleanup_polls = max_poll_count
|
||||
cleanup_poll_count = 0
|
||||
# Wait (shielded) for the subagent to reach a terminal state so the
|
||||
# final token usage snapshot is reported to the parent RunJournal
|
||||
# before the parent worker persists get_completion_data().
|
||||
terminal_result = None
|
||||
try:
|
||||
terminal_result = await asyncio.shield(_await_subagent_terminal(task_id, max_poll_count))
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
while True:
|
||||
result = get_background_task_result(task_id)
|
||||
if result is None:
|
||||
return
|
||||
|
||||
if result.status in {SubagentStatus.COMPLETED, SubagentStatus.FAILED, SubagentStatus.CANCELLED, SubagentStatus.TIMED_OUT} or getattr(result, "completed_at", None) is not None:
|
||||
cleanup_background_task(task_id)
|
||||
return
|
||||
|
||||
if cleanup_poll_count > max_cleanup_polls:
|
||||
logger.warning(f"[trace={trace_id}] Deferred cleanup for task {task_id} timed out after {cleanup_poll_count} polls")
|
||||
return
|
||||
|
||||
await asyncio.sleep(5)
|
||||
cleanup_poll_count += 1
|
||||
|
||||
def log_cleanup_failure(cleanup_task: asyncio.Task[None]) -> None:
|
||||
if cleanup_task.cancelled():
|
||||
return
|
||||
|
||||
exc = cleanup_task.exception()
|
||||
if exc is not None:
|
||||
logger.error(f"[trace={trace_id}] Deferred cleanup failed for task {task_id}: {exc}")
|
||||
|
||||
logger.debug(f"[trace={trace_id}] Scheduling deferred cleanup for cancelled task {task_id}")
|
||||
asyncio.create_task(cleanup_when_done()).add_done_callback(log_cleanup_failure)
|
||||
# Report whatever the subagent collected (even if we timed out).
|
||||
final_result = terminal_result or get_background_task_result(task_id)
|
||||
if final_result is not None:
|
||||
_report_subagent_usage(runtime, final_result)
|
||||
if final_result is not None and _is_subagent_terminal(final_result):
|
||||
cleanup_background_task(task_id)
|
||||
else:
|
||||
_schedule_deferred_subagent_cleanup(task_id, trace_id, max_poll_count)
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user