diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 058a7ec24..e978e8712 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -481,16 +481,16 @@ LangSmith and Langfuse are both supported. The wiring lives in two layers: - `factory.py::build_tracing_callbacks()` — returns the LangChain `CallbackHandler` list for the providers currently enabled via env vars (`LANGSMITH_TRACING`, `LANGFUSE_TRACING`, etc.). The handlers are attached at the **graph invocation root** for in-graph runs (`make_lead_agent` and `DeerFlowClient.stream` both append them to `config["callbacks"]` before invoking the graph) so a single run produces one trace with all node / LLM / tool calls as child spans. Standalone callers — anything that invokes a model outside such a graph (e.g. `MemoryUpdater`) — keep `create_chat_model`'s default `attach_tracing=True`, which falls back to model-level callback attachment. - `metadata.py::build_langfuse_trace_metadata()` — builds the Langfuse-reserved trace attributes for `RunnableConfig.metadata`. The Langfuse v4 `langchain.CallbackHandler` lifts these onto the root trace (see its `_parse_langfuse_trace_attributes`), but only when it sees `on_chain_start(parent_run_id=None)` — which is why the callbacks have to live at the graph root, not the model. -**Trace-attribute injection points**: both `runtime/runs/worker.py::run_agent` (gateway path) and `client.py::DeerFlowClient.stream` (embedded path) merge the metadata into `config["metadata"]` right before constructing the graph. Caller-supplied keys win via `setdefault`, so an external `session_id` override is preserved. Field mapping: +**Trace-attribute injection points**: both `runtime/runs/worker.py::run_agent` (gateway path) and `client.py::DeerFlowClient.stream` (embedded path) merge the metadata into `config["metadata"]` right before constructing the graph. `subagents/executor.py::_aexecute` does the same for every subagent run so subagent traces group under the parent thread's session card (carrying the parent `thread_id` → `langfuse_session_id`, the user_id captured at `task_tool` → `langfuse_user_id`, and a `subagent:` trace name). Caller-supplied keys win via `setdefault`, so an external `session_id` override is preserved. Field mapping: | Langfuse field | Source | |-----------------------|----------------------------------------------| | `langfuse_session_id` | LangGraph `thread_id` | -| `langfuse_user_id` | `get_effective_user_id()` (`default` in no-auth) | -| `langfuse_trace_name` | `RunRecord.assistant_id` / client `agent_name` (defaults to `lead-agent`) | +| `langfuse_user_id` | `get_effective_user_id()` (`default` in no-auth); for subagents, captured from `runtime.context` at `task_tool` time via `resolve_runtime_user_id()` | +| `langfuse_trace_name` | `RunRecord.assistant_id` / client `agent_name` (defaults to `lead-agent`); for subagents, `subagent:` (lowercased, `_` → `-`) | | `langfuse_tags` | `env:` + `model:` | -Returns `{}` when Langfuse is not in the enabled providers — LangSmith-only deployments are unaffected. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. Tests live in `tests/test_tracing_factory.py`, `tests/test_tracing_metadata.py`, `tests/test_worker_langfuse_metadata.py`, and `tests/test_client_langfuse_metadata.py`. +Returns `{}` when Langfuse is not in the enabled providers — LangSmith-only deployments are unaffected. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. Tests live in `tests/test_tracing_factory.py`, `tests/test_tracing_metadata.py`, `tests/test_worker_langfuse_metadata.py`, `tests/test_client_langfuse_metadata.py`, and `tests/test_subagent_executor.py::TestSubagentTracingWiring`. ### Config Schema diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index 89b8795a9..16c123d78 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -3,6 +3,7 @@ import asyncio import atexit import logging +import os import threading import uuid from collections.abc import Callable, Coroutine @@ -27,6 +28,7 @@ from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools from deerflow.skills.types import Skill from deerflow.subagents.config import SubagentConfig, resolve_subagent_model_name from deerflow.subagents.token_collector import SubagentTokenCollector +from deerflow.tracing import build_tracing_callbacks, inject_langfuse_metadata if TYPE_CHECKING: # Imported lazily at runtime inside _build_initial_state: importing @@ -286,6 +288,7 @@ class SubagentExecutor: thread_data: ThreadDataState | None = None, thread_id: str | None = None, trace_id: str | None = None, + user_id: str | None = None, ): """Initialize the executor. @@ -300,6 +303,8 @@ class SubagentExecutor: thread_data: Thread data from parent agent. thread_id: Thread ID for sandbox operations. trace_id: Trace ID from parent for distributed tracing. + user_id: User ID captured from the parent tool's runtime context. + When None, the tracing layer falls back to DEFAULT_USER_ID. """ self.config = config self.app_config = app_config @@ -316,6 +321,7 @@ class SubagentExecutor: self.thread_id = thread_id # Generate trace_id if not provided (for top-level calls) self.trace_id = trace_id or str(uuid.uuid4())[:8] + self.user_id = user_id self._base_tools = _filter_tools( tools, @@ -336,7 +342,7 @@ class SubagentExecutor: app_config = self.app_config or get_app_config() if self.model_name is None: self.model_name = resolve_subagent_model_name(self.config, self.parent_model, app_config=app_config) - model = create_chat_model(name=self.model_name, thinking_enabled=False, app_config=app_config) + model = create_chat_model(name=self.model_name, thinking_enabled=False, app_config=app_config, attach_tracing=False) from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares @@ -522,6 +528,36 @@ class SubagentExecutor: "callbacks": [collector], "tags": [collector_caller], } + + # Inject tracing callbacks at the graph level so a single subagent run + # produces one trace with all node / LLM / tool calls as child spans. + # This mirrors the lead agent pattern: graph-level tracing paired with + # attach_tracing=False on the model avoids double-counted traces. + tracing_callbacks = build_tracing_callbacks() + if tracing_callbacks: + existing_callbacks = list(run_config.get("callbacks") or []) + run_config["callbacks"] = [*existing_callbacks, *tracing_callbacks] + + # Normalize subagent name for tracing so it matches the lead-agent + # naming shape (lowercase, hyphens only). Inline because there is no + # shared helper — runtime/runs/naming.py only handles lead-agent runs. + if self.config.name: + normalized_name = self.config.name.strip().lower().replace("_", "-") + assistant_id = f"subagent:{normalized_name}" + else: + assistant_id = "subagent" + + # Inject Langfuse trace-attribute metadata so the subagent trace + # links to the parent thread and carries the correct session/user IDs. + inject_langfuse_metadata( + run_config, + thread_id=self.thread_id, + user_id=self.user_id, + assistant_id=assistant_id, + model_name=self.model_name, + environment=os.environ.get("DEER_FLOW_ENV") or os.environ.get("ENVIRONMENT"), + ) + context: dict[str, Any] = {} if self.thread_id: run_config["configurable"] = {"thread_id": self.thread_id} diff --git a/backend/packages/harness/deerflow/tools/builtins/task_tool.py b/backend/packages/harness/deerflow/tools/builtins/task_tool.py index dab1377c6..cb34ae4ca 100644 --- a/backend/packages/harness/deerflow/tools/builtins/task_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/task_tool.py @@ -11,6 +11,7 @@ from langchain_core.callbacks import BaseCallbackManager from langgraph.config import get_stream_writer from deerflow.config import get_app_config +from deerflow.runtime.user_context import resolve_runtime_user_id from deerflow.sandbox.security import LOCAL_BASH_SUBAGENT_DISABLED_MESSAGE, is_host_bash_allowed from deerflow.subagents import SubagentExecutor, get_available_subagent_names, get_subagent_config from deerflow.subagents.config import resolve_subagent_model_name @@ -253,6 +254,7 @@ async def task_tool( thread_id = None parent_model = None trace_id = None + user_id = None metadata: dict = {} if runtime is not None: @@ -269,6 +271,9 @@ async def task_tool( # Get or generate trace_id for distributed tracing trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8] + # Get user_id for tracing (uses standard resolution order) + user_id = resolve_runtime_user_id(runtime) + parent_available_skills = metadata.get("available_skills") if parent_available_skills is not None: overrides["skills"] = _merge_skill_allowlists(list(parent_available_skills), config.skills) @@ -306,6 +311,7 @@ async def task_tool( "thread_data": thread_data, "thread_id": thread_id, "trace_id": trace_id, + "user_id": user_id, } if resolved_app_config is not None: executor_kwargs["app_config"] = resolved_app_config diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index 77918bc98..abe7416dc 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -307,6 +307,11 @@ class TestAgentConstruction: "name": "parent-model", "thinking_enabled": False, "app_config": app_config, + # attach_tracing=False pairs with graph-root tracing callbacks + # injected in _aexecute (see TestSubagentTracingWiring). Without + # this the subagent would emit both a model-level trace and a + # graph-level trace per call. + "attach_tracing": False, } assert captured["middlewares"] == { "app_config": app_config, @@ -1962,3 +1967,286 @@ class TestCooperativeCancellation: executor_module.cleanup_background_task(task_id) assert task_id not in executor_module._background_tasks + + +# ----------------------------------------------------------------------------- +# Subagent Tracing Wiring +# ----------------------------------------------------------------------------- +# +# Regression coverage for the asymmetry fix: subagent runs must mirror the +# lead agent pattern so a single subagent execution produces one trace with +# the parent thread's session_id and user_id, not an isolated top-level trace. +# Three things must hold simultaneously: +# 1. ``build_tracing_callbacks()`` is appended to ``run_config["callbacks"]`` +# so the Langfuse handler sees ``on_chain_start(parent_run_id=None)`` and +# actually promotes ``langfuse_*`` metadata onto the root trace. +# 2. ``inject_langfuse_metadata(run_config, ...)`` carries the parent +# thread_id (-> session_id) and the captured user_id (-> user_id). +# 3. The subagent's model is built with ``attach_tracing=False`` so the +# model-level handler does not double-count (covered separately by +# ``test_create_agent_threads_explicit_app_config_to_model_and_middlewares``). + + +class _FakeStreamAgent: + """Stand-in agent that records the ``config`` passed to ``astream``. + + Yields no chunks so ``_aexecute`` takes the ``final_state is None`` path + and finishes without exercising message-handling code that is unrelated + to the tracing wiring under test. + """ + + def __init__(self) -> None: + self.captured_config: dict | None = None + self.captured_context: dict | None = None + + async def astream(self, state, *, config, context, stream_mode): # noqa: ARG002 - signature parity + self.captured_config = config + self.captured_context = context + return + yield # pragma: no cover - make this an async generator + + +class TestSubagentTracingWiring: + """Verify the subagent graph-root tracing wiring matches the lead agent.""" + + @pytest.fixture + def executor_module(self, _setup_executor_classes): + executor = importlib.import_module("deerflow.subagents.executor") + return _patch_default_get_app_config(importlib.reload(executor)) + + @pytest.fixture(autouse=True) + def _clear_langfuse_env(self, monkeypatch): + """Reset tracing config and env between tests so monkeypatched env + vars do not leak across tests in this class or the rest of the suite. + """ + from deerflow.config.tracing_config import reset_tracing_config + + for name in ("LANGFUSE_TRACING", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_BASE_URL"): + monkeypatch.delenv(name, raising=False) + reset_tracing_config() + yield + reset_tracing_config() + + def _make_executor(self, classes, *, user_id=None, name="general-purpose", parent_model="test-model"): + SubagentExecutor = classes["SubagentExecutor"] + SubagentConfig = classes["SubagentConfig"] + config = SubagentConfig( + name=name, + description="Tracing test agent", + system_prompt="You are a tracing test agent.", + max_turns=5, + timeout_seconds=30, + ) + return SubagentExecutor( + config=config, + tools=[], + parent_model=parent_model, + thread_id="thread-trace-1", + trace_id="trace-1", + user_id=user_id, + ) + + @pytest.mark.anyio + async def test_aexecute_appends_tracing_callbacks_to_run_config( + self, + classes, + executor_module, + monkeypatch, + ): + """``build_tracing_callbacks()`` output must be appended (not replace) + to the existing callbacks so the SubagentTokenCollector keeps working. + """ + SubagentStatus = classes["SubagentStatus"] + + sentinel_handler = object() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [sentinel_handler]) + + executor = self._make_executor(classes, user_id="alice") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + result = await executor._aexecute("do something") + + assert fake_agent.captured_config is not None + callbacks = fake_agent.captured_config.get("callbacks") or [] + assert sentinel_handler in callbacks, "tracing handler must reach run_config['callbacks']" + # SubagentTokenCollector must survive the append (graph-root tracing + # cannot displace the token-accounting callback). + assert len(callbacks) >= 2, "existing callbacks must be preserved when tracing is injected" + assert result.status.value == SubagentStatus.COMPLETED.value + + @pytest.mark.anyio + async def test_aexecute_injects_langfuse_session_user_and_trace_name( + self, + classes, + executor_module, + monkeypatch, + ): + """When Langfuse is enabled, ``run_config['metadata']`` must carry the + parent thread_id (-> session_id), the constructor-supplied user_id, and + a ``subagent:`` trace name so the subagent trace groups under + the parent thread's session card. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + + class _Sentinel: + pass + + sentinel = _Sentinel() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [sentinel]) + + executor = self._make_executor(classes, user_id="alice", name="general_purpose") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + assert metadata.get("langfuse_session_id") == "thread-trace-1", "subagent trace must inherit parent thread_id as session_id" + assert metadata.get("langfuse_user_id") == "alice", "subagent trace must carry the user_id captured at task_tool layer" + # Underscores are normalized to hyphens so the trace name matches the + # lead-agent naming shape. + assert metadata.get("langfuse_trace_name") == "subagent:general-purpose" + tags = metadata.get("langfuse_tags") or [] + assert any(t.startswith("model:") for t in tags), "model tag must be emitted for cost attribution" + + @pytest.mark.anyio + async def test_aexecute_skips_langfuse_metadata_when_disabled( + self, + classes, + executor_module, + monkeypatch, + ): + """When Langfuse is not in the enabled providers, ``inject_langfuse_metadata`` + must be a no-op and ``run_config['metadata']`` must not carry langfuse_* + keys. LangSmith-only deployments are unaffected. + """ + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: []) + + executor = self._make_executor(classes, user_id="alice") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + for key in ("langfuse_session_id", "langfuse_user_id", "langfuse_trace_name", "langfuse_tags"): + assert key not in metadata, f"{key} must be absent when Langfuse is disabled" + + @pytest.mark.anyio + async def test_user_id_defaults_when_not_supplied( + self, + classes, + executor_module, + monkeypatch, + ): + """When ``user_id`` is None at construction (parent did not capture + one), the tracing layer must fall back to DEFAULT_USER_ID so the + Langfuse Users page still groups the trace. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [object()]) + + executor = self._make_executor(classes, user_id=None) + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + # DEFAULT_USER_ID is "default" (see deerflow.runtime.user_context). + assert metadata.get("langfuse_user_id") == "default" + + @pytest.mark.anyio + async def test_trace_name_falls_back_when_config_name_empty( + self, + classes, + executor_module, + monkeypatch, + ): + """A subagent config without ``name`` must still produce a non-empty + trace name so Langfuse does not render the trace as unnamed. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [object()]) + + SubagentExecutor = classes["SubagentExecutor"] + SubagentConfig = classes["SubagentConfig"] + config = SubagentConfig( + name="", # empty name exercises the fallback branch + description="No name", + system_prompt="", + max_turns=5, + timeout_seconds=30, + ) + executor = SubagentExecutor( + config=config, + tools=[], + thread_id="thread-trace-2", + trace_id="trace-2", + ) + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + assert metadata.get("langfuse_trace_name") == "subagent" + + @pytest.mark.anyio + async def test_environment_tag_emitted_from_deer_flow_env( + self, + classes, + executor_module, + monkeypatch, + ): + """``DEER_FLOW_ENV`` must surface as an ``env:`` tag so Langfuse + cost aggregation can split traces by deployment environment. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + monkeypatch.setenv("DEER_FLOW_ENV", "staging") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr(executor_module, "build_tracing_callbacks", lambda: [object()]) + + executor = self._make_executor(classes, user_id="alice") + fake_agent = _FakeStreamAgent() + monkeypatch.setattr(executor, "_build_initial_state", self._noop_build_initial_state) + monkeypatch.setattr(executor, "_create_agent", lambda *a, **kw: fake_agent) + + await executor._aexecute("do something") + + metadata = (fake_agent.captured_config or {}).get("metadata") or {} + tags = metadata.get("langfuse_tags") or [] + assert "env:staging" in tags + + async def _noop_build_initial_state(self, task): # noqa: ARG002 - signature parity + """Return a minimal state tuple so ``_aexecute`` reaches ``astream`` + without loading skills, MCP tools, or the real config. + """ + from langchain_core.messages import HumanMessage + + return ({"messages": [HumanMessage(content=task)]}, [], None)