mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-22 16:06:50 +00:00
fix(task-tool): unwrap callback manager when locating usage recorder
`config["callbacks"]` may arrive as a `BaseCallbackManager` (e.g. the `AsyncCallbackManager` LangChain hands to async tool runs), not just a plain list. The previous `for cb in callbacks` loop raised `TypeError: 'AsyncCallbackManager' object is not iterable`, which `ToolErrorHandlingMiddleware` then converted into a failed `task` ToolMessage even though the subagent had completed internally — Ultra mode lost subagent results and the lead agent fell back to redoing the work. Unwrap `BaseCallbackManager.handlers` before searching for the recorder. Refs: bytedance/deer-flow#3107 (BUG-002)
This commit is contained in:
@@ -7,6 +7,7 @@ from dataclasses import replace
|
||||
from typing import TYPE_CHECKING, Annotated, Any, cast
|
||||
|
||||
from langchain.tools import InjectedToolCallId, tool
|
||||
from langchain_core.callbacks import BaseCallbackManager
|
||||
from langgraph.config import get_stream_writer
|
||||
|
||||
from deerflow.config import get_app_config
|
||||
@@ -99,13 +100,21 @@ def _schedule_deferred_subagent_cleanup(task_id: str, trace_id: str, max_polls:
|
||||
|
||||
|
||||
def _find_usage_recorder(runtime: Any) -> Any | None:
|
||||
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config."""
|
||||
"""Find a callback handler with ``record_external_llm_usage_records`` in the runtime config.
|
||||
|
||||
LangChain may pass ``config["callbacks"]`` as either a plain list of handlers
|
||||
or as a ``BaseCallbackManager`` instance (e.g. ``AsyncCallbackManager`` on
|
||||
async tool runs). Callback managers are not iterable; unwrap their
|
||||
``handlers`` list before searching.
|
||||
"""
|
||||
if runtime is None:
|
||||
return None
|
||||
config = getattr(runtime, "config", None)
|
||||
if not isinstance(config, dict):
|
||||
return None
|
||||
callbacks = config.get("callbacks", [])
|
||||
callbacks = config.get("callbacks")
|
||||
if isinstance(callbacks, BaseCallbackManager):
|
||||
callbacks = callbacks.handlers
|
||||
if not callbacks:
|
||||
return None
|
||||
for cb in callbacks:
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
"""Regression tests for _find_usage_recorder callback shape handling.
|
||||
|
||||
Bytedance issue #3107 BUG-002: When LangChain passes ``config["callbacks"]`` as
|
||||
an ``AsyncCallbackManager`` (instead of a plain list), the previous
|
||||
``for cb in callbacks`` loop raised ``TypeError: 'AsyncCallbackManager' object
|
||||
is not iterable``. ToolErrorHandlingMiddleware then converted the entire ``task``
|
||||
tool call into an error ToolMessage, losing the subagent result.
|
||||
"""
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
from langchain_core.callbacks import AsyncCallbackManager, CallbackManager
|
||||
|
||||
from deerflow.tools.builtins.task_tool import _find_usage_recorder
|
||||
|
||||
|
||||
class _RecorderHandler:
|
||||
def record_external_llm_usage_records(self, records):
|
||||
self.records = records
|
||||
|
||||
|
||||
class _OtherHandler:
|
||||
pass
|
||||
|
||||
|
||||
def _make_runtime(callbacks):
|
||||
return SimpleNamespace(config={"callbacks": callbacks})
|
||||
|
||||
|
||||
def test_find_usage_recorder_with_plain_list():
|
||||
recorder = _RecorderHandler()
|
||||
runtime = _make_runtime([_OtherHandler(), recorder])
|
||||
assert _find_usage_recorder(runtime) is recorder
|
||||
|
||||
|
||||
def test_find_usage_recorder_with_async_callback_manager():
|
||||
"""LangChain wraps callbacks in AsyncCallbackManager for async tool runs.
|
||||
|
||||
The old implementation raised TypeError here. The recorder lives on
|
||||
``manager.handlers``; we must look there too.
|
||||
"""
|
||||
recorder = _RecorderHandler()
|
||||
manager = AsyncCallbackManager(handlers=[_OtherHandler(), recorder])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is recorder
|
||||
|
||||
|
||||
def test_find_usage_recorder_with_sync_callback_manager():
|
||||
"""Sync flavor of the same wrapper used by some langchain code paths."""
|
||||
recorder = _RecorderHandler()
|
||||
manager = CallbackManager(handlers=[recorder])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is recorder
|
||||
|
||||
|
||||
def test_find_usage_recorder_returns_none_when_no_recorder():
|
||||
manager = AsyncCallbackManager(handlers=[_OtherHandler()])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
|
||||
|
||||
def test_find_usage_recorder_handles_empty_manager():
|
||||
manager = AsyncCallbackManager(handlers=[])
|
||||
runtime = _make_runtime(manager)
|
||||
assert _find_usage_recorder(runtime) is None
|
||||
Reference in New Issue
Block a user