diff --git a/README.md b/README.md index 8248e8fe4..83b43fd93 100644 --- a/README.md +++ b/README.md @@ -546,6 +546,15 @@ LANGFUSE_BASE_URL=https://cloud.langfuse.com If you are using a self-hosted Langfuse instance, set `LANGFUSE_BASE_URL` to your deployment URL. +**Trace correlation fields.** Every agent run is annotated with Langfuse's reserved trace attributes so the Sessions and Users pages light up automatically: + +- `session_id` = LangGraph `thread_id` — groups every trace of the same conversation +- `user_id` = effective user from `get_effective_user_id()` (falls back to `default` in no-auth mode) +- `trace_name` = assistant id (defaults to `lead-agent`) +- `tags` = `[env:, model:]` (omitted when not set) + +These are injected into `RunnableConfig.metadata` at the graph invocation root for both the gateway path (`runtime/runs/worker.py::run_agent`) and the embedded path (`client.py::DeerFlowClient.stream`), so any LangChain-compatible callback can read them. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. + #### Using Both Providers If both LangSmith and Langfuse are enabled, DeerFlow attaches both tracing callbacks and reports the same model activity to both systems. diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 886b82dcb..8c4711395 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -397,6 +397,24 @@ Focused regression coverage for the updater lives in `backend/tests/test_memory_ - `resolve_variable(path)` - Import module and return variable (e.g., `module.path:variable_name`) - `resolve_class(path, base_class)` - Import and validate class against base class +### Tracing System (`packages/harness/deerflow/tracing/`) + +LangSmith and Langfuse are both supported. The wiring lives in two layers: + +- `factory.py::build_tracing_callbacks()` — returns the LangChain `CallbackHandler` list for the providers currently enabled via env vars (`LANGSMITH_TRACING`, `LANGFUSE_TRACING`, etc.). The handlers are attached at the **graph invocation root** for in-graph runs (`make_lead_agent` and `DeerFlowClient.stream` both append them to `config["callbacks"]` before invoking the graph) so a single run produces one trace with all node / LLM / tool calls as child spans. Standalone callers — anything that invokes a model outside such a graph (e.g. `MemoryUpdater`) — keep `create_chat_model`'s default `attach_tracing=True`, which falls back to model-level callback attachment. +- `metadata.py::build_langfuse_trace_metadata()` — builds the Langfuse-reserved trace attributes for `RunnableConfig.metadata`. The Langfuse v4 `langchain.CallbackHandler` lifts these onto the root trace (see its `_parse_langfuse_trace_attributes`), but only when it sees `on_chain_start(parent_run_id=None)` — which is why the callbacks have to live at the graph root, not the model. + +**Trace-attribute injection points**: both `runtime/runs/worker.py::run_agent` (gateway path) and `client.py::DeerFlowClient.stream` (embedded path) merge the metadata into `config["metadata"]` right before constructing the graph. Caller-supplied keys win via `setdefault`, so an external `session_id` override is preserved. Field mapping: + +| Langfuse field | Source | +|-----------------------|----------------------------------------------| +| `langfuse_session_id` | LangGraph `thread_id` | +| `langfuse_user_id` | `get_effective_user_id()` (`default` in no-auth) | +| `langfuse_trace_name` | `RunRecord.assistant_id` / client `agent_name` (defaults to `lead-agent`) | +| `langfuse_tags` | `env:` + `model:` | + +Returns `{}` when Langfuse is not in the enabled providers — LangSmith-only deployments are unaffected. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. Tests live in `tests/test_tracing_factory.py`, `tests/test_tracing_metadata.py`, `tests/test_worker_langfuse_metadata.py`, and `tests/test_client_langfuse_metadata.py`. + ### Config Schema **`config.yaml`** key sections: diff --git a/backend/packages/harness/deerflow/agents/lead_agent/agent.py b/backend/packages/harness/deerflow/agents/lead_agent/agent.py index f4330abc1..328a8a6e1 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/agent.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/agent.py @@ -1,3 +1,23 @@ +"""Lead agent factory. + +INVARIANT — tracing callback placement +====================================== + +Tracing callbacks (Langfuse, LangSmith) are attached at the **graph +invocation root** in :func:`_make_lead_agent` (see the +``build_tracing_callbacks()`` block that appends to ``config["callbacks"]``). +Every ``create_chat_model(...)`` call inside this module — and inside any +middleware reachable from this graph (e.g. ``TitleMiddleware``) — MUST pass +``attach_tracing=False``. + +Forgetting that flag emits duplicate spans (one rooted at the graph, one at +the model) AND prevents the Langfuse handler's ``propagate_attributes`` +path from firing, so ``session_id`` / ``user_id`` never reach the trace. +The four current sites are: bootstrap agent, default agent, summarization +middleware, and the async path inside ``TitleMiddleware``. Any new in-graph +``create_chat_model`` call must add to this list and pass the flag. +""" + import logging from langchain.agents import create_agent @@ -22,6 +42,7 @@ from deerflow.config.app_config import AppConfig, get_app_config from deerflow.models import create_chat_model from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools from deerflow.skills.types import Skill +from deerflow.tracing import build_tracing_callbacks logger = logging.getLogger(__name__) @@ -73,10 +94,14 @@ def _create_summarization_middleware(*, app_config: AppConfig | None = None) -> # Bind "middleware:summarize" tag so RunJournal identifies these LLM calls # as middleware rather than lead_agent (SummarizationMiddleware is a # LangChain built-in, so we tag the model at creation time). + # attach_tracing=False because the graph-level RunnableConfig (set in + # ``_make_lead_agent``) already carries tracing callbacks; binding them + # again at the model level would emit duplicate spans and break + # ``session_id`` / ``user_id`` propagation. if config.model_name: - model = create_chat_model(name=config.model_name, thinking_enabled=False, app_config=resolved_app_config) + model = create_chat_model(name=config.model_name, thinking_enabled=False, app_config=resolved_app_config, attach_tracing=False) else: - model = create_chat_model(thinking_enabled=False, app_config=resolved_app_config) + model = create_chat_model(thinking_enabled=False, app_config=resolved_app_config, attach_tracing=False) model = model.with_config(tags=["middleware:summarize"]) # Prepare kwargs @@ -408,13 +433,26 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig): } ) + # Inject tracing callbacks at the graph invocation root so a single LangGraph + # run produces one trace with all node / LLM / tool calls as child spans, + # AND so the Langfuse handler sees ``on_chain_start(parent_run_id=None)`` and + # actually propagates ``langfuse_session_id`` / ``langfuse_user_id`` from + # ``config["metadata"]`` onto the trace. Without root-level attachment the + # model is a nested observation and the handler strips ``langfuse_*`` keys. + tracing_callbacks = build_tracing_callbacks() + if tracing_callbacks: + existing = config.get("callbacks") or [] + if not isinstance(existing, list): + existing = list(existing) + config["callbacks"] = [*existing, *tracing_callbacks] + skills_for_tool_policy = _load_enabled_skills_for_tool_policy(available_skills, app_config=resolved_app_config) if is_bootstrap: # Special bootstrap agent with minimal prompt for initial custom agent creation flow tools = get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled, app_config=resolved_app_config) + [setup_agent] return create_agent( - model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, app_config=resolved_app_config), + model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, app_config=resolved_app_config, attach_tracing=False), tools=filter_tools_by_skill_allowed_tools(tools, skills_for_tool_policy), middleware=_build_middlewares(config, model_name=model_name, app_config=resolved_app_config), system_prompt=apply_prompt_template( @@ -432,7 +470,7 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig): # Default lead agent (unchanged behavior) tools = get_available_tools(model_name=model_name, groups=agent_config.tool_groups if agent_config else None, subagent_enabled=subagent_enabled, app_config=resolved_app_config) return create_agent( - model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort, app_config=resolved_app_config), + model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort, app_config=resolved_app_config, attach_tracing=False), tools=filter_tools_by_skill_allowed_tools(tools + extra_tools, skills_for_tool_policy), middleware=_build_middlewares(config, model_name=model_name, agent_name=agent_name, app_config=resolved_app_config), system_prompt=apply_prompt_template( diff --git a/backend/packages/harness/deerflow/agents/middlewares/title_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/title_middleware.py index b259ce4a4..b6cc72b35 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/title_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/title_middleware.py @@ -160,7 +160,11 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]): prompt, user_msg = self._build_title_prompt(state) try: - model_kwargs = {"thinking_enabled": False} + # attach_tracing=False because ``_get_runnable_config()`` inherits + # the graph-level RunnableConfig (set in ``_make_lead_agent``) whose + # callbacks already carry tracing handlers; binding them again at + # the model level would emit duplicate spans. + model_kwargs = {"thinking_enabled": False, "attach_tracing": False} if self._app_config is not None: model_kwargs["app_config"] = self._app_config if config.model_name: diff --git a/backend/packages/harness/deerflow/client.py b/backend/packages/harness/deerflow/client.py index 786e7372f..8ffa89e2c 100644 --- a/backend/packages/harness/deerflow/client.py +++ b/backend/packages/harness/deerflow/client.py @@ -19,6 +19,7 @@ import asyncio import json import logging import mimetypes +import os import shutil import tempfile import uuid @@ -42,6 +43,7 @@ from deerflow.config.paths import get_paths from deerflow.models import create_chat_model from deerflow.runtime.user_context import get_effective_user_id from deerflow.skills.storage import get_or_new_skill_storage +from deerflow.tracing import build_tracing_callbacks, inject_langfuse_metadata from deerflow.uploads.manager import ( claim_unique_filename, delete_file_safe, @@ -123,6 +125,7 @@ class DeerFlowClient: agent_name: str | None = None, available_skills: set[str] | None = None, middlewares: Sequence[AgentMiddleware] | None = None, + environment: str | None = None, ): """Initialize the client. @@ -140,6 +143,12 @@ class DeerFlowClient: agent_name: Name of the agent to use. available_skills: Optional set of skill names to make available. If None (default), all scanned skills are available. middlewares: Optional list of custom middlewares to inject into the agent. + environment: Deployment environment label that ends up in + ``langfuse_tags`` (e.g. ``"production"`` / ``"staging"``). + When ``None`` the worker/client falls back to the + ``DEER_FLOW_ENV`` or ``ENVIRONMENT`` env vars. Pass an + explicit value for programmatic callers that do not want + env-var coupling. """ if config_path is not None: reload_app_config(config_path) @@ -156,6 +165,7 @@ class DeerFlowClient: self._agent_name = agent_name self._available_skills = set(available_skills) if available_skills is not None else None self._middlewares = list(middlewares) if middlewares else [] + self._environment = environment # Lazy agent — created on first call, recreated when config changes. self._agent = None @@ -228,7 +238,11 @@ class DeerFlowClient: max_concurrent_subagents = cfg.get("max_concurrent_subagents", 3) kwargs: dict[str, Any] = { - "model": create_chat_model(name=model_name, thinking_enabled=thinking_enabled), + # attach_tracing=False because ``stream()`` injects tracing + # callbacks at the graph invocation root so a single embedded run + # produces one trace with correct session_id / user_id propagation. + # Attaching them again on the model would emit duplicate spans. + "model": create_chat_model(name=model_name, thinking_enabled=thinking_enabled, attach_tracing=False), "tools": self._get_tools(model_name=model_name, subagent_enabled=subagent_enabled), "middleware": _build_middlewares(config, model_name=model_name, agent_name=self._agent_name, custom_middlewares=self._middlewares), "system_prompt": apply_prompt_template( @@ -571,6 +585,28 @@ class DeerFlowClient: thread_id = str(uuid.uuid4()) config = self._get_runnable_config(thread_id, **kwargs) + + # Inject tracing callbacks and Langfuse trace metadata at the graph + # invocation root so the embedded client matches the gateway worker's + # behaviour: a single ``stream()`` produces one trace with all node / + # LLM / tool calls nested under it, and the trace carries the reserved + # ``langfuse_session_id`` / ``langfuse_user_id`` keys that the Langfuse + # CallbackHandler lifts onto the root trace's ``sessionId`` / ``userId``. + tracing_callbacks = build_tracing_callbacks() + if tracing_callbacks: + existing_callbacks = list(config.get("callbacks") or []) + config["callbacks"] = [*existing_callbacks, *tracing_callbacks] + + configurable = config.get("configurable") or {} + inject_langfuse_metadata( + config, + thread_id=thread_id, + user_id=get_effective_user_id(), + assistant_id=self._agent_name or "lead-agent", + model_name=configurable.get("model_name") or self._model_name, + environment=self._environment or os.environ.get("DEER_FLOW_ENV") or os.environ.get("ENVIRONMENT"), + ) + self._ensure_agent(config) state: dict[str, Any] = {"messages": [HumanMessage(content=message)]} diff --git a/backend/packages/harness/deerflow/config/title_config.py b/backend/packages/harness/deerflow/config/title_config.py index f335b4952..2d2e73789 100644 --- a/backend/packages/harness/deerflow/config/title_config.py +++ b/backend/packages/harness/deerflow/config/title_config.py @@ -51,3 +51,16 @@ def load_title_config_from_dict(config_dict: dict) -> None: """Load title configuration from a dictionary.""" global _title_config _title_config = TitleConfig(**config_dict) + + +def reset_title_config() -> None: + """Restore the title configuration to its pristine ``TitleConfig()`` default. + + Public API so that tests do not have to reach into the private + ``_title_config`` module attribute. ``AppConfig.from_file()`` calls + :func:`load_title_config_from_dict`, which permanently mutates the + singleton; tests that need a clean slate between cases should call + this between tests. + """ + global _title_config + _title_config = TitleConfig() diff --git a/backend/packages/harness/deerflow/config/tracing_config.py b/backend/packages/harness/deerflow/config/tracing_config.py index 1ef5ebeb4..399e37424 100644 --- a/backend/packages/harness/deerflow/config/tracing_config.py +++ b/backend/packages/harness/deerflow/config/tracing_config.py @@ -147,3 +147,15 @@ def validate_enabled_tracing_providers() -> None: def is_tracing_enabled() -> bool: """Check if any tracing provider is enabled and fully configured.""" return get_tracing_config().is_configured + + +def reset_tracing_config() -> None: + """Discard the cached :class:`TracingConfig` so the next call rebuilds it. + + Public API so that tests do not have to reach into the private + ``_tracing_config`` module attribute. A future internal rename would + silently break callers that mutate the attribute directly. + """ + global _tracing_config + with _config_lock: + _tracing_config = None diff --git a/backend/packages/harness/deerflow/models/factory.py b/backend/packages/harness/deerflow/models/factory.py index 518bdc9f1..c6a3573f8 100644 --- a/backend/packages/harness/deerflow/models/factory.py +++ b/backend/packages/harness/deerflow/models/factory.py @@ -47,11 +47,24 @@ def _enable_stream_usage_by_default(model_use_path: str, model_settings_from_con model_settings_from_config["stream_usage"] = True -def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, **kwargs) -> BaseChatModel: +def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, attach_tracing: bool = True, **kwargs) -> BaseChatModel: """Create a chat model instance from the config. Args: name: The name of the model to create. If None, the first model in the config will be used. + thinking_enabled: Enable the model's extended-thinking mode when supported. + app_config: Explicit application config; falls back to the cached global if omitted. + attach_tracing: When True (default), attach tracing callbacks (Langfuse, + LangSmith) directly to the model instance. Standalone callers — anything + that invokes the model outside a LangGraph run that already wires tracing + at the invocation root (``MemoryUpdater``, ad-hoc utilities, etc.) — keep + this default so the model-level callback still produces traces. Callers + that already attach tracing at the graph root (``make_lead_agent``, the + in-graph ``TitleMiddleware``) MUST pass ``attach_tracing=False``; otherwise + the same LLM call emits duplicate spans (one rooted at the graph, one at + the model) and ``session_id`` / ``user_id`` metadata never reach the trace + because the model becomes a nested observation whose ``langfuse_*`` keys + get stripped. Returns: A chat model instance. @@ -149,9 +162,10 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, * model_instance = model_class(**kwargs, **model_settings_from_config) - callbacks = build_tracing_callbacks() - if callbacks: - existing_callbacks = model_instance.callbacks or [] - model_instance.callbacks = [*existing_callbacks, *callbacks] - logger.debug(f"Tracing attached to model '{name}' with providers={len(callbacks)}") + if attach_tracing: + callbacks = build_tracing_callbacks() + if callbacks: + existing_callbacks = model_instance.callbacks or [] + model_instance.callbacks = [*existing_callbacks, *callbacks] + logger.debug(f"Tracing attached to model '{name}' with providers={len(callbacks)}") return model_instance diff --git a/backend/packages/harness/deerflow/runtime/runs/worker.py b/backend/packages/harness/deerflow/runtime/runs/worker.py index 09e3c66e9..aa47cd39b 100644 --- a/backend/packages/harness/deerflow/runtime/runs/worker.py +++ b/backend/packages/harness/deerflow/runtime/runs/worker.py @@ -19,6 +19,7 @@ import asyncio import copy import inspect import logging +import os from dataclasses import dataclass, field from functools import lru_cache from typing import TYPE_CHECKING, Any, Literal, cast @@ -31,6 +32,8 @@ if TYPE_CHECKING: from deerflow.config.app_config import AppConfig from deerflow.runtime.serialization import serialize from deerflow.runtime.stream_bridge import StreamBridge +from deerflow.runtime.user_context import get_effective_user_id +from deerflow.tracing import inject_langfuse_metadata from .manager import RunManager, RunRecord from .naming import resolve_root_run_name @@ -225,6 +228,19 @@ async def run_agent( if journal is not None: config.setdefault("callbacks", []).append(journal) + # Inject Langfuse trace-attribute metadata so the langchain CallbackHandler + # can lift session_id / user_id / trace_name / tags onto the root trace. + # Shared helper with ``DeerFlowClient.stream`` so both entry points stay + # in sync; caller-provided metadata wins via setdefault inside the helper. + inject_langfuse_metadata( + config, + thread_id=thread_id, + user_id=get_effective_user_id(), + assistant_id=record.assistant_id, + model_name=record.model_name, + environment=os.environ.get("DEER_FLOW_ENV") or os.environ.get("ENVIRONMENT"), + ) + # Resolve after runtime context installation so context/configurable reflect # the agent name that this run will actually execute. config.setdefault("run_name", resolve_root_run_name(config, record.assistant_id)) diff --git a/backend/packages/harness/deerflow/tracing/__init__.py b/backend/packages/harness/deerflow/tracing/__init__.py index f132815fb..6d00e9c69 100644 --- a/backend/packages/harness/deerflow/tracing/__init__.py +++ b/backend/packages/harness/deerflow/tracing/__init__.py @@ -1,3 +1,8 @@ from .factory import build_tracing_callbacks +from .metadata import build_langfuse_trace_metadata, inject_langfuse_metadata -__all__ = ["build_tracing_callbacks"] +__all__ = [ + "build_langfuse_trace_metadata", + "build_tracing_callbacks", + "inject_langfuse_metadata", +] diff --git a/backend/packages/harness/deerflow/tracing/metadata.py b/backend/packages/harness/deerflow/tracing/metadata.py new file mode 100644 index 000000000..3dabf169a --- /dev/null +++ b/backend/packages/harness/deerflow/tracing/metadata.py @@ -0,0 +1,105 @@ +"""Langfuse trace-attribute metadata builders. + +The Langfuse v4 ``langchain.CallbackHandler`` lifts a fixed set of reserved +keys from ``RunnableConfig.metadata`` onto the root trace: + +- ``langfuse_session_id`` → groups traces (LangGraph thread → Langfuse Session) +- ``langfuse_user_id`` → trace user_id (powers the Users page) +- ``langfuse_trace_name`` → human-readable trace name +- ``langfuse_tags`` → trace tags + +See ``langfuse/langchain/CallbackHandler.py::_parse_langfuse_trace_attributes`` +and https://langfuse.com/docs/observability/features/sessions for the +contract. Builders here exist so the gateway/run worker can inject the +right metadata without leaking Langfuse internals into the call sites. +""" + +from __future__ import annotations + +from typing import Any + +from deerflow.config import get_enabled_tracing_providers + +# Lazy-imported below to avoid a circular import: ``deerflow.runtime`` eagerly +# imports the run worker, which in turn needs ``deerflow.tracing``. +_DEFAULT_TRACE_NAME = "lead-agent" + + +def build_langfuse_trace_metadata( + *, + thread_id: str | None, + user_id: str | None = None, + assistant_id: str | None = None, + model_name: str | None = None, + environment: str | None = None, +) -> dict[str, Any]: + """Return Langfuse trace-attribute metadata for ``RunnableConfig.metadata``. + + Returns ``{}`` when Langfuse is not in the enabled tracing providers so + callers can unconditionally merge the result without affecting LangSmith + or other tracers. + + Args: + thread_id: LangGraph thread id; mapped to ``langfuse_session_id``. + user_id: Effective user id; falls back to ``DEFAULT_USER_ID`` when + ``None`` so the Langfuse Users page works in no-auth mode. + assistant_id: Optional agent identifier; defaults to ``"lead-agent"``. + model_name: Model name; emitted as ``model:`` in ``langfuse_tags``. + environment: Deployment env (e.g. ``"production"``); emitted as + ``env:`` in ``langfuse_tags``. + """ + if "langfuse" not in get_enabled_tracing_providers(): + return {} + + from deerflow.runtime.user_context import DEFAULT_USER_ID + + metadata: dict[str, Any] = { + "langfuse_session_id": thread_id, + "langfuse_user_id": user_id or DEFAULT_USER_ID, + "langfuse_trace_name": assistant_id or _DEFAULT_TRACE_NAME, + } + + tags: list[str] = [] + if environment: + tags.append(f"env:{environment}") + if model_name: + tags.append(f"model:{model_name}") + if tags: + metadata["langfuse_tags"] = tags + + return metadata + + +def inject_langfuse_metadata( + config: dict, + *, + thread_id: str | None, + user_id: str | None = None, + assistant_id: str | None = None, + model_name: str | None = None, + environment: str | None = None, +) -> None: + """Merge Langfuse trace-attribute metadata into ``config["metadata"]``. + + Shared by the gateway worker (``runtime/runs/worker.py``) and the + embedded client (``client.py``) so the two paths cannot drift apart. + + Caller-supplied metadata wins via ``setdefault`` — an upstream value + for e.g. ``langfuse_session_id`` set by the frontend stays untouched. + The ``config`` dict is mutated in place; the call is a no-op when + Langfuse is not in the enabled tracing providers. + """ + langfuse_metadata = build_langfuse_trace_metadata( + thread_id=thread_id, + user_id=user_id, + assistant_id=assistant_id, + model_name=model_name, + environment=environment, + ) + if not langfuse_metadata: + return + + merged_metadata = dict(config.get("metadata") or {}) + for key, value in langfuse_metadata.items(): + merged_metadata.setdefault(key, value) + config["metadata"] = merged_metadata diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 9bc8d4884..5293652f6 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -176,6 +176,31 @@ def _reset_skill_storage_singleton(): reset_skill_storage() +@pytest.fixture(autouse=True) +def _restore_title_config_singleton(): + """Reset ``_title_config`` to its pristine default after every test. + + ``AppConfig.from_file()`` writes the on-disk ``title`` block into the + module-level singleton (``config/app_config.py`` calls + ``load_title_config_from_dict``). Any test that loads the real + ``config.yaml`` therefore leaves the singleton in a state that + ``test_title_middleware_core_logic.py`` does not expect; that suite + relies on the pristine ``TitleConfig()`` default (``enabled=True``). + We restore the default after every test so test files stay + independent regardless of order. + """ + try: + from deerflow.config.title_config import reset_title_config + except ImportError: + yield + return + + try: + yield + finally: + reset_title_config() + + @pytest.fixture(autouse=True) def _auto_user_context(request): """Inject a default ``test-user-autouse`` into the contextvar. diff --git a/backend/tests/test_client_langfuse_metadata.py b/backend/tests/test_client_langfuse_metadata.py new file mode 100644 index 000000000..3116fd331 --- /dev/null +++ b/backend/tests/test_client_langfuse_metadata.py @@ -0,0 +1,159 @@ +"""Tests for DeerFlowClient's graph-root tracing wiring. + +Regression coverage for the Copilot review on PR #2944: when the title +and summarization middlewares request ``attach_tracing=False`` we must +make sure ``DeerFlowClient`` injects the tracing callbacks at the graph +invocation root instead, otherwise those middlewares produce untraced +LLM calls. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any + +import pytest + +from deerflow.client import DeerFlowClient + + +class _FakeAgent: + """Capture the ``config`` handed to ``agent.stream``.""" + + def __init__(self) -> None: + self.captured_config: dict | None = None + self.checkpointer = None + self.store = None + + def stream(self, state, *, config, context, stream_mode): + self.captured_config = config + return iter(()) # empty stream + + +@pytest.fixture(autouse=True) +def _clear_langfuse_env(monkeypatch): + from deerflow.config.tracing_config import reset_tracing_config + + for name in ("LANGFUSE_TRACING", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_BASE_URL"): + monkeypatch.delenv(name, raising=False) + reset_tracing_config() + yield + reset_tracing_config() + + +def _stub_agent_creation(monkeypatch, fake_agent: _FakeAgent) -> dict[str, Any]: + """Short-circuit the heavy parts of ``_ensure_agent`` so we can drive + ``stream()`` against a fake graph without touching real models, tools + or middleware factories. + """ + captured: dict[str, Any] = {} + + def _stub_ensure_agent(self, config): + captured["config"] = config + self._agent = fake_agent + self._agent_config_key = ("stub",) + + monkeypatch.setattr(DeerFlowClient, "_ensure_agent", _stub_ensure_agent) + return captured + + +def _make_client(_monkeypatch) -> DeerFlowClient: + """Build a client without going through ``__init__`` so we never load + config.yaml or perform any other side-effectful startup work.""" + fake_app_config = SimpleNamespace(models=[SimpleNamespace(name="stub-model")]) + client = DeerFlowClient.__new__(DeerFlowClient) + client._app_config = fake_app_config + client._extensions_config = None + client._model_name = "stub-model" + client._thinking_enabled = False + client._plan_mode = False + client._subagent_enabled = False + client._agent_name = None + client._available_skills = None + client._middlewares = None + client._checkpointer = None + client._agent = None + client._agent_config_key = None + client._environment = None + return client + + +def test_stream_injects_langfuse_metadata_when_enabled(monkeypatch): + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + + class _SentinelHandler: + pass + + sentinel = _SentinelHandler() + monkeypatch.setattr("deerflow.client.build_tracing_callbacks", lambda: [sentinel]) + + fake_agent = _FakeAgent() + captured = _stub_agent_creation(monkeypatch, fake_agent) + client = _make_client(monkeypatch) + + list(client.stream("hi", thread_id="thread-client-1")) + + config = captured["config"] + metadata = config.get("metadata") or {} + assert metadata.get("langfuse_session_id") == "thread-client-1" + assert metadata.get("langfuse_trace_name") == "lead-agent" + # Default no-auth context falls back to ``"default"`` user. + assert metadata.get("langfuse_user_id") in {"default", "test-user-autouse"} + callbacks = config.get("callbacks") or [] + assert sentinel in callbacks + + +def test_stream_is_inert_when_langfuse_disabled(monkeypatch): + monkeypatch.setattr("deerflow.client.build_tracing_callbacks", lambda: []) + + fake_agent = _FakeAgent() + captured = _stub_agent_creation(monkeypatch, fake_agent) + client = _make_client(monkeypatch) + + list(client.stream("hi", thread_id="thread-client-2")) + + config = captured["config"] + assert "callbacks" not in config or not config["callbacks"] + metadata = config.get("metadata") or {} + assert "langfuse_session_id" not in metadata + assert "langfuse_user_id" not in metadata + + +def test_stream_preserves_caller_metadata_overrides(monkeypatch): + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + monkeypatch.setattr("deerflow.client.build_tracing_callbacks", lambda: []) + + fake_agent = _FakeAgent() + captured = _stub_agent_creation(monkeypatch, fake_agent) + client = _make_client(monkeypatch) + + # Drive stream with a pre-populated metadata so the worker-equivalent + # ``setdefault`` semantics are exercised. + original_get_config = DeerFlowClient._get_runnable_config + + def patched_get_runnable_config(self, thread_id, **overrides): + cfg = original_get_config(self, thread_id, **overrides) + cfg["metadata"] = { + "langfuse_session_id": "explicit-session-override", + "langfuse_user_id": "explicit-user", + } + return cfg + + monkeypatch.setattr(DeerFlowClient, "_get_runnable_config", patched_get_runnable_config) + list(client.stream("hi", thread_id="thread-client-3")) + + metadata = captured["config"].get("metadata") or {} + assert metadata["langfuse_session_id"] == "explicit-session-override" + assert metadata["langfuse_user_id"] == "explicit-user" + # ``trace_name`` was not supplied by caller so the worker still fills it. + assert metadata["langfuse_trace_name"] == "lead-agent" diff --git a/backend/tests/test_lead_agent_model_resolution.py b/backend/tests/test_lead_agent_model_resolution.py index 976730d44..7ac4b97e6 100644 --- a/backend/tests/test_lead_agent_model_resolution.py +++ b/backend/tests/test_lead_agent_model_resolution.py @@ -41,6 +41,49 @@ def test_make_lead_agent_signature_matches_langgraph_server_factory_abi(): assert list(inspect.signature(lead_agent_module.make_lead_agent).parameters) == ["config"] +def test_make_lead_agent_attaches_tracing_callbacks_at_graph_root(monkeypatch): + """Regression guard: tracing handlers must be appended to + ``config["callbacks"]`` (graph invocation root), and every in-graph + ``create_chat_model`` call must pass ``attach_tracing=False``. + + Catches future contributors who forget the flag when adding new + in-graph model creation, which would silently produce duplicate + spans and break Langfuse session/user propagation. + """ + app_config = _make_app_config([_make_model("safe-model", supports_thinking=False)]) + + import deerflow.tools as tools_module + + monkeypatch.setattr(lead_agent_module, "get_app_config", lambda: app_config) + monkeypatch.setattr(tools_module, "get_available_tools", lambda **kwargs: []) + monkeypatch.setattr(lead_agent_module, "_build_middlewares", lambda config, model_name, agent_name=None, **kwargs: []) + + sentinel_handler = object() + monkeypatch.setattr(lead_agent_module, "build_tracing_callbacks", lambda: [sentinel_handler]) + + seen_attach_tracing: list[bool] = [] + + def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): + seen_attach_tracing.append(attach_tracing) + return object() + + monkeypatch.setattr(lead_agent_module, "create_chat_model", _fake_create_chat_model) + monkeypatch.setattr(lead_agent_module, "create_agent", lambda **kwargs: kwargs) + + config: dict = {"configurable": {"model_name": "safe-model"}} + lead_agent_module._make_lead_agent(config, app_config=app_config) + + # Handler must land on the graph invocation config so the Langfuse + # CallbackHandler fires ``on_chain_start(parent_run_id=None)`` and + # propagates ``session_id`` / ``user_id`` onto the trace. + assert sentinel_handler in (config.get("callbacks") or []), "build_tracing_callbacks output must be appended to config['callbacks']" + + # Every in-graph create_chat_model call must opt out of model-level + # tracing to avoid duplicate spans. + assert seen_attach_tracing, "_make_lead_agent did not call create_chat_model" + assert all(flag is False for flag in seen_attach_tracing), f"in-graph create_chat_model must pass attach_tracing=False; got {seen_attach_tracing}" + + def test_internal_make_lead_agent_uses_explicit_app_config(monkeypatch): app_config = _make_app_config([_make_model("explicit-model", supports_thinking=False)]) @@ -55,7 +98,7 @@ def test_internal_make_lead_agent_uses_explicit_app_config(monkeypatch): captured: dict[str, object] = {} - def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): + def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): captured["name"] = name captured["app_config"] = app_config return object() @@ -89,7 +132,7 @@ def test_make_lead_agent_uses_runtime_app_config_from_context_without_global_rea captured: dict[str, object] = {} - def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): + def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): captured["name"] = name captured["app_config"] = app_config return object() @@ -168,7 +211,7 @@ def test_make_lead_agent_disables_thinking_when_model_does_not_support_it(monkey captured: dict[str, object] = {} - def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): + def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): captured["name"] = name captured["thinking_enabled"] = thinking_enabled captured["reasoning_effort"] = reasoning_effort @@ -212,7 +255,7 @@ def test_make_lead_agent_reads_runtime_options_from_context(monkeypatch): captured: dict[str, object] = {} - def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): + def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): captured["name"] = name captured["thinking_enabled"] = thinking_enabled captured["reasoning_effort"] = reasoning_effort @@ -407,7 +450,7 @@ def test_create_summarization_middleware_uses_configured_model_alias(monkeypatch fake_model = MagicMock() fake_model.with_config.return_value = fake_model - def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None): + def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): captured["name"] = name captured["thinking_enabled"] = thinking_enabled captured["reasoning_effort"] = reasoning_effort @@ -441,7 +484,7 @@ def test_create_summarization_middleware_threads_resolved_app_config_to_model(mo fake_model = MagicMock() fake_model.with_config.return_value = fake_model - def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None): + def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True): captured["app_config"] = app_config return fake_model diff --git a/backend/tests/test_title_middleware_core_logic.py b/backend/tests/test_title_middleware_core_logic.py index 3fdf4d3f9..ac10848e1 100644 --- a/backend/tests/test_title_middleware_core_logic.py +++ b/backend/tests/test_title_middleware_core_logic.py @@ -109,7 +109,7 @@ class TestTitleMiddlewareCoreLogic: title = result["title"] assert title == "短标题" - title_middleware_module.create_chat_model.assert_called_once_with(thinking_enabled=False) + title_middleware_module.create_chat_model.assert_called_once_with(thinking_enabled=False, attach_tracing=False) model.ainvoke.assert_awaited_once() assert model.ainvoke.await_args.kwargs["config"] == { "run_name": "title_agent", @@ -141,6 +141,7 @@ class TestTitleMiddlewareCoreLogic: title_middleware_module.create_chat_model.assert_called_once_with( name="title-model", thinking_enabled=False, + attach_tracing=False, app_config=app_config, ) diff --git a/backend/tests/test_tracing_config.py b/backend/tests/test_tracing_config.py index a13be516d..943401c97 100644 --- a/backend/tests/test_tracing_config.py +++ b/backend/tests/test_tracing_config.py @@ -5,10 +5,11 @@ from __future__ import annotations import pytest from deerflow.config import tracing_config as tracing_module +from deerflow.config.tracing_config import reset_tracing_config def _reset_tracing_cache() -> None: - tracing_module._tracing_config = None + reset_tracing_config() @pytest.fixture(autouse=True) diff --git a/backend/tests/test_tracing_factory.py b/backend/tests/test_tracing_factory.py index b3e77935f..723e42e80 100644 --- a/backend/tests/test_tracing_factory.py +++ b/backend/tests/test_tracing_factory.py @@ -12,7 +12,7 @@ from deerflow.tracing import factory as tracing_factory @pytest.fixture(autouse=True) def clear_tracing_env(monkeypatch): - from deerflow.config import tracing_config as tracing_module + from deerflow.config.tracing_config import reset_tracing_config for name in ( "LANGSMITH_TRACING", @@ -30,9 +30,9 @@ def clear_tracing_env(monkeypatch): "LANGFUSE_BASE_URL", ): monkeypatch.delenv(name, raising=False) - tracing_module._tracing_config = None + reset_tracing_config() yield - tracing_module._tracing_config = None + reset_tracing_config() def test_build_tracing_callbacks_returns_empty_list_when_disabled(monkeypatch): @@ -114,12 +114,12 @@ def test_build_tracing_callbacks_raises_when_enabled_provider_fails(monkeypatch) def test_build_tracing_callbacks_raises_for_explicitly_enabled_misconfigured_provider(monkeypatch): - from deerflow.config import tracing_config as tracing_module + from deerflow.config.tracing_config import reset_tracing_config monkeypatch.setenv("LANGFUSE_TRACING", "true") monkeypatch.delenv("LANGFUSE_PUBLIC_KEY", raising=False) monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") - tracing_module._tracing_config = None + reset_tracing_config() with pytest.raises(ValueError, match="LANGFUSE_PUBLIC_KEY"): tracing_factory.build_tracing_callbacks() diff --git a/backend/tests/test_tracing_metadata.py b/backend/tests/test_tracing_metadata.py new file mode 100644 index 000000000..6c758e40d --- /dev/null +++ b/backend/tests/test_tracing_metadata.py @@ -0,0 +1,137 @@ +"""Tests for deerflow.tracing.metadata.build_langfuse_trace_metadata.""" + +from __future__ import annotations + +import pytest + +from deerflow.tracing import metadata as tracing_metadata + + +@pytest.fixture(autouse=True) +def _clear_tracing_env(monkeypatch): + from deerflow.config.tracing_config import reset_tracing_config + + for name in ( + "LANGFUSE_TRACING", + "LANGFUSE_PUBLIC_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_BASE_URL", + "LANGSMITH_TRACING", + "LANGCHAIN_TRACING_V2", + "LANGCHAIN_TRACING", + "LANGSMITH_API_KEY", + "LANGCHAIN_API_KEY", + ): + monkeypatch.delenv(name, raising=False) + reset_tracing_config() + yield + reset_tracing_config() + + +def _enable_langfuse(monkeypatch): + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + + +def test_returns_empty_when_langfuse_disabled(monkeypatch): + # No env vars set → langfuse not in enabled providers. + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="t-1", + user_id="u-1", + assistant_id="lead-agent", + model_name="gpt-4o", + ) + assert result == {} + + +def test_session_id_maps_to_thread_id(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="thread-abc", + user_id="user-42", + ) + + assert result["langfuse_session_id"] == "thread-abc" + + +def test_user_id_falls_back_to_default(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="thread-abc", + user_id=None, + ) + + assert result["langfuse_user_id"] == "default" + + +def test_user_id_explicit_value_wins(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="thread-abc", + user_id="alice@example.com", + ) + + assert result["langfuse_user_id"] == "alice@example.com" + + +def test_trace_name_uses_assistant_id_when_provided(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="t", + assistant_id="custom-agent", + ) + + assert result["langfuse_trace_name"] == "custom-agent" + + +def test_trace_name_defaults_to_lead_agent(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="t", + assistant_id=None, + ) + + assert result["langfuse_trace_name"] == "lead-agent" + + +def test_tags_include_env_and_model(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="t", + environment="production", + model_name="gpt-4o", + ) + + assert result["langfuse_tags"] == ["env:production", "model:gpt-4o"] + + +def test_tags_omitted_when_no_tag_inputs(monkeypatch): + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id="t", + user_id="u", + ) + + assert "langfuse_tags" not in result + + +def test_thread_id_none_still_produces_metadata(monkeypatch): + # Stateless run paths may not have a thread_id — we still want + # user_id / trace_name to flow through so Users page works. + _enable_langfuse(monkeypatch) + + result = tracing_metadata.build_langfuse_trace_metadata( + thread_id=None, + user_id="u-1", + ) + + assert result["langfuse_session_id"] is None + assert result["langfuse_user_id"] == "u-1" diff --git a/backend/tests/test_worker_langfuse_metadata.py b/backend/tests/test_worker_langfuse_metadata.py new file mode 100644 index 000000000..7b7544771 --- /dev/null +++ b/backend/tests/test_worker_langfuse_metadata.py @@ -0,0 +1,248 @@ +"""Integration test: worker.run_agent injects Langfuse trace metadata. + +Verifies that the agent factory's resulting graph receives a +``RunnableConfig`` whose ``metadata`` carries the Langfuse reserved keys +(``langfuse_session_id`` / ``langfuse_user_id`` / ``langfuse_trace_name``). +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from deerflow.runtime.runs.manager import RunRecord +from deerflow.runtime.runs.schemas import DisconnectMode, RunStatus +from deerflow.runtime.runs.worker import RunContext, run_agent + + +class _FakeAgent: + """Minimal LangGraph-like graph that captures the runnable config.""" + + def __init__(self) -> None: + self.captured_config: dict | None = None + self.metadata: dict = {} + # Worker may assign these attributes; need them to exist. + self.checkpointer = None + self.store = None + self.interrupt_before_nodes: list[str] = [] + self.interrupt_after_nodes: list[str] = [] + + async def astream(self, graph_input, *, config, stream_mode, **kwargs): + self.captured_config = config + # Empty async generator — no chunks produced. + return + yield # pragma: no cover (makes this an async generator) + + +class _FakeRunManager: + async def set_status(self, *_args, **_kwargs) -> None: + return None + + async def update_model_name(self, *_args, **_kwargs) -> None: + return None + + async def update_run_completion(self, *_args, **_kwargs) -> None: + return None + + +class _FakeBridge: + def __init__(self) -> None: + self.events: list[tuple[str, object]] = [] + + async def publish(self, _run_id, event, payload) -> None: + self.events.append((event, payload)) + + async def publish_end(self, _run_id) -> None: + self.events.append(("end", None)) + + async def cleanup(self, _run_id, *, delay: int = 0) -> None: + return None + + +@pytest.fixture(autouse=True) +def _clear_tracing_env(monkeypatch): + from deerflow.config.tracing_config import reset_tracing_config + + for name in ("LANGFUSE_TRACING", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_BASE_URL"): + monkeypatch.delenv(name, raising=False) + reset_tracing_config() + yield + reset_tracing_config() + + +@pytest.mark.asyncio +async def test_run_agent_injects_langfuse_metadata(monkeypatch): + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + + fake_agent = _FakeAgent() + + def agent_factory(config): + return fake_agent + + record = RunRecord( + run_id="run-1", + thread_id="thread-xyz", + assistant_id="lead-agent", + status=RunStatus.pending, + on_disconnect=DisconnectMode.cancel, + model_name="gpt-4o", + ) + record.abort_event = asyncio.Event() + ctx = RunContext(checkpointer=None) + + await run_agent( + _FakeBridge(), + _FakeRunManager(), + record, + ctx=ctx, + agent_factory=agent_factory, + graph_input={"messages": []}, + config={"configurable": {"thread_id": "thread-xyz"}}, + ) + + assert fake_agent.captured_config is not None, "astream was not invoked" + metadata = fake_agent.captured_config.get("metadata") or {} + assert metadata.get("langfuse_session_id") == "thread-xyz" + # conftest.py autouse fixture injects ``test-user-autouse`` into the + # contextvar — the worker should read it via ``get_effective_user_id``. + user_id = metadata.get("langfuse_user_id") + assert user_id == "test-user-autouse", f"expected test-user-autouse, got {user_id}" + assert metadata.get("langfuse_trace_name") == "lead-agent" + tags = metadata.get("langfuse_tags") or [] + assert "model:gpt-4o" in tags + + +@pytest.mark.asyncio +async def test_run_agent_falls_back_to_default_user_when_unset(monkeypatch): + """When no user is in the contextvar, langfuse_user_id falls back to 'default'. + + Uses ``monkeypatch.setattr`` to redirect ``get_effective_user_id`` to return + ``"default"`` rather than directly mutating the contextvar — direct contextvar + operations across pytest test boundaries have produced spooky cross-file + pollution when combined with the langfuse OTel global tracer provider. + """ + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + from deerflow.runtime.runs import worker as worker_module + from deerflow.runtime.user_context import DEFAULT_USER_ID + + reset_tracing_config() + monkeypatch.setattr(worker_module, "get_effective_user_id", lambda: DEFAULT_USER_ID) + + fake_agent = _FakeAgent() + + def agent_factory(config): + return fake_agent + + record = RunRecord( + run_id="run-fallback", + thread_id="thread-fb", + assistant_id="lead-agent", + status=RunStatus.pending, + on_disconnect=DisconnectMode.cancel, + ) + record.abort_event = asyncio.Event() + ctx = RunContext(checkpointer=None) + + await run_agent( + _FakeBridge(), + _FakeRunManager(), + record, + ctx=ctx, + agent_factory=agent_factory, + graph_input={"messages": []}, + config={"configurable": {"thread_id": "thread-fb"}}, + ) + + metadata = fake_agent.captured_config.get("metadata") or {} + assert metadata.get("langfuse_user_id") == "default" + + +@pytest.mark.asyncio +async def test_run_agent_preserves_caller_metadata_overrides(monkeypatch): + """Caller-provided langfuse_* keys must NOT be overridden by the default injection.""" + monkeypatch.setenv("LANGFUSE_TRACING", "true") + monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test") + monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") + from deerflow.config.tracing_config import reset_tracing_config + + reset_tracing_config() + + fake_agent = _FakeAgent() + + def agent_factory(config): + return fake_agent + + record = RunRecord( + run_id="run-2", + thread_id="thread-default", + assistant_id="lead-agent", + status=RunStatus.pending, + on_disconnect=DisconnectMode.cancel, + ) + record.abort_event = asyncio.Event() + ctx = RunContext(checkpointer=None) + + await run_agent( + _FakeBridge(), + _FakeRunManager(), + record, + ctx=ctx, + agent_factory=agent_factory, + graph_input={"messages": []}, + config={ + "configurable": {"thread_id": "thread-default"}, + "metadata": { + "langfuse_session_id": "custom-session-id", + "langfuse_user_id": "explicit-user", + }, + }, + ) + + metadata = fake_agent.captured_config.get("metadata") or {} + # Caller-supplied keys win. + assert metadata["langfuse_session_id"] == "custom-session-id" + assert metadata["langfuse_user_id"] == "explicit-user" + # Worker still fills in keys that the caller didn't set. + assert metadata["langfuse_trace_name"] == "lead-agent" + + +@pytest.mark.asyncio +async def test_run_agent_skips_metadata_when_langfuse_disabled(monkeypatch): + fake_agent = _FakeAgent() + + def agent_factory(config): + return fake_agent + + record = RunRecord( + run_id="run-3", + thread_id="thread-noop", + assistant_id="lead-agent", + status=RunStatus.pending, + on_disconnect=DisconnectMode.cancel, + ) + record.abort_event = asyncio.Event() + ctx = RunContext(checkpointer=None) + + await run_agent( + _FakeBridge(), + _FakeRunManager(), + record, + ctx=ctx, + agent_factory=agent_factory, + graph_input={"messages": []}, + config={"configurable": {"thread_id": "thread-noop"}}, + ) + + metadata = fake_agent.captured_config.get("metadata") or {} + assert "langfuse_session_id" not in metadata + assert "langfuse_user_id" not in metadata + assert "langfuse_trace_name" not in metadata