From a72af8ea37c7e039ac47f296229b4ec9bc57023a Mon Sep 17 00:00:00 2001 From: heart-scalpel Date: Wed, 17 Jun 2026 14:36:09 +0800 Subject: [PATCH] feat(subagents): attribute subagent spans to parent thread's Langfuse session (#3611) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The subagent execution path did not call inject_langfuse_metadata(...) and built its model with attach_tracing=True, so subagent LLM/tool spans landed in Langfuse as isolated top-level traces carrying fresh session ids and the default user. They were findable in the unfiltered trace list but did not group under the parent thread's session card, and Langfuse cost attribution for subagent traffic did not line up with the parent conversation — even though DeerFlow's internal token accounting (SubagentTokenCollector) was already correct. Extend the lead-agent tracing wiring to the subagent path so a single subagent run produces one trace that shares the parent thread's session_id and user_id, with a subagent: trace name: - subagents/executor.py: append build_tracing_callbacks() output to run_config["callbacks"] (preserving SubagentTokenCollector) and call inject_langfuse_metadata(...) with thread_id, user_id, and the normalized subagent: trace name. Build the model with attach_tracing=False so model-level tracing does not double-count with the graph-root callbacks — the same pairing the lead agent uses. - tools/builtins/task_tool.py: resolve user_id via resolve_runtime_user_id(runtime) at the parent tool layer (before the background thread starts) and thread it through SubagentExecutor.__init__, because the _current_user contextvar is not guaranteed to survive the _execution_pool boundary. Trace topology is unchanged: subagent traces remain separate top-level traces in the same session, not nested as child spans under the lead trace (Plan B follow-up). Tests: tests/test_subagent_executor.py::TestSubagentTracingWiring covers the callback append, the session/user/trace-name injection, the disabled-langfuse no-op, the DEFAULT_USER_ID fallback, the empty-name trace-name fallback, and the env-tag emission. Existing test_create_agent_threads_explicit_app_config_to_model_and_middlewares now also asserts attach_tracing=False. Docs: CLAUDE.md Tracing System section documents subagents/executor.py as a third injection point alongside worker.py and client.py. --- backend/CLAUDE.md | 8 +- .../harness/deerflow/subagents/executor.py | 38 ++- .../deerflow/tools/builtins/task_tool.py | 6 + backend/tests/test_subagent_executor.py | 288 ++++++++++++++++++ 4 files changed, 335 insertions(+), 5 deletions(-) diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 058a7ec24..e978e8712 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -481,16 +481,16 @@ LangSmith and Langfuse are both supported. The wiring lives in two layers: - `factory.py::build_tracing_callbacks()` — returns the LangChain `CallbackHandler` list for the providers currently enabled via env vars (`LANGSMITH_TRACING`, `LANGFUSE_TRACING`, etc.). The handlers are attached at the **graph invocation root** for in-graph runs (`make_lead_agent` and `DeerFlowClient.stream` both append them to `config["callbacks"]` before invoking the graph) so a single run produces one trace with all node / LLM / tool calls as child spans. Standalone callers — anything that invokes a model outside such a graph (e.g. `MemoryUpdater`) — keep `create_chat_model`'s default `attach_tracing=True`, which falls back to model-level callback attachment. - `metadata.py::build_langfuse_trace_metadata()` — builds the Langfuse-reserved trace attributes for `RunnableConfig.metadata`. The Langfuse v4 `langchain.CallbackHandler` lifts these onto the root trace (see its `_parse_langfuse_trace_attributes`), but only when it sees `on_chain_start(parent_run_id=None)` — which is why the callbacks have to live at the graph root, not the model. -**Trace-attribute injection points**: both `runtime/runs/worker.py::run_agent` (gateway path) and `client.py::DeerFlowClient.stream` (embedded path) merge the metadata into `config["metadata"]` right before constructing the graph. Caller-supplied keys win via `setdefault`, so an external `session_id` override is preserved. Field mapping: +**Trace-attribute injection points**: both `runtime/runs/worker.py::run_agent` (gateway path) and `client.py::DeerFlowClient.stream` (embedded path) merge the metadata into `config["metadata"]` right before constructing the graph. `subagents/executor.py::_aexecute` does the same for every subagent run so subagent traces group under the parent thread's session card (carrying the parent `thread_id` → `langfuse_session_id`, the user_id captured at `task_tool` → `langfuse_user_id`, and a `subagent:` trace name). Caller-supplied keys win via `setdefault`, so an external `session_id` override is preserved. Field mapping: | Langfuse field | Source | |-----------------------|----------------------------------------------| | `langfuse_session_id` | LangGraph `thread_id` | -| `langfuse_user_id` | `get_effective_user_id()` (`default` in no-auth) | -| `langfuse_trace_name` | `RunRecord.assistant_id` / client `agent_name` (defaults to `lead-agent`) | +| `langfuse_user_id` | `get_effective_user_id()` (`default` in no-auth); for subagents, captured from `runtime.context` at `task_tool` time via `resolve_runtime_user_id()` | +| `langfuse_trace_name` | `RunRecord.assistant_id` / client `agent_name` (defaults to `lead-agent`); for subagents, `subagent:` (lowercased, `_` → `-`) | | `langfuse_tags` | `env:` + `model:` | -Returns `{}` when Langfuse is not in the enabled providers — LangSmith-only deployments are unaffected. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. Tests live in `tests/test_tracing_factory.py`, `tests/test_tracing_metadata.py`, `tests/test_worker_langfuse_metadata.py`, and `tests/test_client_langfuse_metadata.py`. +Returns `{}` when Langfuse is not in the enabled providers — LangSmith-only deployments are unaffected. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. Tests live in `tests/test_tracing_factory.py`, `tests/test_tracing_metadata.py`, `tests/test_worker_langfuse_metadata.py`, `tests/test_client_langfuse_metadata.py`, and `tests/test_subagent_executor.py::TestSubagentTracingWiring`. ### Config Schema diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index 89b8795a9..16c123d78 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -3,6 +3,7 @@ import asyncio import atexit import logging +import os import threading import uuid from collections.abc import Callable, Coroutine @@ -27,6 +28,7 @@ from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools from deerflow.skills.types import Skill from deerflow.subagents.config import SubagentConfig, resolve_subagent_model_name from deerflow.subagents.token_collector import SubagentTokenCollector +from deerflow.tracing import build_tracing_callbacks, inject_langfuse_metadata if TYPE_CHECKING: # Imported lazily at runtime inside _build_initial_state: importing @@ -286,6 +288,7 @@ class SubagentExecutor: thread_data: ThreadDataState | None = None, thread_id: str | None = None, trace_id: str | None = None, + user_id: str | None = None, ): """Initialize the executor. @@ -300,6 +303,8 @@ class SubagentExecutor: thread_data: Thread data from parent agent. thread_id: Thread ID for sandbox operations. trace_id: Trace ID from parent for distributed tracing. + user_id: User ID captured from the parent tool's runtime context. + When None, the tracing layer falls back to DEFAULT_USER_ID. """ self.config = config self.app_config = app_config @@ -316,6 +321,7 @@ class SubagentExecutor: self.thread_id = thread_id # Generate trace_id if not provided (for top-level calls) self.trace_id = trace_id or str(uuid.uuid4())[:8] + self.user_id = user_id self._base_tools = _filter_tools( tools, @@ -336,7 +342,7 @@ class SubagentExecutor: app_config = self.app_config or get_app_config() if self.model_name is None: self.model_name = resolve_subagent_model_name(self.config, self.parent_model, app_config=app_config) - model = create_chat_model(name=self.model_name, thinking_enabled=False, app_config=app_config) + model = create_chat_model(name=self.model_name, thinking_enabled=False, app_config=app_config, attach_tracing=False) from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares @@ -522,6 +528,36 @@ class SubagentExecutor: "callbacks": [collector], "tags": [collector_caller], } + + # Inject tracing callbacks at the graph level so a single subagent run + # produces one trace with all node / LLM / tool calls as child spans. + # This mirrors the lead agent pattern: graph-level tracing paired with + # attach_tracing=False on the model avoids double-counted traces. + tracing_callbacks = build_tracing_callbacks() + if tracing_callbacks: + existing_callbacks = list(run_config.get("callbacks") or []) + run_config["callbacks"] = [*existing_callbacks, *tracing_callbacks] + + # Normalize subagent name for tracing so it matches the lead-agent + # naming shape (lowercase, hyphens only). Inline because there is no + # shared helper — runtime/runs/naming.py only handles lead-agent runs. + if self.config.name: + normalized_name = self.config.name.strip().lower().replace("_", "-") + assistant_id = f"subagent:{normalized_name}" + else: + assistant_id = "subagent" + + # Inject Langfuse trace-attribute metadata so the subagent trace + # links to the parent thread and carries the correct session/user IDs. + inject_langfuse_metadata( + run_config, + thread_id=self.thread_id, + user_id=self.user_id, + assistant_id=assistant_id, + model_name=self.model_name, + environment=os.environ.get("DEER_FLOW_ENV") or os.environ.get("ENVIRONMENT"), + ) + context: dict[str, Any] = {} if self.thread_id: run_config["configurable"] = {"thread_id": self.thread_id} diff --git a/backend/packages/harness/deerflow/tools/builtins/task_tool.py b/backend/packages/harness/deerflow/tools/builtins/task_tool.py index dab1377c6..cb34ae4ca 100644 --- a/backend/packages/harness/deerflow/tools/builtins/task_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/task_tool.py @@ -11,6 +11,7 @@ from langchain_core.callbacks import BaseCallbackManager from langgraph.config import get_stream_writer from deerflow.config import get_app_config +from deerflow.runtime.user_context import resolve_runtime_user_id from deerflow.sandbox.security import LOCAL_BASH_SUBAGENT_DISABLED_MESSAGE, is_host_bash_allowed from deerflow.subagents import SubagentExecutor, get_available_subagent_names, get_subagent_config from deerflow.subagents.config import resolve_subagent_model_name @@ -253,6 +254,7 @@ async def task_tool( thread_id = None parent_model = None trace_id = None + user_id = None metadata: dict = {} if runtime is not None: @@ -269,6 +271,9 @@ async def task_tool( # Get or generate trace_id for distributed tracing trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8] + # Get user_id for tracing (uses standard resolution order) + user_id = resolve_runtime_user_id(runtime) + parent_available_skills = metadata.get("available_skills") if parent_available_skills is not None: overrides["skills"] = _merge_skill_allowlists(list(parent_available_skills), config.skills) @@ -306,6 +311,7 @@ async def task_tool( "thread_data": thread_data, "thread_id": thread_id, "trace_id": trace_id, + "user_id": user_id, } if resolved_app_config is not None: executor_kwargs["app_config"] = resolved_app_config diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index 77918bc98..abe7416dc 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -307,6 +307,11 @@ class TestAgentConstruction: "name": "parent-model", "thinking_enabled": False, "app_config": app_config, + # attach_tracing=False pairs with graph-root tracing callbacks + # injected in _aexecute (see TestSubagentTracingWiring). Without + # this the subagent would emit both a model-level trace and a + # graph-level trace per call. + "attach_tracing": False, } assert captured["middlewares"] == { "app_config": app_config, @@ -1962,3 +1967,286 @@ class TestCooperativeCancellation: executor_module.cleanup_background_task(task_id) assert task_id not in executor_module._background_tasks + + +# ----------------------------------------------------------------------------- +# Subagent Tracing Wiring +# ----------------------------------------------------------------------------- +# +# Regression coverage for the asymmetry fix: subagent runs must mirror the +# lead agent pattern so a single subagent execution produces one trace with +# the parent thread's session_id and user_id, not an isolated top-level trace. +# Three things must hold simultaneously: +# 1. ``build_tracing_callbacks()`` is appended to ``run_config["callbacks"]`` +# so the Langfuse handler sees ``on_chain_start(parent_run_id=None)`` and +# actually promotes ``langfuse_*`` metadata onto the root trace. +# 2. ``inject_langfuse_metadata(run_config, ...)`` carries the parent +# thread_id (-> session_id) and the captured user_id (-> user_id). +# 3. The subagent's model is built with ``attach_tracing=False`` so the +# model-level handler does not double-count (covered separately by +# ``test_create_agent_threads_explicit_app_config_to_model_and_middlewares``). + + +class _FakeStreamAgent: + """Stand-in agent that records the ``config`` passed to ``astream``. + + Yields no chunks so ``_aexecute`` takes the ``final_state is None`` path + and finishes without exercising message-handling code that is unrelated + to the tracing wiring under test. + """ + + def __init__(self) -> None: + self.captured_config: dict | None = None + self.captured_context: dict | None = None + + async def astream(self, state, *, config, context, stream_mode): # noqa: ARG002 - signature parity + self.captured_config = config + self.captured_context = context + return + yield # pragma: no cover - make this an async generator + + +class TestSubagentTracingWiring: + """Verify the subagent graph-root tracing wiring matches the lead agent.""" + + @pytest.fixture + def executor_module(self, _setup_executor_classes): + executor = importlib.import_module("deerflow.subagents.executor") + return _patch_default_get_app_config(importlib.reload(executor)) + + @pytest.fixture(autouse=True) + def _clear_langfuse_env(self, monkeypatch): + """Reset tracing config and env between tests so monkeypatched env + vars do not leak across tests in this class or the rest of the suite. + """ + from deerflow.config.tracing_config import reset_tracing_config + + for name in ("LANGFUSE_TRACING", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_BASE_URL"): + monkeypatch.delenv(name, raising=False) + reset_tracing_config() + yield + reset_tracing_config() + + def _make_executor(self, classes, *, user_id=None, name="general-purpose", parent_model="test-model"): + SubagentExecutor = classes["SubagentExecutor"] + SubagentConfig = classes["SubagentConfig"] + config = SubagentConfig( + name=name, + description="Tracing test agent", + system_prompt="You are a tracing test agent.", + max_turns=5, + timeout_seconds=30, + ) + return SubagentExecutor( + config=config, + tools=[], + parent_model=parent_model, + thread_id="thread-trace-1", + trace_id="trace-1", + user_id=user_id, + ) + + @pytest.mark.anyio + async def test_aexecute_appends_tracing_callbacks_to_run_config( + self, + classes, + executor_module, + monkeypatch, + ): + """``build_tracing_callbacks()`` output must be appended (not replace) + to the existing callbacks so the SubagentTokenCollector keeps working. + """ + SubagentStatus = classes["SubagentStatus"] + + sentinel_handler = object() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [sentinel_handler]) + + executor = self._make_executor(classes, user_id="alice") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + result = await executor._aexecute("do something") + + assert fake_agent.captured_config is not None + callbacks = fake_agent.captured_config.get("callbacks") or [] + assert sentinel_handler in callbacks, "tracing handler must reach run_config['callbacks']" + # SubagentTokenCollector must survive the append (graph-root tracing + # cannot displace the token-accounting callback). + assert len(callbacks) >= 2, "existing callbacks must be preserved when tracing is injected" + assert result.status.value == SubagentStatus.COMPLETED.value + + @pytest.mark.anyio + async def test_aexecute_injects_langfuse_session_user_and_trace_name( + self, + classes, + executor_module, + monkeypatch, + ): + """When Langfuse is enabled, ``run_config['metadata']`` must carry the + parent thread_id (-> session_id), the constructor-supplied user_id, and + a ``subagent:`` trace name so the subagent trace groups under + the parent thread's session card. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + + class _Sentinel: + pass + + sentinel = _Sentinel() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [sentinel]) + + executor = self._make_executor(classes, user_id="alice", name="general_purpose") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + assert metadata.get("langfuse_session_id") == "thread-trace-1", "subagent trace must inherit parent thread_id as session_id" + assert metadata.get("langfuse_user_id") == "alice", "subagent trace must carry the user_id captured at task_tool layer" + # Underscores are normalized to hyphens so the trace name matches the + # lead-agent naming shape. + assert metadata.get("langfuse_trace_name") == "subagent:general-purpose" + tags = metadata.get("langfuse_tags") or [] + assert any(t.startswith("model:") for t in tags), "model tag must be emitted for cost attribution" + + @pytest.mark.anyio + async def test_aexecute_skips_langfuse_metadata_when_disabled( + self, + classes, + executor_module, + monkeypatch, + ): + """When Langfuse is not in the enabled providers, ``inject_langfuse_metadata`` + must be a no-op and ``run_config['metadata']`` must not carry langfuse_* + keys. LangSmith-only deployments are unaffected. + """ + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: []) + + executor = self._make_executor(classes, user_id="alice") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + for key in ("langfuse_session_id", "langfuse_user_id", "langfuse_trace_name", "langfuse_tags"): + assert key not in metadata, f"{key} must be absent when Langfuse is disabled" + + @pytest.mark.anyio + async def test_user_id_defaults_when_not_supplied( + self, + classes, + executor_module, + monkeypatch, + ): + """When ``user_id`` is None at construction (parent did not capture + one), the tracing layer must fall back to DEFAULT_USER_ID so the + Langfuse Users page still groups the trace. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [object()]) + + executor = self._make_executor(classes, user_id=None) + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + # DEFAULT_USER_ID is "default" (see deerflow.runtime.user_context). + assert metadata.get("langfuse_user_id") == "default" + + @pytest.mark.anyio + async def test_trace_name_falls_back_when_config_name_empty( + self, + classes, + executor_module, + monkeypatch, + ): + """A subagent config without ``name`` must still produce a non-empty + trace name so Langfuse does not render the trace as unnamed. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [object()]) + + SubagentExecutor = classes["SubagentExecutor"] + SubagentConfig = classes["SubagentConfig"] + config = SubagentConfig( + name="", # empty name exercises the fallback branch + description="No name", + system_prompt="", + max_turns=5, + timeout_seconds=30, + ) + executor = SubagentExecutor( + config=config, + tools=[], + thread_id="thread-trace-2", + trace_id="trace-2", + ) + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + assert metadata.get("langfuse_trace_name") == "subagent" + + @pytest.mark.anyio + async def test_environment_tag_emitted_from_deer_flow_env( + self, + classes, + executor_module, + monkeypatch, + ): + """``DEER_FLOW_ENV`` must surface as an ``env:`` tag so Langfuse + cost aggregation can split traces by deployment environment. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + monkeypatch.setenv("DEER_FLOW_ENV", "staging") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [object()]) + + executor = self._make_executor(classes, user_id="alice") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + tags = metadata.get("langfuse_tags") or [] + assert "env:staging" in tags + + async def _noop_build_initial_state(self, task): # noqa: ARG002 - signature parity + """Return a minimal state tuple so ``_aexecute`` reaches ``astream`` + without loading skills, MCP tools, or the real config. + """ + from langchain_core.messages import HumanMessage + + return ({"messages": [HumanMessage(content=task)]}, [], None)