fix(tracing): propagate session_id and user_id into Langfuse traces (#2944)

* fix(tracing): propagate session_id and user_id into Langfuse traces

Adds Langfuse v4 reserved trace attributes (langfuse_session_id,
langfuse_user_id, langfuse_trace_name, langfuse_tags) to
RunnableConfig.metadata inside the run worker, so the langchain
CallbackHandler can lift them onto the root trace.

- New deerflow.tracing.metadata.build_langfuse_trace_metadata() returns
  the reserved keys when Langfuse is in the enabled providers, else {}.
- worker.run_agent merges them with setdefault so caller-supplied keys
  win, allowing per-request overrides from upstream metadata.
- session_id mirrors the LangGraph thread_id; user_id reads
  get_effective_user_id() (falls back to "default" in no-auth mode).
- trace_name defaults to "lead-agent"; tags carry env and model name
  when DEER_FLOW_ENV (or ENVIRONMENT) and a model name are present.

Closes #2930

* fix(tracing): attach Langfuse callback at graph root so metadata propagates

The first commit injected ``langfuse_session_id`` / ``langfuse_user_id`` /
``langfuse_trace_name`` / ``langfuse_tags`` into ``RunnableConfig.metadata``,
but on ``main`` the Langfuse callback is attached at *model* level
(``models/factory.py``). LangChain still threads ``parent_run_id`` through
the contextvar, so the handler sees the model as a nested observation and
``__on_llm_action`` strips the ``langfuse_*`` keys
(``keep_langfuse_trace_attributes=False``). The trace's top-level
``sessionId`` / ``userId`` therefore stayed empty in deer-flow's LangGraph
runtime — confirmed live against a real Langfuse instance.

This commit moves the callback to the **graph invocation root** so the
handler fires ``on_chain_start(parent_run_id=None)`` and runs the
``propagate_attributes`` path that actually lifts ``session_id`` /
``user_id`` onto the trace:

- ``models/factory.py``: add ``attach_tracing`` keyword (default ``True``)
  so standalone callers (``MemoryUpdater``, etc.) keep their direct
  model-level tracing.
- ``agents/lead_agent/agent.py``: call ``build_tracing_callbacks()`` once
  inside ``_make_lead_agent`` and append the result to
  ``config["callbacks"]``; the four in-graph ``create_chat_model`` sites
  (bootstrap, default agent, sync + async summarization) pass
  ``attach_tracing=False`` to avoid duplicate spans.
- ``agents/middlewares/title_middleware.py``: same ``attach_tracing=False``
  for the title-generation model, since it inherits the graph's
  RunnableConfig via ``_get_runnable_config``.

Test updates:

- ``tests/test_lead_agent_model_resolution.py`` and
  ``tests/test_title_middleware_core_logic.py``: extend the fake
  ``create_chat_model`` signatures / mock assertions to accept the new
  ``attach_tracing`` kwarg.
- ``tests/test_worker_langfuse_metadata.py``: switch the no-user fallback
  test from direct ContextVar mutation to ``monkeypatch.setattr`` on
  ``get_effective_user_id`` to avoid pollution across the langfuse OTel
  global tracer provider.
- ``tests/conftest.py``: add an autouse fixture that resets
  ``deerflow.config.title_config._title_config`` to its pristine default
  after every test. Any test that loads the real ``config.yaml`` (via
  ``get_app_config()``) calls ``load_title_config_from_dict`` and mutates
  the module-level singleton, which previously poisoned the
  title-middleware suite when run after, e.g., the new
  ``test_worker_langfuse_metadata.py`` cases. The fixture is independent
  of this PR's main change but unblocks the cross-file test run.

Live verification (same Langfuse instance as before):

- Drove ``worker.run_agent`` against the real ``make_lead_agent`` +
  ``gpt-4o-mini`` for three distinct ``user_context`` identities
  (``fancy-engineer``, ``alice-pm``, ``bob-designer``).
- Each run produced one ``lead-agent`` trace whose top-level
  ``sessionId`` / ``userId`` / ``tags`` carry the expected values, e.g.
  ``session=e2e-2930-8f347c-alice-pm user=alice-pm name='lead-agent'
  tags=['model:gpt-4o-mini']``.

Refs #2930.

* fix(tracing): extend root-callback + metadata injection to the embedded client

Addresses Copilot review on PR #2944.

Commit 2 disabled model-level tracing for ``TitleMiddleware`` and
``_create_summarization_middleware`` because ``_make_lead_agent`` now
attaches the tracing callbacks at the graph invocation root. But the
embedded ``DeerFlowClient`` does not call ``_make_lead_agent`` — it
calls ``_build_middlewares`` directly and never appends the tracing
handlers to its ``RunnableConfig``. So under the embedded path,
title-generation and summarization LLM calls were left untraced —
a regression introduced by this PR.

This commit mirrors the gateway worker's injection in
``DeerFlowClient.stream``:

- Append ``build_tracing_callbacks()`` to ``config["callbacks"]`` so
  the Langfuse handler sees ``on_chain_start(parent_run_id=None)`` at
  the graph root and runs the ``propagate_attributes`` path.
- Merge ``build_langfuse_trace_metadata(...)`` into
  ``config["metadata"]`` with ``setdefault`` so caller-supplied keys
  still win.
- ``_ensure_agent`` now creates its main model with
  ``attach_tracing=False`` to avoid duplicate spans now that the
  callback lives at the graph root.

Docs:
- ``backend/CLAUDE.md`` Tracing section rewritten to describe the
  graph-root attachment model (replacing the inaccurate
  "at model-creation time" wording).
- ``README.md`` Langfuse section now lists both injection points
  (worker + client) instead of only the worker path.

Tests:
- ``tests/test_client_langfuse_metadata.py`` (new, 3 cases):
  callbacks + metadata are injected when Langfuse is enabled,
  caller-supplied metadata overrides win via ``setdefault``, and the
  injection is inert when Langfuse is disabled.

Live verification on the real Langfuse instance:

  === user=fancy-client ===
    id=cbd22847..  session=client-2930-6b9491-fancy-client  user=fancy-client  name='lead-agent'
  === user=alice-client ===
    id=b4f6f576..  session=client-2930-6b9491-alice-client  user=alice-client  name='lead-agent'

Refs #2930.

* refactor(tracing): address maintainer review on PR #2944

Addresses @WillemJiang's 5 comments.

1. Duplicated metadata-injection code between worker.py and client.py
   New ``deerflow.tracing.inject_langfuse_metadata(config, ...)`` helper
   takes the 10-line build + merge + setdefault logic that was duplicated
   in ``runtime/runs/worker.py`` and ``client.py``. Both callers now share
   a single source of truth, so the two paths cannot drift.

2. Direct private-attribute mutation in conftest.py and tests
   Added public ``reset_tracing_config()`` / ``reset_title_config()``
   functions. ``tests/conftest.py`` and every test that previously did
   ``tracing_module._tracing_config = None`` or
   ``title_module._title_config = TitleConfig()`` now goes through the
   public API. A future internal rename will surface as an ImportError
   instead of a silent no-op.

3. client.py reading os.environ directly
   ``DeerFlowClient.__init__`` grows an optional ``environment`` parameter
   so programmatic callers can pass the deployment label explicitly.
   ``stream()`` consults ``self._environment`` first and only falls back
   to ``DEER_FLOW_ENV`` / ``ENVIRONMENT`` env vars when nothing was
   passed in. Backwards compatible — env-var behaviour preserved for
   callers that opt to keep using it.

4. build_tracing_callbacks() cached on hot path
   Not implemented. Inspected the langfuse v4 ``langchain.CallbackHandler``
   constructor: it only resolves the module-level singleton client via
   ``get_client()`` and initialises a few dicts (no I/O, no env parsing
   at construction time). The build is essentially free. Caching would
   trade a non-measurable speedup for two real risks: handler instances
   carry per-run state internally (``_run_states``, ``_root_run_states``,
   ``last_trace_id``), and tracing config can be reloaded by env-var
   changes between runs. Will revisit if profiling ever shows it as
   a hot spot.

5. attach_tracing=False easy to forget at new in-graph call sites
   - Module docstring at the top of ``lead_agent/agent.py`` documents
     the invariant ("every in-graph ``create_chat_model`` MUST pass
     ``attach_tracing=False``") and enumerates the current sites.
   - New regression test
     ``test_make_lead_agent_attaches_tracing_callbacks_at_graph_root`` in
     ``tests/test_lead_agent_model_resolution.py`` locks both halves of
     the invariant: ``config["callbacks"]`` carries the tracing handler
     after ``_make_lead_agent``, AND every ``create_chat_model`` call
     captured by the test passes ``attach_tracing=False``. A future
     in-graph site that forgets the flag will fail this test.

Lint clean. Full touched-suite bundle: 246 passed.

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
Xinmin Zeng
2026-05-21 16:49:31 +08:00
committed by GitHub
parent ca7042dec2
commit df95154282
19 changed files with 910 additions and 26 deletions
+9
View File
@@ -546,6 +546,15 @@ LANGFUSE_BASE_URL=https://cloud.langfuse.com
If you are using a self-hosted Langfuse instance, set `LANGFUSE_BASE_URL` to your deployment URL. If you are using a self-hosted Langfuse instance, set `LANGFUSE_BASE_URL` to your deployment URL.
**Trace correlation fields.** Every agent run is annotated with Langfuse's reserved trace attributes so the Sessions and Users pages light up automatically:
- `session_id` = LangGraph `thread_id` — groups every trace of the same conversation
- `user_id` = effective user from `get_effective_user_id()` (falls back to `default` in no-auth mode)
- `trace_name` = assistant id (defaults to `lead-agent`)
- `tags` = `[env:<DEER_FLOW_ENV>, model:<model_name>]` (omitted when not set)
These are injected into `RunnableConfig.metadata` at the graph invocation root for both the gateway path (`runtime/runs/worker.py::run_agent`) and the embedded path (`client.py::DeerFlowClient.stream`), so any LangChain-compatible callback can read them. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment.
#### Using Both Providers #### Using Both Providers
If both LangSmith and Langfuse are enabled, DeerFlow attaches both tracing callbacks and reports the same model activity to both systems. If both LangSmith and Langfuse are enabled, DeerFlow attaches both tracing callbacks and reports the same model activity to both systems.
+18
View File
@@ -397,6 +397,24 @@ Focused regression coverage for the updater lives in `backend/tests/test_memory_
- `resolve_variable(path)` - Import module and return variable (e.g., `module.path:variable_name`) - `resolve_variable(path)` - Import module and return variable (e.g., `module.path:variable_name`)
- `resolve_class(path, base_class)` - Import and validate class against base class - `resolve_class(path, base_class)` - Import and validate class against base class
### Tracing System (`packages/harness/deerflow/tracing/`)
LangSmith and Langfuse are both supported. The wiring lives in two layers:
- `factory.py::build_tracing_callbacks()` — returns the LangChain `CallbackHandler` list for the providers currently enabled via env vars (`LANGSMITH_TRACING`, `LANGFUSE_TRACING`, etc.). The handlers are attached at the **graph invocation root** for in-graph runs (`make_lead_agent` and `DeerFlowClient.stream` both append them to `config["callbacks"]` before invoking the graph) so a single run produces one trace with all node / LLM / tool calls as child spans. Standalone callers — anything that invokes a model outside such a graph (e.g. `MemoryUpdater`) — keep `create_chat_model`'s default `attach_tracing=True`, which falls back to model-level callback attachment.
- `metadata.py::build_langfuse_trace_metadata()` — builds the Langfuse-reserved trace attributes for `RunnableConfig.metadata`. The Langfuse v4 `langchain.CallbackHandler` lifts these onto the root trace (see its `_parse_langfuse_trace_attributes`), but only when it sees `on_chain_start(parent_run_id=None)` — which is why the callbacks have to live at the graph root, not the model.
**Trace-attribute injection points**: both `runtime/runs/worker.py::run_agent` (gateway path) and `client.py::DeerFlowClient.stream` (embedded path) merge the metadata into `config["metadata"]` right before constructing the graph. Caller-supplied keys win via `setdefault`, so an external `session_id` override is preserved. Field mapping:
| Langfuse field | Source |
|-----------------------|----------------------------------------------|
| `langfuse_session_id` | LangGraph `thread_id` |
| `langfuse_user_id` | `get_effective_user_id()` (`default` in no-auth) |
| `langfuse_trace_name` | `RunRecord.assistant_id` / client `agent_name` (defaults to `lead-agent`) |
| `langfuse_tags` | `env:<DEER_FLOW_ENV>` + `model:<model_name>` |
Returns `{}` when Langfuse is not in the enabled providers — LangSmith-only deployments are unaffected. Set `DEER_FLOW_ENV` (or `ENVIRONMENT`) to tag traces by deployment environment. Tests live in `tests/test_tracing_factory.py`, `tests/test_tracing_metadata.py`, `tests/test_worker_langfuse_metadata.py`, and `tests/test_client_langfuse_metadata.py`.
### Config Schema ### Config Schema
**`config.yaml`** key sections: **`config.yaml`** key sections:
@@ -1,3 +1,23 @@
"""Lead agent factory.
INVARIANT — tracing callback placement
======================================
Tracing callbacks (Langfuse, LangSmith) are attached at the **graph
invocation root** in :func:`_make_lead_agent` (see the
``build_tracing_callbacks()`` block that appends to ``config["callbacks"]``).
Every ``create_chat_model(...)`` call inside this module — and inside any
middleware reachable from this graph (e.g. ``TitleMiddleware``) — MUST pass
``attach_tracing=False``.
Forgetting that flag emits duplicate spans (one rooted at the graph, one at
the model) AND prevents the Langfuse handler's ``propagate_attributes``
path from firing, so ``session_id`` / ``user_id`` never reach the trace.
The four current sites are: bootstrap agent, default agent, summarization
middleware, and the async path inside ``TitleMiddleware``. Any new in-graph
``create_chat_model`` call must add to this list and pass the flag.
"""
import logging import logging
from langchain.agents import create_agent from langchain.agents import create_agent
@@ -22,6 +42,7 @@ from deerflow.config.app_config import AppConfig, get_app_config
from deerflow.models import create_chat_model from deerflow.models import create_chat_model
from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools
from deerflow.skills.types import Skill from deerflow.skills.types import Skill
from deerflow.tracing import build_tracing_callbacks
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -73,10 +94,14 @@ def _create_summarization_middleware(*, app_config: AppConfig | None = None) ->
# Bind "middleware:summarize" tag so RunJournal identifies these LLM calls # Bind "middleware:summarize" tag so RunJournal identifies these LLM calls
# as middleware rather than lead_agent (SummarizationMiddleware is a # as middleware rather than lead_agent (SummarizationMiddleware is a
# LangChain built-in, so we tag the model at creation time). # LangChain built-in, so we tag the model at creation time).
# attach_tracing=False because the graph-level RunnableConfig (set in
# ``_make_lead_agent``) already carries tracing callbacks; binding them
# again at the model level would emit duplicate spans and break
# ``session_id`` / ``user_id`` propagation.
if config.model_name: if config.model_name:
model = create_chat_model(name=config.model_name, thinking_enabled=False, app_config=resolved_app_config) model = create_chat_model(name=config.model_name, thinking_enabled=False, app_config=resolved_app_config, attach_tracing=False)
else: else:
model = create_chat_model(thinking_enabled=False, app_config=resolved_app_config) model = create_chat_model(thinking_enabled=False, app_config=resolved_app_config, attach_tracing=False)
model = model.with_config(tags=["middleware:summarize"]) model = model.with_config(tags=["middleware:summarize"])
# Prepare kwargs # Prepare kwargs
@@ -408,13 +433,26 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig):
} }
) )
# Inject tracing callbacks at the graph invocation root so a single LangGraph
# run produces one trace with all node / LLM / tool calls as child spans,
# AND so the Langfuse handler sees ``on_chain_start(parent_run_id=None)`` and
# actually propagates ``langfuse_session_id`` / ``langfuse_user_id`` from
# ``config["metadata"]`` onto the trace. Without root-level attachment the
# model is a nested observation and the handler strips ``langfuse_*`` keys.
tracing_callbacks = build_tracing_callbacks()
if tracing_callbacks:
existing = config.get("callbacks") or []
if not isinstance(existing, list):
existing = list(existing)
config["callbacks"] = [*existing, *tracing_callbacks]
skills_for_tool_policy = _load_enabled_skills_for_tool_policy(available_skills, app_config=resolved_app_config) skills_for_tool_policy = _load_enabled_skills_for_tool_policy(available_skills, app_config=resolved_app_config)
if is_bootstrap: if is_bootstrap:
# Special bootstrap agent with minimal prompt for initial custom agent creation flow # Special bootstrap agent with minimal prompt for initial custom agent creation flow
tools = get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled, app_config=resolved_app_config) + [setup_agent] tools = get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled, app_config=resolved_app_config) + [setup_agent]
return create_agent( return create_agent(
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, app_config=resolved_app_config), model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, app_config=resolved_app_config, attach_tracing=False),
tools=filter_tools_by_skill_allowed_tools(tools, skills_for_tool_policy), tools=filter_tools_by_skill_allowed_tools(tools, skills_for_tool_policy),
middleware=_build_middlewares(config, model_name=model_name, app_config=resolved_app_config), middleware=_build_middlewares(config, model_name=model_name, app_config=resolved_app_config),
system_prompt=apply_prompt_template( system_prompt=apply_prompt_template(
@@ -432,7 +470,7 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig):
# Default lead agent (unchanged behavior) # Default lead agent (unchanged behavior)
tools = get_available_tools(model_name=model_name, groups=agent_config.tool_groups if agent_config else None, subagent_enabled=subagent_enabled, app_config=resolved_app_config) tools = get_available_tools(model_name=model_name, groups=agent_config.tool_groups if agent_config else None, subagent_enabled=subagent_enabled, app_config=resolved_app_config)
return create_agent( return create_agent(
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort, app_config=resolved_app_config), model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort, app_config=resolved_app_config, attach_tracing=False),
tools=filter_tools_by_skill_allowed_tools(tools + extra_tools, skills_for_tool_policy), tools=filter_tools_by_skill_allowed_tools(tools + extra_tools, skills_for_tool_policy),
middleware=_build_middlewares(config, model_name=model_name, agent_name=agent_name, app_config=resolved_app_config), middleware=_build_middlewares(config, model_name=model_name, agent_name=agent_name, app_config=resolved_app_config),
system_prompt=apply_prompt_template( system_prompt=apply_prompt_template(
@@ -160,7 +160,11 @@ class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
prompt, user_msg = self._build_title_prompt(state) prompt, user_msg = self._build_title_prompt(state)
try: try:
model_kwargs = {"thinking_enabled": False} # attach_tracing=False because ``_get_runnable_config()`` inherits
# the graph-level RunnableConfig (set in ``_make_lead_agent``) whose
# callbacks already carry tracing handlers; binding them again at
# the model level would emit duplicate spans.
model_kwargs = {"thinking_enabled": False, "attach_tracing": False}
if self._app_config is not None: if self._app_config is not None:
model_kwargs["app_config"] = self._app_config model_kwargs["app_config"] = self._app_config
if config.model_name: if config.model_name:
+37 -1
View File
@@ -19,6 +19,7 @@ import asyncio
import json import json
import logging import logging
import mimetypes import mimetypes
import os
import shutil import shutil
import tempfile import tempfile
import uuid import uuid
@@ -42,6 +43,7 @@ from deerflow.config.paths import get_paths
from deerflow.models import create_chat_model from deerflow.models import create_chat_model
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.user_context import get_effective_user_id
from deerflow.skills.storage import get_or_new_skill_storage from deerflow.skills.storage import get_or_new_skill_storage
from deerflow.tracing import build_tracing_callbacks, inject_langfuse_metadata
from deerflow.uploads.manager import ( from deerflow.uploads.manager import (
claim_unique_filename, claim_unique_filename,
delete_file_safe, delete_file_safe,
@@ -123,6 +125,7 @@ class DeerFlowClient:
agent_name: str | None = None, agent_name: str | None = None,
available_skills: set[str] | None = None, available_skills: set[str] | None = None,
middlewares: Sequence[AgentMiddleware] | None = None, middlewares: Sequence[AgentMiddleware] | None = None,
environment: str | None = None,
): ):
"""Initialize the client. """Initialize the client.
@@ -140,6 +143,12 @@ class DeerFlowClient:
agent_name: Name of the agent to use. agent_name: Name of the agent to use.
available_skills: Optional set of skill names to make available. If None (default), all scanned skills are available. available_skills: Optional set of skill names to make available. If None (default), all scanned skills are available.
middlewares: Optional list of custom middlewares to inject into the agent. middlewares: Optional list of custom middlewares to inject into the agent.
environment: Deployment environment label that ends up in
``langfuse_tags`` (e.g. ``"production"`` / ``"staging"``).
When ``None`` the worker/client falls back to the
``DEER_FLOW_ENV`` or ``ENVIRONMENT`` env vars. Pass an
explicit value for programmatic callers that do not want
env-var coupling.
""" """
if config_path is not None: if config_path is not None:
reload_app_config(config_path) reload_app_config(config_path)
@@ -156,6 +165,7 @@ class DeerFlowClient:
self._agent_name = agent_name self._agent_name = agent_name
self._available_skills = set(available_skills) if available_skills is not None else None self._available_skills = set(available_skills) if available_skills is not None else None
self._middlewares = list(middlewares) if middlewares else [] self._middlewares = list(middlewares) if middlewares else []
self._environment = environment
# Lazy agent — created on first call, recreated when config changes. # Lazy agent — created on first call, recreated when config changes.
self._agent = None self._agent = None
@@ -228,7 +238,11 @@ class DeerFlowClient:
max_concurrent_subagents = cfg.get("max_concurrent_subagents", 3) max_concurrent_subagents = cfg.get("max_concurrent_subagents", 3)
kwargs: dict[str, Any] = { kwargs: dict[str, Any] = {
"model": create_chat_model(name=model_name, thinking_enabled=thinking_enabled), # attach_tracing=False because ``stream()`` injects tracing
# callbacks at the graph invocation root so a single embedded run
# produces one trace with correct session_id / user_id propagation.
# Attaching them again on the model would emit duplicate spans.
"model": create_chat_model(name=model_name, thinking_enabled=thinking_enabled, attach_tracing=False),
"tools": self._get_tools(model_name=model_name, subagent_enabled=subagent_enabled), "tools": self._get_tools(model_name=model_name, subagent_enabled=subagent_enabled),
"middleware": _build_middlewares(config, model_name=model_name, agent_name=self._agent_name, custom_middlewares=self._middlewares), "middleware": _build_middlewares(config, model_name=model_name, agent_name=self._agent_name, custom_middlewares=self._middlewares),
"system_prompt": apply_prompt_template( "system_prompt": apply_prompt_template(
@@ -571,6 +585,28 @@ class DeerFlowClient:
thread_id = str(uuid.uuid4()) thread_id = str(uuid.uuid4())
config = self._get_runnable_config(thread_id, **kwargs) config = self._get_runnable_config(thread_id, **kwargs)
# Inject tracing callbacks and Langfuse trace metadata at the graph
# invocation root so the embedded client matches the gateway worker's
# behaviour: a single ``stream()`` produces one trace with all node /
# LLM / tool calls nested under it, and the trace carries the reserved
# ``langfuse_session_id`` / ``langfuse_user_id`` keys that the Langfuse
# CallbackHandler lifts onto the root trace's ``sessionId`` / ``userId``.
tracing_callbacks = build_tracing_callbacks()
if tracing_callbacks:
existing_callbacks = list(config.get("callbacks") or [])
config["callbacks"] = [*existing_callbacks, *tracing_callbacks]
configurable = config.get("configurable") or {}
inject_langfuse_metadata(
config,
thread_id=thread_id,
user_id=get_effective_user_id(),
assistant_id=self._agent_name or "lead-agent",
model_name=configurable.get("model_name") or self._model_name,
environment=self._environment or os.environ.get("DEER_FLOW_ENV") or os.environ.get("ENVIRONMENT"),
)
self._ensure_agent(config) self._ensure_agent(config)
state: dict[str, Any] = {"messages": [HumanMessage(content=message)]} state: dict[str, Any] = {"messages": [HumanMessage(content=message)]}
@@ -51,3 +51,16 @@ def load_title_config_from_dict(config_dict: dict) -> None:
"""Load title configuration from a dictionary.""" """Load title configuration from a dictionary."""
global _title_config global _title_config
_title_config = TitleConfig(**config_dict) _title_config = TitleConfig(**config_dict)
def reset_title_config() -> None:
"""Restore the title configuration to its pristine ``TitleConfig()`` default.
Public API so that tests do not have to reach into the private
``_title_config`` module attribute. ``AppConfig.from_file()`` calls
:func:`load_title_config_from_dict`, which permanently mutates the
singleton; tests that need a clean slate between cases should call
this between tests.
"""
global _title_config
_title_config = TitleConfig()
@@ -147,3 +147,15 @@ def validate_enabled_tracing_providers() -> None:
def is_tracing_enabled() -> bool: def is_tracing_enabled() -> bool:
"""Check if any tracing provider is enabled and fully configured.""" """Check if any tracing provider is enabled and fully configured."""
return get_tracing_config().is_configured return get_tracing_config().is_configured
def reset_tracing_config() -> None:
"""Discard the cached :class:`TracingConfig` so the next call rebuilds it.
Public API so that tests do not have to reach into the private
``_tracing_config`` module attribute. A future internal rename would
silently break callers that mutate the attribute directly.
"""
global _tracing_config
with _config_lock:
_tracing_config = None
@@ -47,11 +47,24 @@ def _enable_stream_usage_by_default(model_use_path: str, model_settings_from_con
model_settings_from_config["stream_usage"] = True model_settings_from_config["stream_usage"] = True
def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, **kwargs) -> BaseChatModel: def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, attach_tracing: bool = True, **kwargs) -> BaseChatModel:
"""Create a chat model instance from the config. """Create a chat model instance from the config.
Args: Args:
name: The name of the model to create. If None, the first model in the config will be used. name: The name of the model to create. If None, the first model in the config will be used.
thinking_enabled: Enable the model's extended-thinking mode when supported.
app_config: Explicit application config; falls back to the cached global if omitted.
attach_tracing: When True (default), attach tracing callbacks (Langfuse,
LangSmith) directly to the model instance. Standalone callers — anything
that invokes the model outside a LangGraph run that already wires tracing
at the invocation root (``MemoryUpdater``, ad-hoc utilities, etc.) — keep
this default so the model-level callback still produces traces. Callers
that already attach tracing at the graph root (``make_lead_agent``, the
in-graph ``TitleMiddleware``) MUST pass ``attach_tracing=False``; otherwise
the same LLM call emits duplicate spans (one rooted at the graph, one at
the model) and ``session_id`` / ``user_id`` metadata never reach the trace
because the model becomes a nested observation whose ``langfuse_*`` keys
get stripped.
Returns: Returns:
A chat model instance. A chat model instance.
@@ -149,9 +162,10 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *
model_instance = model_class(**kwargs, **model_settings_from_config) model_instance = model_class(**kwargs, **model_settings_from_config)
callbacks = build_tracing_callbacks() if attach_tracing:
if callbacks: callbacks = build_tracing_callbacks()
existing_callbacks = model_instance.callbacks or [] if callbacks:
model_instance.callbacks = [*existing_callbacks, *callbacks] existing_callbacks = model_instance.callbacks or []
logger.debug(f"Tracing attached to model '{name}' with providers={len(callbacks)}") model_instance.callbacks = [*existing_callbacks, *callbacks]
logger.debug(f"Tracing attached to model '{name}' with providers={len(callbacks)}")
return model_instance return model_instance
@@ -19,6 +19,7 @@ import asyncio
import copy import copy
import inspect import inspect
import logging import logging
import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import lru_cache from functools import lru_cache
from typing import TYPE_CHECKING, Any, Literal, cast from typing import TYPE_CHECKING, Any, Literal, cast
@@ -31,6 +32,8 @@ if TYPE_CHECKING:
from deerflow.config.app_config import AppConfig from deerflow.config.app_config import AppConfig
from deerflow.runtime.serialization import serialize from deerflow.runtime.serialization import serialize
from deerflow.runtime.stream_bridge import StreamBridge from deerflow.runtime.stream_bridge import StreamBridge
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.tracing import inject_langfuse_metadata
from .manager import RunManager, RunRecord from .manager import RunManager, RunRecord
from .naming import resolve_root_run_name from .naming import resolve_root_run_name
@@ -225,6 +228,19 @@ async def run_agent(
if journal is not None: if journal is not None:
config.setdefault("callbacks", []).append(journal) config.setdefault("callbacks", []).append(journal)
# Inject Langfuse trace-attribute metadata so the langchain CallbackHandler
# can lift session_id / user_id / trace_name / tags onto the root trace.
# Shared helper with ``DeerFlowClient.stream`` so both entry points stay
# in sync; caller-provided metadata wins via setdefault inside the helper.
inject_langfuse_metadata(
config,
thread_id=thread_id,
user_id=get_effective_user_id(),
assistant_id=record.assistant_id,
model_name=record.model_name,
environment=os.environ.get("DEER_FLOW_ENV") or os.environ.get("ENVIRONMENT"),
)
# Resolve after runtime context installation so context/configurable reflect # Resolve after runtime context installation so context/configurable reflect
# the agent name that this run will actually execute. # the agent name that this run will actually execute.
config.setdefault("run_name", resolve_root_run_name(config, record.assistant_id)) config.setdefault("run_name", resolve_root_run_name(config, record.assistant_id))
@@ -1,3 +1,8 @@
from .factory import build_tracing_callbacks from .factory import build_tracing_callbacks
from .metadata import build_langfuse_trace_metadata, inject_langfuse_metadata
__all__ = ["build_tracing_callbacks"] __all__ = [
"build_langfuse_trace_metadata",
"build_tracing_callbacks",
"inject_langfuse_metadata",
]
@@ -0,0 +1,105 @@
"""Langfuse trace-attribute metadata builders.
The Langfuse v4 ``langchain.CallbackHandler`` lifts a fixed set of reserved
keys from ``RunnableConfig.metadata`` onto the root trace:
- ``langfuse_session_id`` → groups traces (LangGraph thread → Langfuse Session)
- ``langfuse_user_id`` → trace user_id (powers the Users page)
- ``langfuse_trace_name`` → human-readable trace name
- ``langfuse_tags`` → trace tags
See ``langfuse/langchain/CallbackHandler.py::_parse_langfuse_trace_attributes``
and https://langfuse.com/docs/observability/features/sessions for the
contract. Builders here exist so the gateway/run worker can inject the
right metadata without leaking Langfuse internals into the call sites.
"""
from __future__ import annotations
from typing import Any
from deerflow.config import get_enabled_tracing_providers
# Lazy-imported below to avoid a circular import: ``deerflow.runtime`` eagerly
# imports the run worker, which in turn needs ``deerflow.tracing``.
_DEFAULT_TRACE_NAME = "lead-agent"
def build_langfuse_trace_metadata(
*,
thread_id: str | None,
user_id: str | None = None,
assistant_id: str | None = None,
model_name: str | None = None,
environment: str | None = None,
) -> dict[str, Any]:
"""Return Langfuse trace-attribute metadata for ``RunnableConfig.metadata``.
Returns ``{}`` when Langfuse is not in the enabled tracing providers so
callers can unconditionally merge the result without affecting LangSmith
or other tracers.
Args:
thread_id: LangGraph thread id; mapped to ``langfuse_session_id``.
user_id: Effective user id; falls back to ``DEFAULT_USER_ID`` when
``None`` so the Langfuse Users page works in no-auth mode.
assistant_id: Optional agent identifier; defaults to ``"lead-agent"``.
model_name: Model name; emitted as ``model:<name>`` in ``langfuse_tags``.
environment: Deployment env (e.g. ``"production"``); emitted as
``env:<value>`` in ``langfuse_tags``.
"""
if "langfuse" not in get_enabled_tracing_providers():
return {}
from deerflow.runtime.user_context import DEFAULT_USER_ID
metadata: dict[str, Any] = {
"langfuse_session_id": thread_id,
"langfuse_user_id": user_id or DEFAULT_USER_ID,
"langfuse_trace_name": assistant_id or _DEFAULT_TRACE_NAME,
}
tags: list[str] = []
if environment:
tags.append(f"env:{environment}")
if model_name:
tags.append(f"model:{model_name}")
if tags:
metadata["langfuse_tags"] = tags
return metadata
def inject_langfuse_metadata(
config: dict,
*,
thread_id: str | None,
user_id: str | None = None,
assistant_id: str | None = None,
model_name: str | None = None,
environment: str | None = None,
) -> None:
"""Merge Langfuse trace-attribute metadata into ``config["metadata"]``.
Shared by the gateway worker (``runtime/runs/worker.py``) and the
embedded client (``client.py``) so the two paths cannot drift apart.
Caller-supplied metadata wins via ``setdefault`` — an upstream value
for e.g. ``langfuse_session_id`` set by the frontend stays untouched.
The ``config`` dict is mutated in place; the call is a no-op when
Langfuse is not in the enabled tracing providers.
"""
langfuse_metadata = build_langfuse_trace_metadata(
thread_id=thread_id,
user_id=user_id,
assistant_id=assistant_id,
model_name=model_name,
environment=environment,
)
if not langfuse_metadata:
return
merged_metadata = dict(config.get("metadata") or {})
for key, value in langfuse_metadata.items():
merged_metadata.setdefault(key, value)
config["metadata"] = merged_metadata
+25
View File
@@ -176,6 +176,31 @@ def _reset_skill_storage_singleton():
reset_skill_storage() reset_skill_storage()
@pytest.fixture(autouse=True)
def _restore_title_config_singleton():
"""Reset ``_title_config`` to its pristine default after every test.
``AppConfig.from_file()`` writes the on-disk ``title`` block into the
module-level singleton (``config/app_config.py`` calls
``load_title_config_from_dict``). Any test that loads the real
``config.yaml`` therefore leaves the singleton in a state that
``test_title_middleware_core_logic.py`` does not expect; that suite
relies on the pristine ``TitleConfig()`` default (``enabled=True``).
We restore the default after every test so test files stay
independent regardless of order.
"""
try:
from deerflow.config.title_config import reset_title_config
except ImportError:
yield
return
try:
yield
finally:
reset_title_config()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _auto_user_context(request): def _auto_user_context(request):
"""Inject a default ``test-user-autouse`` into the contextvar. """Inject a default ``test-user-autouse`` into the contextvar.
@@ -0,0 +1,159 @@
"""Tests for DeerFlowClient's graph-root tracing wiring.
Regression coverage for the Copilot review on PR #2944: when the title
and summarization middlewares request ``attach_tracing=False`` we must
make sure ``DeerFlowClient`` injects the tracing callbacks at the graph
invocation root instead, otherwise those middlewares produce untraced
LLM calls.
"""
from __future__ import annotations
from types import SimpleNamespace
from typing import Any
import pytest
from deerflow.client import DeerFlowClient
class _FakeAgent:
"""Capture the ``config`` handed to ``agent.stream``."""
def __init__(self) -> None:
self.captured_config: dict | None = None
self.checkpointer = None
self.store = None
def stream(self, state, *, config, context, stream_mode):
self.captured_config = config
return iter(()) # empty stream
@pytest.fixture(autouse=True)
def _clear_langfuse_env(monkeypatch):
from deerflow.config.tracing_config import reset_tracing_config
for name in ("LANGFUSE_TRACING", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_BASE_URL"):
monkeypatch.delenv(name, raising=False)
reset_tracing_config()
yield
reset_tracing_config()
def _stub_agent_creation(monkeypatch, fake_agent: _FakeAgent) -> dict[str, Any]:
"""Short-circuit the heavy parts of ``_ensure_agent`` so we can drive
``stream()`` against a fake graph without touching real models, tools
or middleware factories.
"""
captured: dict[str, Any] = {}
def _stub_ensure_agent(self, config):
captured["config"] = config
self._agent = fake_agent
self._agent_config_key = ("stub",)
monkeypatch.setattr(DeerFlowClient, "_ensure_agent", _stub_ensure_agent)
return captured
def _make_client(_monkeypatch) -> DeerFlowClient:
"""Build a client without going through ``__init__`` so we never load
config.yaml or perform any other side-effectful startup work."""
fake_app_config = SimpleNamespace(models=[SimpleNamespace(name="stub-model")])
client = DeerFlowClient.__new__(DeerFlowClient)
client._app_config = fake_app_config
client._extensions_config = None
client._model_name = "stub-model"
client._thinking_enabled = False
client._plan_mode = False
client._subagent_enabled = False
client._agent_name = None
client._available_skills = None
client._middlewares = None
client._checkpointer = None
client._agent = None
client._agent_config_key = None
client._environment = None
return client
def test_stream_injects_langfuse_metadata_when_enabled(monkeypatch):
monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
from deerflow.config.tracing_config import reset_tracing_config
reset_tracing_config()
class _SentinelHandler:
pass
sentinel = _SentinelHandler()
monkeypatch.setattr("deerflow.client.build_tracing_callbacks", lambda: [sentinel])
fake_agent = _FakeAgent()
captured = _stub_agent_creation(monkeypatch, fake_agent)
client = _make_client(monkeypatch)
list(client.stream("hi", thread_id="thread-client-1"))
config = captured["config"]
metadata = config.get("metadata") or {}
assert metadata.get("langfuse_session_id") == "thread-client-1"
assert metadata.get("langfuse_trace_name") == "lead-agent"
# Default no-auth context falls back to ``"default"`` user.
assert metadata.get("langfuse_user_id") in {"default", "test-user-autouse"}
callbacks = config.get("callbacks") or []
assert sentinel in callbacks
def test_stream_is_inert_when_langfuse_disabled(monkeypatch):
monkeypatch.setattr("deerflow.client.build_tracing_callbacks", lambda: [])
fake_agent = _FakeAgent()
captured = _stub_agent_creation(monkeypatch, fake_agent)
client = _make_client(monkeypatch)
list(client.stream("hi", thread_id="thread-client-2"))
config = captured["config"]
assert "callbacks" not in config or not config["callbacks"]
metadata = config.get("metadata") or {}
assert "langfuse_session_id" not in metadata
assert "langfuse_user_id" not in metadata
def test_stream_preserves_caller_metadata_overrides(monkeypatch):
monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
from deerflow.config.tracing_config import reset_tracing_config
reset_tracing_config()
monkeypatch.setattr("deerflow.client.build_tracing_callbacks", lambda: [])
fake_agent = _FakeAgent()
captured = _stub_agent_creation(monkeypatch, fake_agent)
client = _make_client(monkeypatch)
# Drive stream with a pre-populated metadata so the worker-equivalent
# ``setdefault`` semantics are exercised.
original_get_config = DeerFlowClient._get_runnable_config
def patched_get_runnable_config(self, thread_id, **overrides):
cfg = original_get_config(self, thread_id, **overrides)
cfg["metadata"] = {
"langfuse_session_id": "explicit-session-override",
"langfuse_user_id": "explicit-user",
}
return cfg
monkeypatch.setattr(DeerFlowClient, "_get_runnable_config", patched_get_runnable_config)
list(client.stream("hi", thread_id="thread-client-3"))
metadata = captured["config"].get("metadata") or {}
assert metadata["langfuse_session_id"] == "explicit-session-override"
assert metadata["langfuse_user_id"] == "explicit-user"
# ``trace_name`` was not supplied by caller so the worker still fills it.
assert metadata["langfuse_trace_name"] == "lead-agent"
@@ -41,6 +41,49 @@ def test_make_lead_agent_signature_matches_langgraph_server_factory_abi():
assert list(inspect.signature(lead_agent_module.make_lead_agent).parameters) == ["config"] assert list(inspect.signature(lead_agent_module.make_lead_agent).parameters) == ["config"]
def test_make_lead_agent_attaches_tracing_callbacks_at_graph_root(monkeypatch):
"""Regression guard: tracing handlers must be appended to
``config["callbacks"]`` (graph invocation root), and every in-graph
``create_chat_model`` call must pass ``attach_tracing=False``.
Catches future contributors who forget the flag when adding new
in-graph model creation, which would silently produce duplicate
spans and break Langfuse session/user propagation.
"""
app_config = _make_app_config([_make_model("safe-model", supports_thinking=False)])
import deerflow.tools as tools_module
monkeypatch.setattr(lead_agent_module, "get_app_config", lambda: app_config)
monkeypatch.setattr(tools_module, "get_available_tools", lambda **kwargs: [])
monkeypatch.setattr(lead_agent_module, "_build_middlewares", lambda config, model_name, agent_name=None, **kwargs: [])
sentinel_handler = object()
monkeypatch.setattr(lead_agent_module, "build_tracing_callbacks", lambda: [sentinel_handler])
seen_attach_tracing: list[bool] = []
def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
seen_attach_tracing.append(attach_tracing)
return object()
monkeypatch.setattr(lead_agent_module, "create_chat_model", _fake_create_chat_model)
monkeypatch.setattr(lead_agent_module, "create_agent", lambda **kwargs: kwargs)
config: dict = {"configurable": {"model_name": "safe-model"}}
lead_agent_module._make_lead_agent(config, app_config=app_config)
# Handler must land on the graph invocation config so the Langfuse
# CallbackHandler fires ``on_chain_start(parent_run_id=None)`` and
# propagates ``session_id`` / ``user_id`` onto the trace.
assert sentinel_handler in (config.get("callbacks") or []), "build_tracing_callbacks output must be appended to config['callbacks']"
# Every in-graph create_chat_model call must opt out of model-level
# tracing to avoid duplicate spans.
assert seen_attach_tracing, "_make_lead_agent did not call create_chat_model"
assert all(flag is False for flag in seen_attach_tracing), f"in-graph create_chat_model must pass attach_tracing=False; got {seen_attach_tracing}"
def test_internal_make_lead_agent_uses_explicit_app_config(monkeypatch): def test_internal_make_lead_agent_uses_explicit_app_config(monkeypatch):
app_config = _make_app_config([_make_model("explicit-model", supports_thinking=False)]) app_config = _make_app_config([_make_model("explicit-model", supports_thinking=False)])
@@ -55,7 +98,7 @@ def test_internal_make_lead_agent_uses_explicit_app_config(monkeypatch):
captured: dict[str, object] = {} captured: dict[str, object] = {}
def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
captured["name"] = name captured["name"] = name
captured["app_config"] = app_config captured["app_config"] = app_config
return object() return object()
@@ -89,7 +132,7 @@ def test_make_lead_agent_uses_runtime_app_config_from_context_without_global_rea
captured: dict[str, object] = {} captured: dict[str, object] = {}
def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
captured["name"] = name captured["name"] = name
captured["app_config"] = app_config captured["app_config"] = app_config
return object() return object()
@@ -168,7 +211,7 @@ def test_make_lead_agent_disables_thinking_when_model_does_not_support_it(monkey
captured: dict[str, object] = {} captured: dict[str, object] = {}
def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
captured["name"] = name captured["name"] = name
captured["thinking_enabled"] = thinking_enabled captured["thinking_enabled"] = thinking_enabled
captured["reasoning_effort"] = reasoning_effort captured["reasoning_effort"] = reasoning_effort
@@ -212,7 +255,7 @@ def test_make_lead_agent_reads_runtime_options_from_context(monkeypatch):
captured: dict[str, object] = {} captured: dict[str, object] = {}
def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None): def _fake_create_chat_model(*, name, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
captured["name"] = name captured["name"] = name
captured["thinking_enabled"] = thinking_enabled captured["thinking_enabled"] = thinking_enabled
captured["reasoning_effort"] = reasoning_effort captured["reasoning_effort"] = reasoning_effort
@@ -407,7 +450,7 @@ def test_create_summarization_middleware_uses_configured_model_alias(monkeypatch
fake_model = MagicMock() fake_model = MagicMock()
fake_model.with_config.return_value = fake_model fake_model.with_config.return_value = fake_model
def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None): def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
captured["name"] = name captured["name"] = name
captured["thinking_enabled"] = thinking_enabled captured["thinking_enabled"] = thinking_enabled
captured["reasoning_effort"] = reasoning_effort captured["reasoning_effort"] = reasoning_effort
@@ -441,7 +484,7 @@ def test_create_summarization_middleware_threads_resolved_app_config_to_model(mo
fake_model = MagicMock() fake_model = MagicMock()
fake_model.with_config.return_value = fake_model fake_model.with_config.return_value = fake_model
def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None): def _fake_create_chat_model(*, name=None, thinking_enabled, reasoning_effort=None, app_config=None, attach_tracing=True):
captured["app_config"] = app_config captured["app_config"] = app_config
return fake_model return fake_model
@@ -109,7 +109,7 @@ class TestTitleMiddlewareCoreLogic:
title = result["title"] title = result["title"]
assert title == "短标题" assert title == "短标题"
title_middleware_module.create_chat_model.assert_called_once_with(thinking_enabled=False) title_middleware_module.create_chat_model.assert_called_once_with(thinking_enabled=False, attach_tracing=False)
model.ainvoke.assert_awaited_once() model.ainvoke.assert_awaited_once()
assert model.ainvoke.await_args.kwargs["config"] == { assert model.ainvoke.await_args.kwargs["config"] == {
"run_name": "title_agent", "run_name": "title_agent",
@@ -141,6 +141,7 @@ class TestTitleMiddlewareCoreLogic:
title_middleware_module.create_chat_model.assert_called_once_with( title_middleware_module.create_chat_model.assert_called_once_with(
name="title-model", name="title-model",
thinking_enabled=False, thinking_enabled=False,
attach_tracing=False,
app_config=app_config, app_config=app_config,
) )
+2 -1
View File
@@ -5,10 +5,11 @@ from __future__ import annotations
import pytest import pytest
from deerflow.config import tracing_config as tracing_module from deerflow.config import tracing_config as tracing_module
from deerflow.config.tracing_config import reset_tracing_config
def _reset_tracing_cache() -> None: def _reset_tracing_cache() -> None:
tracing_module._tracing_config = None reset_tracing_config()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
+5 -5
View File
@@ -12,7 +12,7 @@ from deerflow.tracing import factory as tracing_factory
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def clear_tracing_env(monkeypatch): def clear_tracing_env(monkeypatch):
from deerflow.config import tracing_config as tracing_module from deerflow.config.tracing_config import reset_tracing_config
for name in ( for name in (
"LANGSMITH_TRACING", "LANGSMITH_TRACING",
@@ -30,9 +30,9 @@ def clear_tracing_env(monkeypatch):
"LANGFUSE_BASE_URL", "LANGFUSE_BASE_URL",
): ):
monkeypatch.delenv(name, raising=False) monkeypatch.delenv(name, raising=False)
tracing_module._tracing_config = None reset_tracing_config()
yield yield
tracing_module._tracing_config = None reset_tracing_config()
def test_build_tracing_callbacks_returns_empty_list_when_disabled(monkeypatch): def test_build_tracing_callbacks_returns_empty_list_when_disabled(monkeypatch):
@@ -114,12 +114,12 @@ def test_build_tracing_callbacks_raises_when_enabled_provider_fails(monkeypatch)
def test_build_tracing_callbacks_raises_for_explicitly_enabled_misconfigured_provider(monkeypatch): def test_build_tracing_callbacks_raises_for_explicitly_enabled_misconfigured_provider(monkeypatch):
from deerflow.config import tracing_config as tracing_module from deerflow.config.tracing_config import reset_tracing_config
monkeypatch.setenv("LANGFUSE_TRACING", "true") monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.delenv("LANGFUSE_PUBLIC_KEY", raising=False) monkeypatch.delenv("LANGFUSE_PUBLIC_KEY", raising=False)
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test") monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
tracing_module._tracing_config = None reset_tracing_config()
with pytest.raises(ValueError, match="LANGFUSE_PUBLIC_KEY"): with pytest.raises(ValueError, match="LANGFUSE_PUBLIC_KEY"):
tracing_factory.build_tracing_callbacks() tracing_factory.build_tracing_callbacks()
+137
View File
@@ -0,0 +1,137 @@
"""Tests for deerflow.tracing.metadata.build_langfuse_trace_metadata."""
from __future__ import annotations
import pytest
from deerflow.tracing import metadata as tracing_metadata
@pytest.fixture(autouse=True)
def _clear_tracing_env(monkeypatch):
from deerflow.config.tracing_config import reset_tracing_config
for name in (
"LANGFUSE_TRACING",
"LANGFUSE_PUBLIC_KEY",
"LANGFUSE_SECRET_KEY",
"LANGFUSE_BASE_URL",
"LANGSMITH_TRACING",
"LANGCHAIN_TRACING_V2",
"LANGCHAIN_TRACING",
"LANGSMITH_API_KEY",
"LANGCHAIN_API_KEY",
):
monkeypatch.delenv(name, raising=False)
reset_tracing_config()
yield
reset_tracing_config()
def _enable_langfuse(monkeypatch):
monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
def test_returns_empty_when_langfuse_disabled(monkeypatch):
# No env vars set → langfuse not in enabled providers.
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="t-1",
user_id="u-1",
assistant_id="lead-agent",
model_name="gpt-4o",
)
assert result == {}
def test_session_id_maps_to_thread_id(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="thread-abc",
user_id="user-42",
)
assert result["langfuse_session_id"] == "thread-abc"
def test_user_id_falls_back_to_default(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="thread-abc",
user_id=None,
)
assert result["langfuse_user_id"] == "default"
def test_user_id_explicit_value_wins(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="thread-abc",
user_id="alice@example.com",
)
assert result["langfuse_user_id"] == "alice@example.com"
def test_trace_name_uses_assistant_id_when_provided(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="t",
assistant_id="custom-agent",
)
assert result["langfuse_trace_name"] == "custom-agent"
def test_trace_name_defaults_to_lead_agent(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="t",
assistant_id=None,
)
assert result["langfuse_trace_name"] == "lead-agent"
def test_tags_include_env_and_model(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="t",
environment="production",
model_name="gpt-4o",
)
assert result["langfuse_tags"] == ["env:production", "model:gpt-4o"]
def test_tags_omitted_when_no_tag_inputs(monkeypatch):
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id="t",
user_id="u",
)
assert "langfuse_tags" not in result
def test_thread_id_none_still_produces_metadata(monkeypatch):
# Stateless run paths may not have a thread_id — we still want
# user_id / trace_name to flow through so Users page works.
_enable_langfuse(monkeypatch)
result = tracing_metadata.build_langfuse_trace_metadata(
thread_id=None,
user_id="u-1",
)
assert result["langfuse_session_id"] is None
assert result["langfuse_user_id"] == "u-1"
@@ -0,0 +1,248 @@
"""Integration test: worker.run_agent injects Langfuse trace metadata.
Verifies that the agent factory's resulting graph receives a
``RunnableConfig`` whose ``metadata`` carries the Langfuse reserved keys
(``langfuse_session_id`` / ``langfuse_user_id`` / ``langfuse_trace_name``).
"""
from __future__ import annotations
import asyncio
import pytest
from deerflow.runtime.runs.manager import RunRecord
from deerflow.runtime.runs.schemas import DisconnectMode, RunStatus
from deerflow.runtime.runs.worker import RunContext, run_agent
class _FakeAgent:
"""Minimal LangGraph-like graph that captures the runnable config."""
def __init__(self) -> None:
self.captured_config: dict | None = None
self.metadata: dict = {}
# Worker may assign these attributes; need them to exist.
self.checkpointer = None
self.store = None
self.interrupt_before_nodes: list[str] = []
self.interrupt_after_nodes: list[str] = []
async def astream(self, graph_input, *, config, stream_mode, **kwargs):
self.captured_config = config
# Empty async generator — no chunks produced.
return
yield # pragma: no cover (makes this an async generator)
class _FakeRunManager:
async def set_status(self, *_args, **_kwargs) -> None:
return None
async def update_model_name(self, *_args, **_kwargs) -> None:
return None
async def update_run_completion(self, *_args, **_kwargs) -> None:
return None
class _FakeBridge:
def __init__(self) -> None:
self.events: list[tuple[str, object]] = []
async def publish(self, _run_id, event, payload) -> None:
self.events.append((event, payload))
async def publish_end(self, _run_id) -> None:
self.events.append(("end", None))
async def cleanup(self, _run_id, *, delay: int = 0) -> None:
return None
@pytest.fixture(autouse=True)
def _clear_tracing_env(monkeypatch):
from deerflow.config.tracing_config import reset_tracing_config
for name in ("LANGFUSE_TRACING", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_BASE_URL"):
monkeypatch.delenv(name, raising=False)
reset_tracing_config()
yield
reset_tracing_config()
@pytest.mark.asyncio
async def test_run_agent_injects_langfuse_metadata(monkeypatch):
monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
from deerflow.config.tracing_config import reset_tracing_config
reset_tracing_config()
fake_agent = _FakeAgent()
def agent_factory(config):
return fake_agent
record = RunRecord(
run_id="run-1",
thread_id="thread-xyz",
assistant_id="lead-agent",
status=RunStatus.pending,
on_disconnect=DisconnectMode.cancel,
model_name="gpt-4o",
)
record.abort_event = asyncio.Event()
ctx = RunContext(checkpointer=None)
await run_agent(
_FakeBridge(),
_FakeRunManager(),
record,
ctx=ctx,
agent_factory=agent_factory,
graph_input={"messages": []},
config={"configurable": {"thread_id": "thread-xyz"}},
)
assert fake_agent.captured_config is not None, "astream was not invoked"
metadata = fake_agent.captured_config.get("metadata") or {}
assert metadata.get("langfuse_session_id") == "thread-xyz"
# conftest.py autouse fixture injects ``test-user-autouse`` into the
# contextvar — the worker should read it via ``get_effective_user_id``.
user_id = metadata.get("langfuse_user_id")
assert user_id == "test-user-autouse", f"expected test-user-autouse, got {user_id}"
assert metadata.get("langfuse_trace_name") == "lead-agent"
tags = metadata.get("langfuse_tags") or []
assert "model:gpt-4o" in tags
@pytest.mark.asyncio
async def test_run_agent_falls_back_to_default_user_when_unset(monkeypatch):
"""When no user is in the contextvar, langfuse_user_id falls back to 'default'.
Uses ``monkeypatch.setattr`` to redirect ``get_effective_user_id`` to return
``"default"`` rather than directly mutating the contextvar — direct contextvar
operations across pytest test boundaries have produced spooky cross-file
pollution when combined with the langfuse OTel global tracer provider.
"""
monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
from deerflow.config.tracing_config import reset_tracing_config
from deerflow.runtime.runs import worker as worker_module
from deerflow.runtime.user_context import DEFAULT_USER_ID
reset_tracing_config()
monkeypatch.setattr(worker_module, "get_effective_user_id", lambda: DEFAULT_USER_ID)
fake_agent = _FakeAgent()
def agent_factory(config):
return fake_agent
record = RunRecord(
run_id="run-fallback",
thread_id="thread-fb",
assistant_id="lead-agent",
status=RunStatus.pending,
on_disconnect=DisconnectMode.cancel,
)
record.abort_event = asyncio.Event()
ctx = RunContext(checkpointer=None)
await run_agent(
_FakeBridge(),
_FakeRunManager(),
record,
ctx=ctx,
agent_factory=agent_factory,
graph_input={"messages": []},
config={"configurable": {"thread_id": "thread-fb"}},
)
metadata = fake_agent.captured_config.get("metadata") or {}
assert metadata.get("langfuse_user_id") == "default"
@pytest.mark.asyncio
async def test_run_agent_preserves_caller_metadata_overrides(monkeypatch):
"""Caller-provided langfuse_* keys must NOT be overridden by the default injection."""
monkeypatch.setenv("LANGFUSE_TRACING", "true")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "pk-lf-test")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "sk-lf-test")
from deerflow.config.tracing_config import reset_tracing_config
reset_tracing_config()
fake_agent = _FakeAgent()
def agent_factory(config):
return fake_agent
record = RunRecord(
run_id="run-2",
thread_id="thread-default",
assistant_id="lead-agent",
status=RunStatus.pending,
on_disconnect=DisconnectMode.cancel,
)
record.abort_event = asyncio.Event()
ctx = RunContext(checkpointer=None)
await run_agent(
_FakeBridge(),
_FakeRunManager(),
record,
ctx=ctx,
agent_factory=agent_factory,
graph_input={"messages": []},
config={
"configurable": {"thread_id": "thread-default"},
"metadata": {
"langfuse_session_id": "custom-session-id",
"langfuse_user_id": "explicit-user",
},
},
)
metadata = fake_agent.captured_config.get("metadata") or {}
# Caller-supplied keys win.
assert metadata["langfuse_session_id"] == "custom-session-id"
assert metadata["langfuse_user_id"] == "explicit-user"
# Worker still fills in keys that the caller didn't set.
assert metadata["langfuse_trace_name"] == "lead-agent"
@pytest.mark.asyncio
async def test_run_agent_skips_metadata_when_langfuse_disabled(monkeypatch):
fake_agent = _FakeAgent()
def agent_factory(config):
return fake_agent
record = RunRecord(
run_id="run-3",
thread_id="thread-noop",
assistant_id="lead-agent",
status=RunStatus.pending,
on_disconnect=DisconnectMode.cancel,
)
record.abort_event = asyncio.Event()
ctx = RunContext(checkpointer=None)
await run_agent(
_FakeBridge(),
_FakeRunManager(),
record,
ctx=ctx,
agent_factory=agent_factory,
graph_input={"messages": []},
config={"configurable": {"thread_id": "thread-noop"}},
)
metadata = fake_agent.captured_config.get("metadata") or {}
assert "langfuse_session_id" not in metadata
assert "langfuse_user_id" not in metadata
assert "langfuse_trace_name" not in metadata