fix(agents): make update_agent honor runtime.context user_id like setup_agent (#2867)

* fix(agents): make update_agent honor runtime.context user_id like setup_agent

PR #2784 hardened setup_agent to prefer runtime.context["user_id"] (set by
inject_authenticated_user_context from the auth-validated request) over the
contextvar, so an agent created during the bootstrap flow always lands under
users/<auth_uid>/agents/<name>. update_agent was left calling
get_effective_user_id() unconditionally — the same class of bug that produced
issues #2782 / #2862 still applies whenever the contextvar is not available
on the executing task (background work, future cross-process drivers,
checkpoint resume on a different task). In that regime update_agent silently
routes writes to users/default/agents/<name>, corrupting the shared default
bucket and losing the user's edit.

Extract the resolution policy into a shared resolve_runtime_user_id helper
on deerflow.runtime.user_context and route both setup_agent and update_agent
through it so the two halves of the lifecycle stay in lockstep.

Add load-bearing end-to-end tests that drive a real langchain.agents
create_agent graph with a fake LLM, exercising the full pipeline:

  HTTP wire format
    -> app.gateway.services.start_run config-assembly
    -> deerflow.runtime.runs.worker._build_runtime_context
    -> langchain.agents create_agent graph
    -> ToolNode dispatch (sync + async + sub-graph + ContextThreadPoolExecutor)
    -> setup_agent / update_agent

The negative-control tests intentionally land in users/default/ to prove the
positive tests are actually load-bearing rather than vacuously passing.

The new test_update_agent_e2e_user_isolation suite included a test that
failed against main and now passes after this fix.

* style: ruff format on new e2e tests

* test(e2e): real-server HTTP test driving setup_agent through the full ASGI stack

Adds tests/test_setup_agent_http_e2e_real_server.py — a single load-bearing
test that drives the entire FastAPI gateway through starlette.testclient.
TestClient with no mocks above the LLM:

  - lifespan boots (config, sqlite engine, LangGraph runtime, channels)
  - POST /api/v1/auth/register (real password hash, real sqlite write,
    issues access_token + csrf_token cookies)
  - POST /api/threads (real thread_meta + checkpoint creation)
  - POST /api/threads/{id}/runs/stream with the exact wire shape the React
    frontend sends (assistant_id + input + config + context with
    agent_name/is_bootstrap)
  - AuthMiddleware -> CSRFMiddleware -> require_permission ->
    start_run -> inject_authenticated_user_context ->
    asyncio.create_task(run_agent) -> worker._build_runtime_context ->
    Runtime injection -> ToolNode dispatch -> real setup_agent
  - Asserts SOUL.md is under users/<authenticated_uid>/agents/<name>/
    and NOT under users/default/agents/<name>/.

DEER_FLOW_HOME and the sqlite path are redirected into tmp_path so the test
never touches the real .deer-flow directory or developer database. The only
patch above the LLM boundary is replacing create_chat_model with a fake that
emits a single setup_agent tool_call.

This is the "真实验证" answer: it reproduces what curl-against-uvicorn would
do, minus the network socket layer.

* test: address Copilot review on user-isolation e2e tests

- Drop "currently expected to FAIL" wording from update_agent e2e docstring
  and header (Copilot review): the fix is in this PR, the test pins the
  corrected behaviour rather than driving a future change.
- Rephrase the assertion failure messages from "BUG:" to "REGRESSION:" to
  match the test's role on the fixed branch.
- Bound _drain_stream with a wall-clock timeout, a max-bytes cap, and an
  early break on the "event: end" SSE frame (Copilot review). Stops the
  test from hanging on a stuck run or runaway heartbeat loop.
- Replace the misleading "patch both module aliases" comment with an
  explanation of why patching lead_agent.agent.create_chat_model is the
  only correct target (Copilot review): lead_agent rebinds the symbol
  into its own namespace at import time, so patching deerflow.models is
  too late.

* test(refactor): address WillemJiang review on user-isolation e2e tests

- Extract the duplicated FakeToolCallingModel (and a
  build_single_tool_call_model helper) into tests/_agent_e2e_helpers.py.
  All three e2e files now import from the shared module instead of
  redefining the shim locally.
- Convert the manual p.start() / p.stop() try/finally blocks in
  test_update_agent_e2e_user_isolation.py to contextlib.ExitStack so
  patch lifecycle is Pythonic and exception-safe.
- Lift the isolated_app fixture's private-attribute resets into a
  named _reset_process_singletons helper with a comment block
  explaining why each singleton has to be invalidated for true e2e
  isolation, and why raising=False is intentional. Makes the
  fragility visible and the intent self-documenting rather than
  leaving the resets inline as opaque monkeypatch calls.

Net change: -59 lines (143 -> 84) across the three test files, with
every assertion intact. Full suite remains 69 passed / lint clean.

* test(e2e): make real-server test self-supply its config

CI's actions/checkout only ships config.example.yaml (the real config.yaml
is gitignored), so the production config-discovery search
(./config.yaml -> ../config.yaml -> $DEER_FLOW_CONFIG_PATH) finds nothing
and the test fails at lifespan boot with FileNotFoundError. The dev-machine
run passed only because a local config.yaml happened to exist.

Write a minimal AppConfig-valid yaml into tmp_path and pin
DEER_FLOW_CONFIG_PATH to it. The yaml carries just what the schema requires
(a single fake-test-model entry, LocalSandboxProvider, sqlite database).
The LLM never gets instantiated because the test patches create_chat_model
on the lead agent module, so the api_key/base_url stay placeholders.

Verified by hiding the local config.yaml to mirror the CI checkout — the
test now passes in both environments.
This commit is contained in:
Xinmin Zeng
2026-05-12 23:18:54 +08:00
committed by GitHub
parent 506be8bffd
commit 68d8caec1f
7 changed files with 1114 additions and 13 deletions
@@ -109,6 +109,34 @@ def get_effective_user_id() -> str:
return str(user.id)
def resolve_runtime_user_id(runtime: object | None) -> str:
"""Single source of truth for a tool/middleware's effective user_id.
Resolution order (most authoritative first):
1. ``runtime.context["user_id"]`` — set by ``inject_authenticated_user_context``
in the gateway from the auth-validated ``request.state.user``. This is
the only source that survives boundaries where the contextvar may have
been lost (background tasks scheduled outside the request task,
worker pools that don't copy_context, future cross-process drivers).
2. The ``_current_user`` ContextVar — set by the auth middleware at
request entry. Reliable for in-task work; copied by ``asyncio``
child tasks and by ``ContextThreadPoolExecutor``.
3. ``DEFAULT_USER_ID`` — last-resort fallback so unauthenticated
CLI / migration / test paths keep working without raising.
Tools that persist user-scoped state (custom agents, memory, uploads)
MUST call this instead of ``get_effective_user_id()`` directly so they
benefit from the runtime.context channel that ``setup_agent`` already
relies on.
"""
context = getattr(runtime, "context", None)
if isinstance(context, dict):
ctx_user_id = context.get("user_id")
if ctx_user_id:
return str(ctx_user_id)
return get_effective_user_id()
# ---------------------------------------------------------------------------
# Sentinel-based user_id resolution
# ---------------------------------------------------------------------------
@@ -7,19 +7,12 @@ from langgraph.types import Command
from deerflow.config.agents_config import validate_agent_name
from deerflow.config.paths import get_paths
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.runtime.user_context import resolve_runtime_user_id
from deerflow.tools.types import Runtime
logger = logging.getLogger(__name__)
def _get_runtime_user_id(runtime: Runtime) -> str:
context_user_id = runtime.context.get("user_id") if runtime.context else None
if context_user_id:
return str(context_user_id)
return get_effective_user_id()
@tool(parse_docstring=True)
def setup_agent(
soul: str,
@@ -45,7 +38,7 @@ def setup_agent(
if agent_name:
# Custom agents are persisted under the current user's bucket so
# different users do not see each other's agents.
user_id = _get_runtime_user_id(runtime)
user_id = resolve_runtime_user_id(runtime)
agent_dir = paths.user_agent_dir(user_id, agent_name)
else:
# Default agent (no agent_name): SOUL.md lives at the global base dir.
@@ -27,7 +27,7 @@ from langgraph.types import Command
from deerflow.config.agents_config import load_agent_config, validate_agent_name
from deerflow.config.app_config import get_app_config
from deerflow.config.paths import get_paths
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.runtime.user_context import resolve_runtime_user_id
from deerflow.tools.types import Runtime
logger = logging.getLogger(__name__)
@@ -118,9 +118,13 @@ def update_agent(
return _err("update_agent is only available inside a custom agent's chat. There is no agent_name in the current runtime context, so there is nothing to update. If you are inside the bootstrap flow, use setup_agent instead.")
# Resolve the active user so that updates only affect this user's agent.
# ``get_effective_user_id`` returns DEFAULT_USER_ID when no auth context
# is set (matching how memory and thread storage behave).
user_id = get_effective_user_id()
# ``resolve_runtime_user_id`` prefers ``runtime.context["user_id"]`` (set by
# the gateway from the auth-validated request) and falls back to the
# contextvar, then DEFAULT_USER_ID. This matches setup_agent so a user
# creating an agent and later refining it always touches the same files,
# even if the contextvar gets lost across an async/thread boundary
# (issue #2782 / #2862 class of bugs).
user_id = resolve_runtime_user_id(runtime)
# Reject an unknown ``model`` *before* touching the filesystem. Otherwise
# ``_resolve_model_name`` silently falls back to the default at runtime
+68
View File
@@ -0,0 +1,68 @@
"""Shared helpers for user-isolation e2e tests on the custom-agent tooling.
Centralises the small fake-LLM shim and a few test-data builders that the
three e2e files in this PR (``test_setup_agent_e2e_user_isolation``,
``test_update_agent_e2e_user_isolation``, ``test_setup_agent_http_e2e_real_server``)
all need. The shim is what lets a real ``langchain.agents.create_agent``
graph run without an API key — every other layer in those tests is real
production code, which is the entire point of the test design.
"""
from __future__ import annotations
from typing import Any
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage
from langchain_core.runnables import Runnable
class FakeToolCallingModel(FakeMessagesListChatModel):
"""FakeMessagesListChatModel plus a no-op ``bind_tools`` for create_agent.
``langchain.agents.create_agent`` calls ``model.bind_tools(...)`` to
expose the tool schemas to the model; the upstream fake raises
``NotImplementedError`` there. We just return ``self`` because we
drive deterministic tool_call output via ``responses=...``, no schema
handling needed.
"""
def bind_tools( # type: ignore[override]
self,
tools: Any,
*,
tool_choice: Any = None,
**kwargs: Any,
) -> Runnable:
return self
def build_single_tool_call_model(
*,
tool_name: str,
tool_args: dict[str, Any],
tool_call_id: str = "call_e2e_1",
final_text: str = "done",
) -> FakeToolCallingModel:
"""Build a fake model that emits exactly one tool_call then finishes.
Two-turn behaviour, identical across our e2e tests:
turn 1 → AIMessage with a single tool_call for *tool_name*
turn 2 → AIMessage with *final_text* (terminates the agent loop)
"""
return FakeToolCallingModel(
responses=[
AIMessage(
content="",
tool_calls=[
{
"name": tool_name,
"args": tool_args,
"id": tool_call_id,
"type": "tool_call",
}
],
),
AIMessage(content=final_text),
]
)
@@ -0,0 +1,429 @@
"""End-to-end verification for issue #2862 (and the regression of #2782).
Goal: prove — without trusting any single layer's claim — that an authenticated
user creating a custom agent through the real ``setup_agent`` tool, driven by a
real LangGraph ``create_agent`` graph, ends up with files under
``users/<auth_uid>/agents/<name>`` and **not** under ``users/default/agents/...``.
We intentionally exercise the full pipeline:
HTTP body shape (mimics LangGraph SDK wire format)
-> app.gateway.services.start_run config-assembly chain
-> deerflow.runtime.runs.worker._build_runtime_context
-> langchain.agents.create_agent graph
-> ToolNode dispatch
-> setup_agent tool
The only thing we mock is the LLM (FakeMessagesListChatModel) — every layer
that handles ``user_id`` is the real production code path. If the
``user_id`` propagation is broken anywhere in this chain, these tests will
fail.
These tests intentionally ``no_auto_user`` so that the ``contextvar``
fallback would put files into ``default/`` if propagation breaks.
"""
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from uuid import UUID
import pytest
from _agent_e2e_helpers import FakeToolCallingModel
from langchain_core.messages import AIMessage, HumanMessage
from app.gateway.services import (
build_run_config,
inject_authenticated_user_context,
merge_run_context_overrides,
)
from deerflow.runtime.runs.worker import _build_runtime_context, _install_runtime_context
# ---------------------------------------------------------------------------
# Helpers — real production code paths
# ---------------------------------------------------------------------------
def _make_request(user_id_str: str | None) -> SimpleNamespace:
"""Build a fake FastAPI Request that carries an authenticated user."""
if user_id_str is None:
user = None
else:
# User.id is UUID in production; honour that
user = SimpleNamespace(id=UUID(user_id_str), email="alice@local")
return SimpleNamespace(state=SimpleNamespace(user=user))
def _assemble_config(
*,
body_config: dict | None,
body_context: dict | None,
request_user_id: str | None,
thread_id: str = "thread-e2e",
assistant_id: str = "lead_agent",
) -> dict:
"""Replay the **exact** start_run config-assembly sequence."""
config = build_run_config(thread_id, body_config, None, assistant_id=assistant_id)
merge_run_context_overrides(config, body_context)
inject_authenticated_user_context(config, _make_request(request_user_id))
return config
def _make_paths_mock(tmp_path: Path):
"""Mirror the production paths.user_agent_dir signature."""
from unittest.mock import MagicMock
paths = MagicMock()
paths.base_dir = tmp_path
paths.agent_dir = lambda name: tmp_path / "agents" / name
paths.user_agent_dir = lambda user_id, name: tmp_path / "users" / user_id / "agents" / name
return paths
# ---------------------------------------------------------------------------
# L1-L3: HTTP wire format → start_run → worker._build_runtime_context
# ---------------------------------------------------------------------------
class TestConfigAssembly:
"""Covers L1-L3: validate that user_id reaches runtime_ctx for every wire shape."""
def test_typical_wire_format_user_id_in_runtime_ctx(self):
"""Real frontend: body.config={recursion_limit}, body.context={agent_name,...}."""
config = _assemble_config(
body_config={"recursion_limit": 1000},
body_context={"agent_name": "myagent", "is_bootstrap": True, "mode": "flash"},
request_user_id="11111111-2222-3333-4444-555555555555",
)
runtime_ctx = _build_runtime_context("thread-e2e", "run-1", config.get("context"), None)
assert runtime_ctx["user_id"] == "11111111-2222-3333-4444-555555555555"
assert runtime_ctx["agent_name"] == "myagent"
def test_body_context_none_still_injects_user_id(self):
"""If frontend omits body.context entirely, inject must still create it."""
config = _assemble_config(
body_config={"recursion_limit": 1000},
body_context=None,
request_user_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
)
runtime_ctx = _build_runtime_context("thread-e2e", "run-1", config.get("context"), None)
assert runtime_ctx["user_id"] == "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
def test_body_context_empty_dict_still_injects_user_id(self):
"""body.context={} (falsy) path: inject must still produce user_id."""
config = _assemble_config(
body_config={"recursion_limit": 1000},
body_context={},
request_user_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
)
runtime_ctx = _build_runtime_context("thread-e2e", "run-1", config.get("context"), None)
assert runtime_ctx["user_id"] == "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
def test_body_config_already_contains_context_field(self):
"""body.config={'context': {...}} (LG 0.6 alt wire): inject still wins."""
config = _assemble_config(
body_config={"context": {"agent_name": "myagent"}, "recursion_limit": 1000},
body_context=None,
request_user_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
)
runtime_ctx = _build_runtime_context("thread-e2e", "run-1", config.get("context"), None)
assert runtime_ctx["user_id"] == "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
def test_client_supplied_user_id_is_overridden(self):
"""Spoofed client user_id must be overwritten by inject (auth-trusted source)."""
config = _assemble_config(
body_config={"recursion_limit": 1000},
body_context={"agent_name": "myagent", "user_id": "spoofed"},
request_user_id="11111111-2222-3333-4444-555555555555",
)
runtime_ctx = _build_runtime_context("thread-e2e", "run-1", config.get("context"), None)
assert runtime_ctx["user_id"] == "11111111-2222-3333-4444-555555555555"
def test_unauthenticated_request_does_not_inject(self):
"""If request.state.user is missing (impossible under fail-closed auth, but
verify defensively), inject must not write user_id and runtime_ctx must
therefore lack it — forcing the tool fallback path to reveal itself."""
config = _assemble_config(
body_config={"recursion_limit": 1000},
body_context={"agent_name": "myagent"},
request_user_id=None,
)
runtime_ctx = _build_runtime_context("thread-e2e", "run-1", config.get("context"), None)
assert "user_id" not in runtime_ctx
# ---------------------------------------------------------------------------
# L4-L7: Real LangGraph create_agent driving the real setup_agent tool
# ---------------------------------------------------------------------------
def _build_real_bootstrap_graph(authenticated_user_id: str):
"""Construct a real LangGraph using create_agent + the real setup_agent tool.
The LLM is faked (FakeMessagesListChatModel) so we don't need an API key.
Everything else — ToolNode dispatch, runtime injection, middleware — is
the real production code path.
"""
from langchain.agents import create_agent
from deerflow.tools.builtins.setup_agent_tool import setup_agent
# First model turn: emit a tool_call for setup_agent
# Second model turn (after tool result): final answer (terminates the loop)
fake_model = FakeToolCallingModel(
responses=[
AIMessage(
content="",
tool_calls=[
{
"name": "setup_agent",
"args": {
"soul": "# My E2E Agent\n\nA SOUL written by the model.",
"description": "End-to-end test agent",
},
"id": "call_setup_1",
"type": "tool_call",
}
],
),
AIMessage(content=f"Done. Agent created for user {authenticated_user_id}."),
]
)
graph = create_agent(
model=fake_model,
tools=[setup_agent],
system_prompt="You are a bootstrap agent. Call setup_agent immediately.",
)
return graph
@pytest.mark.no_auto_user
@pytest.mark.asyncio
async def test_real_graph_real_setup_agent_writes_to_authenticated_user_dir(tmp_path: Path):
"""The smoking-gun test for issue #2862.
Under no_auto_user (contextvar = empty), if user_id propagation through
runtime.context is broken, setup_agent will fall back to DEFAULT_USER_ID
and write to users/default/agents/... The assertion that this directory
DOES NOT exist is what makes this test load-bearing.
"""
from langgraph.runtime import Runtime
auth_uid = "abcdef01-2345-6789-abcd-ef0123456789"
config = _assemble_config(
body_config={"recursion_limit": 50},
body_context={"agent_name": "e2e-agent", "is_bootstrap": True},
request_user_id=auth_uid,
thread_id="thread-e2e-1",
)
# Replay worker.run_agent's runtime construction. This is the key step:
# it is what makes ToolRuntime.context contain user_id when the tool
# actually fires.
runtime_ctx = _build_runtime_context("thread-e2e-1", "run-1", config.get("context"), None)
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
graph = _build_real_bootstrap_graph(auth_uid)
# Patch get_paths only (the file-system rooting); everything else is real
with patch(
"deerflow.tools.builtins.setup_agent_tool.get_paths",
return_value=_make_paths_mock(tmp_path),
):
# Drive the real graph. This goes through real ToolNode + real Runtime merge.
final_state = await graph.ainvoke(
{"messages": [HumanMessage(content="Create an agent named e2e-agent")]},
config=config,
)
expected_dir = tmp_path / "users" / auth_uid / "agents" / "e2e-agent"
default_dir = tmp_path / "users" / "default" / "agents" / "e2e-agent"
# Load-bearing assertions:
assert expected_dir.exists(), f"Agent directory not found at the authenticated user's path. Expected: {expected_dir}. tmp_path tree: {[str(p) for p in tmp_path.rglob('*')]}"
assert (expected_dir / "SOUL.md").read_text() == "# My E2E Agent\n\nA SOUL written by the model."
assert (expected_dir / "config.yaml").exists()
assert not default_dir.exists(), "REGRESSION: agent landed under users/default/. user_id propagation broke somewhere between HTTP layer and ToolRuntime.context."
# And final state should reflect tool success
last = final_state["messages"][-1]
assert "Done" in (last.content if isinstance(last.content, str) else str(last.content))
@pytest.mark.no_auto_user
@pytest.mark.asyncio
async def test_inject_failure_falls_back_to_default_proving_test_is_load_bearing(tmp_path: Path):
"""Negative control: if inject does NOT happen (no user in request), and
contextvar is empty (no_auto_user), setup_agent must land in default/.
This proves the positive test is actually load-bearing — i.e. it would
have failed before PR #2784, not passed accidentally.
"""
from langgraph.runtime import Runtime
config = _assemble_config(
body_config={"recursion_limit": 50},
body_context={"agent_name": "fallback-agent", "is_bootstrap": True},
request_user_id=None, # no auth — inject is a no-op
thread_id="thread-e2e-2",
)
runtime_ctx = _build_runtime_context("thread-e2e-2", "run-2", config.get("context"), None)
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
graph = _build_real_bootstrap_graph("does-not-matter")
with patch(
"deerflow.tools.builtins.setup_agent_tool.get_paths",
return_value=_make_paths_mock(tmp_path),
):
await graph.ainvoke(
{"messages": [HumanMessage(content="Create fallback-agent")]},
config=config,
)
default_dir = tmp_path / "users" / "default" / "agents" / "fallback-agent"
assert default_dir.exists(), "Negative control failed: even without inject + contextvar, agent did not land in default/. The test infrastructure may not be reproducing the bug condition."
# ---------------------------------------------------------------------------
# L5: Sub-graph runtime propagation (the task tool case)
# ---------------------------------------------------------------------------
@pytest.mark.no_auto_user
@pytest.mark.asyncio
async def test_subgraph_invocation_preserves_user_id_in_runtime(tmp_path: Path):
"""When a parent graph invokes a child graph (the pattern used by
subagents), parent_runtime.merge() must keep user_id intact.
We construct a child graph that contains setup_agent and call it from
a parent graph's tool. If LangGraph re-creates the Runtime and drops
user_id at the sub-graph boundary, this fails.
"""
from langchain.agents import create_agent
from langgraph.runtime import Runtime
from deerflow.tools.builtins.setup_agent_tool import setup_agent
auth_uid = "deadbeef-0000-1111-2222-333344445555"
# Inner graph: same as the bootstrap flow
inner_model = FakeToolCallingModel(
responses=[
AIMessage(
content="",
tool_calls=[
{
"name": "setup_agent",
"args": {"soul": "# Inner", "description": "subgraph"},
"id": "call_inner_1",
"type": "tool_call",
}
],
),
AIMessage(content="inner done"),
]
)
inner_graph = create_agent(
model=inner_model,
tools=[setup_agent],
system_prompt="inner",
)
config = _assemble_config(
body_config={"recursion_limit": 50},
body_context={"agent_name": "subgraph-agent", "is_bootstrap": True},
request_user_id=auth_uid,
thread_id="thread-e2e-3",
)
runtime_ctx = _build_runtime_context("thread-e2e-3", "run-3", config.get("context"), None)
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
with patch(
"deerflow.tools.builtins.setup_agent_tool.get_paths",
return_value=_make_paths_mock(tmp_path),
):
# Direct sub-graph invoke (mimics what a subagent invocation looks like
# — distinct ainvoke call, but parent config carries the same runtime).
await inner_graph.ainvoke(
{"messages": [HumanMessage(content="Create subgraph-agent")]},
config=config,
)
expected_dir = tmp_path / "users" / auth_uid / "agents" / "subgraph-agent"
default_dir = tmp_path / "users" / "default" / "agents" / "subgraph-agent"
assert expected_dir.exists()
assert not default_dir.exists()
# ---------------------------------------------------------------------------
# L6: Sync tool path through ContextThreadPoolExecutor
# ---------------------------------------------------------------------------
def test_sync_tool_dispatch_through_thread_pool_uses_runtime_context(tmp_path: Path):
"""setup_agent is a sync function. When dispatched through ToolNode's
ContextThreadPoolExecutor, runtime.context must still carry user_id —
not via thread-local copy_context (which only carries contextvars), but
because it was passed in as the ToolRuntime constructor argument.
"""
from langchain.agents import create_agent
from langgraph.runtime import Runtime
from deerflow.tools.builtins.setup_agent_tool import setup_agent
auth_uid = "11112222-3333-4444-5555-666677778888"
fake_model = FakeToolCallingModel(
responses=[
AIMessage(
content="",
tool_calls=[
{
"name": "setup_agent",
"args": {"soul": "# Sync", "description": "sync path"},
"id": "call_sync_1",
"type": "tool_call",
}
],
),
AIMessage(content="sync done"),
]
)
graph = create_agent(model=fake_model, tools=[setup_agent], system_prompt="sync")
config = _assemble_config(
body_config={"recursion_limit": 50},
body_context={"agent_name": "sync-agent", "is_bootstrap": True},
request_user_id=auth_uid,
thread_id="thread-e2e-4",
)
runtime_ctx = _build_runtime_context("thread-e2e-4", "run-4", config.get("context"), None)
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
with patch(
"deerflow.tools.builtins.setup_agent_tool.get_paths",
return_value=_make_paths_mock(tmp_path),
):
# Use SYNC invoke to hit the ContextThreadPoolExecutor path
graph.invoke(
{"messages": [HumanMessage(content="Create sync-agent")]},
config=config,
)
expected_dir = tmp_path / "users" / auth_uid / "agents" / "sync-agent"
default_dir = tmp_path / "users" / "default" / "agents" / "sync-agent"
assert expected_dir.exists()
assert not default_dir.exists()
@@ -0,0 +1,326 @@
"""Real HTTP end-to-end verification for issue #2862's setup_agent path.
This test drives the **entire** FastAPI gateway through ``starlette.testclient.TestClient``:
starlette.testclient.TestClient (real ASGI stack)
-> AuthMiddleware (real cookie parsing, real JWT decode)
-> /api/v1/auth/register endpoint (real password hash + sqlite write)
-> /api/threads/{id}/runs/stream endpoint (real start_run config-assembly)
-> background asyncio.create_task(run_agent) (real worker, real Runtime)
-> langchain.agents.create_agent graph (real, with fake LLM)
-> ToolNode dispatch (real)
-> setup_agent tool (real file I/O)
The only mock is the LLM (no API key needed). Every layer that participates
in ``user_id`` propagation — auth, ContextVar, ``inject_authenticated_user_context``,
``worker._build_runtime_context``, ``Runtime.merge`` — is the real production
code path. If the chain is broken at any layer, this test fails.
This is what "真实验证" looks like for a server that lives behind authentication:
register a user, log in (cookie), POST to /runs/stream, wait for the run to
finish, then read the filesystem.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from unittest.mock import patch
import pytest
from _agent_e2e_helpers import FakeToolCallingModel, build_single_tool_call_model
def _build_fake_create_chat_model(agent_name: str):
"""Return a callable matching the real ``create_chat_model`` signature.
Whenever the lead agent constructs a chat model during the bootstrap flow,
we hand it a fake that emits a single setup_agent tool_call on its first
turn, then a benign final answer on its second turn.
"""
def fake_create_chat_model(*args: Any, **kwargs: Any) -> FakeToolCallingModel:
return build_single_tool_call_model(
tool_name="setup_agent",
tool_args={
"soul": f"# Real HTTP E2E SOUL for {agent_name}",
"description": "real-http-e2e agent",
},
tool_call_id="call_real_http_1",
final_text=f"Agent {agent_name} created via real HTTP e2e.",
)
return fake_create_chat_model
@pytest.fixture
def isolated_deer_flow_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Stand up an isolated DeerFlow data root + config under tmp_path.
- Sets ``DEER_FLOW_HOME`` so paths land under tmp_path, not the real
``.deer-flow`` directory.
- Stages a copy of the project's ``config.yaml`` (or ``config.example.yaml``
on a fresh CI checkout where ``config.yaml`` is gitignored) and pins
``DEER_FLOW_CONFIG_PATH`` to it, so lifespan boot doesn't depend on the
developer's local config layout.
- Sets a placeholder OPENAI_API_KEY because the config has
``$OPENAI_API_KEY`` that gets resolved at parse time; the LLM itself is
mocked, so any non-empty value works.
"""
home = tmp_path / "deer-flow-home"
home.mkdir()
monkeypatch.setenv("DEER_FLOW_HOME", str(home))
monkeypatch.setenv("OPENAI_API_KEY", "sk-fake-key-not-used-because-llm-is-mocked")
monkeypatch.setenv("OPENAI_API_BASE", "https://example.invalid")
# Hermetic config: do not depend on whether the dev machine has a real
# ``config.yaml`` at the repo root. CI's ``actions/checkout`` only ships
# ``config.example.yaml`` (and its ``models:`` list is commented out, so
# AppConfig validation would reject it). Write a minimal, self-sufficient
# config to tmp_path and pin ``DEER_FLOW_CONFIG_PATH`` to it.
staged_config = tmp_path / "config.yaml"
staged_config.write_text(_MINIMAL_CONFIG_YAML, encoding="utf-8")
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(staged_config))
return home
# Minimal config that satisfies AppConfig + LeadAgent's _resolve_model_name.
# The model `use` path must resolve to a real class for config parsing to
# succeed; the test patches ``create_chat_model`` on the lead agent module,
# so the model is never actually instantiated. SandboxConfig.use is required
# at schema level; LocalSandboxProvider is the only sandbox that runs without
# Docker.
_MINIMAL_CONFIG_YAML = """\
log_level: info
models:
- name: fake-test-model
display_name: Fake Test Model
use: langchain_openai:ChatOpenAI
model: gpt-4o-mini
api_key: $OPENAI_API_KEY
base_url: $OPENAI_API_BASE
sandbox:
use: deerflow.sandbox.local:LocalSandboxProvider
agents_api:
enabled: true
database:
backend: sqlite
"""
def _reset_process_singletons(monkeypatch: pytest.MonkeyPatch) -> None:
"""Reset every process-wide cache that would survive across tests.
This fixture stands up a full FastAPI app + sqlite DB + LangGraph runtime
inside ``tmp_path``. To get true per-test isolation we have to invalidate
a handful of module-level caches that production normally never resets,
so they pick up our test-only ``DEER_FLOW_HOME`` and sqlite path:
- ``deerflow.config.app_config`` caches the parsed ``config.yaml``.
- ``deerflow.config.paths`` caches the ``Paths`` singleton derived from
``DEER_FLOW_HOME`` at first access.
- ``deerflow.persistence.engine`` caches the SQLAlchemy engine and
session factory after the first call to ``init_engine_from_config``.
``raising=False`` keeps the fixture resilient if upstream renames or
drops one of these attributes — the test will simply skip that reset
instead of failing with a confusing AttributeError, and the next test
to call ``get_app_config()``/``get_paths()`` will surface the real
incompatibility loudly.
"""
from deerflow.config import app_config as app_config_module
from deerflow.config import paths as paths_module
from deerflow.persistence import engine as engine_module
for module, attr in (
(app_config_module, "_app_config"),
(app_config_module, "_app_config_path"),
(app_config_module, "_app_config_mtime"),
(paths_module, "_paths_singleton"),
(engine_module, "_engine"),
(engine_module, "_session_factory"),
):
monkeypatch.setattr(module, attr, None, raising=False)
@pytest.fixture
def isolated_app(isolated_deer_flow_home: Path, monkeypatch: pytest.MonkeyPatch):
"""Build a fresh FastAPI app inside a clean DEER_FLOW_HOME.
Each test gets its own sqlite DB and checkpoint store under ``tmp_path``,
with no cross-test contamination.
"""
_reset_process_singletons(monkeypatch)
# Re-resolve the config from the test-only DEER_FLOW_HOME and pin its
# sqlite path into tmp_path so the lifespan-time engine init lands there.
from deerflow.config import app_config as app_config_module
cfg = app_config_module.get_app_config()
cfg.database.sqlite_dir = str(isolated_deer_flow_home / "db")
from app.gateway.app import create_app
return create_app()
def _drain_stream(response, *, timeout: float = 30.0, max_bytes: int = 4 * 1024 * 1024) -> str:
"""Consume an SSE response body until the run terminates and return the text.
Bounded to keep the test fail-fast:
- Stops as soon as an ``event: end`` SSE frame is observed (the gateway
sends this when the background run finishes — see ``services.format_sse``
and ``StreamBridge.publish_end``).
- Stops at ``timeout`` seconds wall-clock so a stuck run / runaway heartbeat
loop surfaces a real failure instead of hanging pytest.
- Stops at ``max_bytes`` so a runaway producer can't OOM the test process.
"""
import time as _time
deadline = _time.monotonic() + timeout
body = b""
for chunk in response.iter_bytes():
body += chunk
if b"event: end" in body:
break
if len(body) >= max_bytes:
break
if _time.monotonic() >= deadline:
break
return body.decode("utf-8", errors="replace")
def _wait_for_file(path: Path, *, timeout: float = 10.0) -> bool:
"""Block until *path* exists or *timeout* elapses.
The run completes inside ``asyncio.create_task`` after start_run returns,
so the test must wait for the background task to flush its writes.
"""
import time as _time
deadline = _time.monotonic() + timeout
while _time.monotonic() < deadline:
if path.exists():
return True
_time.sleep(0.05)
return False
@pytest.mark.no_auto_user
def test_real_http_create_agent_lands_in_authenticated_user_dir(
isolated_app: Any,
isolated_deer_flow_home: Path,
monkeypatch: pytest.MonkeyPatch,
):
"""The full real-server contract test.
1. Register a real user via POST /api/v1/auth/register (also auto-logs in)
2. POST to /api/threads/{tid}/runs/stream with the **exact** body shape the
frontend (LangGraph SDK) sends during the bootstrap flow.
3. Wait for the background run to finish.
4. Assert SOUL.md exists under users/<authenticated_uid>/agents/<name>/.
5. Assert NOTHING exists under users/default/agents/<name>/.
"""
# ``deerflow.agents.lead_agent.agent`` imports ``create_chat_model`` with
# ``from deerflow.models import create_chat_model`` at module load time,
# rebinding the symbol into its own namespace. So the only patch that
# intercepts the call is the bound name on ``lead_agent.agent`` — patching
# ``deerflow.models.create_chat_model`` would be too late.
agent_name = "real-http-agent"
from starlette.testclient import TestClient
with (
patch(
"deerflow.agents.lead_agent.agent.create_chat_model",
new=_build_fake_create_chat_model(agent_name),
),
TestClient(isolated_app) as client,
):
# --- 1. Register & auto-login ---
register = client.post(
"/api/v1/auth/register",
json={"email": "e2e-user@example.com", "password": "very-strong-password-123"},
)
assert register.status_code == 201, register.text
registered = register.json()
auth_uid = registered["id"]
# The endpoint sets both access_token (auth) and csrf_token (CSRF Double
# Submit Cookie) cookies; the TestClient cookie jar propagates them.
assert client.cookies.get("access_token"), "register endpoint must set session cookie"
csrf_token = client.cookies.get("csrf_token")
assert csrf_token, "register endpoint must set csrf_token cookie"
# --- 2. Create a thread (require_existing=True on /runs/stream means
# we must call POST /api/threads first; the React frontend does the
# same via the LangGraph SDK's threads.create) ---
import uuid as _uuid
thread_id = str(_uuid.uuid4())
created = client.post(
"/api/threads",
json={"thread_id": thread_id, "metadata": {}},
headers={"X-CSRF-Token": csrf_token},
)
assert created.status_code == 200, created.text
# --- 3. POST /runs/stream with the bootstrap wire format ---
# This is the EXACT shape the React frontend sends after PR #2784:
# thread.submit(input, {config, context}) ->
# POST /api/threads/{id}/runs/stream body =
# {assistant_id, input, config, context}
body = {
"assistant_id": "lead_agent",
"input": {
"messages": [
{
"role": "user",
"content": (f"The new custom agent name is {agent_name}. Help me design its SOUL.md before saving it."),
}
]
},
"config": {"recursion_limit": 50},
"context": {
"agent_name": agent_name,
"is_bootstrap": True,
"mode": "flash",
"thinking_enabled": False,
"is_plan_mode": False,
"subagent_enabled": False,
},
"stream_mode": ["values"],
}
# The /stream endpoint returns SSE; we drain it so the server-side
# background task (run_agent) gets to completion before we look at disk.
with client.stream(
"POST",
f"/api/threads/{thread_id}/runs/stream",
json=body,
headers={"X-CSRF-Token": csrf_token},
) as resp:
assert resp.status_code == 200, resp.read().decode()
transcript = _drain_stream(resp)
# Sanity: the stream should have produced at least one event
assert "event:" in transcript, f"no SSE events in response: {transcript[:500]!r}"
# --- 4. Verify filesystem outcome ---
expected_dir = isolated_deer_flow_home / "users" / auth_uid / "agents" / agent_name
default_dir = isolated_deer_flow_home / "users" / "default" / "agents" / agent_name
# The setup_agent tool runs inside the background asyncio task spawned
# by start_run; SSE-drain typically waits for it, but we add a bounded
# poll to be robust against scheduler jitter.
assert _wait_for_file(expected_dir / "SOUL.md", timeout=15.0), (
"SOUL.md did not appear under users/<auth_uid>/agents/. "
f"Expected: {expected_dir / 'SOUL.md'}. "
f"tmp tree: {sorted(str(p.relative_to(isolated_deer_flow_home)) for p in isolated_deer_flow_home.rglob('SOUL.md'))}. "
f"SSE transcript tail: {transcript[-1000:]!r}"
)
soul_text = (expected_dir / "SOUL.md").read_text()
assert agent_name in soul_text, f"unexpected SOUL content: {soul_text!r}"
# The smoking-gun assertion: the agent must NOT have landed in default/
assert not default_dir.exists(), f"REGRESSION: agent landed under users/default/{agent_name} instead of the authenticated user. Default-dir contents: {list(default_dir.rglob('*')) if default_dir.exists() else 'n/a'}"
@@ -0,0 +1,253 @@
"""End-to-end verification for update_agent's user_id resolution.
PR #2784 hardened setup_agent to prefer runtime.context["user_id"] over the
contextvar. update_agent had the same latent gap: it unconditionally called
get_effective_user_id() at module level, so any scenario where the contextvar
was unavailable while runtime.context carried user_id (a background task
scheduled outside the request task, a worker pool that doesn't copy_context,
checkpoint resume on a different task) would silently route writes to
users/default/agents/...
These tests are load-bearing under @no_auto_user (contextvar empty):
- The negative-control test confirms the fixture actually puts the tool in
the regime where the contextvar fallback would land in users/default/.
Without that, the positive test would be vacuously satisfied.
- The positive test verifies update_agent honours runtime.context["user_id"]
injected by inject_authenticated_user_context in the gateway. Before the
fix in this PR, this test failed; now it passes.
"""
from __future__ import annotations
from contextlib import ExitStack
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from uuid import UUID
import pytest
import yaml
from _agent_e2e_helpers import build_single_tool_call_model
from langchain_core.messages import HumanMessage
from app.gateway.services import (
build_run_config,
inject_authenticated_user_context,
merge_run_context_overrides,
)
from deerflow.runtime.runs.worker import _build_runtime_context, _install_runtime_context
def _make_request(user_id_str: str | None) -> SimpleNamespace:
user = SimpleNamespace(id=UUID(user_id_str), email="alice@local") if user_id_str else None
return SimpleNamespace(state=SimpleNamespace(user=user))
def _assemble_config(*, body_context: dict | None, request_user_id: str | None, thread_id: str) -> dict:
config = build_run_config(thread_id, {"recursion_limit": 50}, None, assistant_id="lead_agent")
merge_run_context_overrides(config, body_context)
inject_authenticated_user_context(config, _make_request(request_user_id))
return config
def _seed_existing_agent(tmp_path: Path, user_id: str, agent_name: str, soul: str = "# Original"):
"""Pre-create an agent on disk for update_agent to overwrite."""
agent_dir = tmp_path / "users" / user_id / "agents" / agent_name
agent_dir.mkdir(parents=True, exist_ok=True)
(agent_dir / "config.yaml").write_text(
yaml.dump({"name": agent_name, "description": "old"}, allow_unicode=True),
encoding="utf-8",
)
(agent_dir / "SOUL.md").write_text(soul, encoding="utf-8")
return agent_dir
def _make_paths_mock(tmp_path: Path):
paths = MagicMock()
paths.base_dir = tmp_path
paths.agent_dir = lambda name: tmp_path / "agents" / name
paths.user_agent_dir = lambda user_id, name: tmp_path / "users" / user_id / "agents" / name
return paths
def _patch_update_agent_dependencies(tmp_path: Path):
"""update_agent reads load_agent_config + get_app_config — stub them
minimally so the tool can run without a real config file or LLM."""
fake_model_cfg = SimpleNamespace(name="fake-model")
fake_app_cfg = MagicMock()
fake_app_cfg.get_model_config = lambda name: fake_model_cfg if name == "fake-model" else None
return [
patch(
"deerflow.tools.builtins.update_agent_tool.get_paths",
return_value=_make_paths_mock(tmp_path),
),
patch(
"deerflow.tools.builtins.update_agent_tool.get_app_config",
return_value=fake_app_cfg,
),
# load_agent_config (used by update_agent to read existing config) also
# reads paths via its own module-level get_paths reference. Patch it too
# or the tool returns "Agent does not exist" before touching disk.
patch(
"deerflow.config.agents_config.get_paths",
return_value=_make_paths_mock(tmp_path),
),
]
def _build_update_graph(*, soul_payload: str):
from langchain.agents import create_agent
from deerflow.tools.builtins.update_agent_tool import update_agent
fake_model = build_single_tool_call_model(
tool_name="update_agent",
tool_args={"soul": soul_payload, "description": "refined"},
tool_call_id="call_update_1",
final_text="updated",
)
return create_agent(model=fake_model, tools=[update_agent], system_prompt="updater")
# ---------------------------------------------------------------------------
# Negative control — proves the test environment puts update_agent in the
# regime where the contextvar fallback would land in default/.
# ---------------------------------------------------------------------------
@pytest.mark.no_auto_user
def test_update_agent_falls_back_to_default_when_no_inject_and_no_contextvar(tmp_path: Path):
"""No request.state.user, no contextvar — update_agent must look in
users/default/agents/. We seed the file there so the tool succeeds and
we know which directory it actually consulted."""
from langgraph.runtime import Runtime
_seed_existing_agent(tmp_path, "default", "fallback-target")
config = _assemble_config(
body_context={"agent_name": "fallback-target"},
request_user_id=None, # no auth, inject is no-op
thread_id="thread-update-1",
)
runtime_ctx = _build_runtime_context("thread-update-1", "run-1", config.get("context"), None)
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
graph = _build_update_graph(soul_payload="# Fallback Updated")
with ExitStack() as stack:
for p in _patch_update_agent_dependencies(tmp_path):
stack.enter_context(p)
graph.invoke(
{"messages": [HumanMessage(content="update fallback-target")]},
config=config,
)
soul = (tmp_path / "users" / "default" / "agents" / "fallback-target" / "SOUL.md").read_text()
assert soul == "# Fallback Updated", "Sanity: tool should have written under default/"
# ---------------------------------------------------------------------------
# Regression guard — passes on this branch, would fail on main before the fix.
# ---------------------------------------------------------------------------
@pytest.mark.no_auto_user
def test_update_agent_should_use_runtime_context_user_id_when_contextvar_missing(tmp_path: Path):
"""update_agent prefers the authenticated user_id carried in
runtime.context (placed there by inject_authenticated_user_context)
over the contextvar — same contract as setup_agent (PR #2784).
Before this PR's fix, update_agent unconditionally called
get_effective_user_id() and landed in default/ whenever the contextvar
was unavailable. This test pins the corrected behaviour.
"""
from langgraph.runtime import Runtime
auth_uid = "abcdef01-2345-6789-abcd-ef0123456789"
# Seed the agent in BOTH locations so we can prove which one was opened.
auth_dir = _seed_existing_agent(tmp_path, auth_uid, "shared-name", soul="# Auth Original")
default_dir = _seed_existing_agent(tmp_path, "default", "shared-name", soul="# Default Original")
config = _assemble_config(
body_context={"agent_name": "shared-name"},
request_user_id=auth_uid,
thread_id="thread-update-2",
)
runtime_ctx = _build_runtime_context("thread-update-2", "run-2", config.get("context"), None)
assert runtime_ctx["user_id"] == auth_uid, "Pre-condition: inject must have placed user_id into runtime_ctx"
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
graph = _build_update_graph(soul_payload="# Auth Updated")
with ExitStack() as stack:
for p in _patch_update_agent_dependencies(tmp_path):
stack.enter_context(p)
graph.invoke(
{"messages": [HumanMessage(content="update shared-name")]},
config=config,
)
auth_soul = (auth_dir / "SOUL.md").read_text()
default_soul = (default_dir / "SOUL.md").read_text()
assert auth_soul == "# Auth Updated", f"REGRESSION: update_agent ignored runtime.context['user_id']={auth_uid!r} and routed the write to users/default/ instead. auth_soul={auth_soul!r}, default_soul={default_soul!r}"
assert default_soul == "# Default Original", "REGRESSION: update_agent corrupted the shared default-user agent. It should have written under the authenticated user's path."
# ---------------------------------------------------------------------------
# Positive — when contextvar IS the auth user (the normal HTTP case), things
# already work. Pin it as a regression guard so future refactors don't
# accidentally break the contextvar path in pursuit of the runtime-context fix.
# ---------------------------------------------------------------------------
def test_update_agent_uses_contextvar_when_present(tmp_path: Path, monkeypatch):
"""The normal HTTP case: contextvar is set by auth_middleware. This must
keep working regardless of how runtime.context is populated."""
from types import SimpleNamespace as _SN
from deerflow.runtime.user_context import reset_current_user, set_current_user
auth_uid = "11112222-3333-4444-5555-666677778888"
user = _SN(id=auth_uid, email="ctxvar@local")
_seed_existing_agent(tmp_path, auth_uid, "ctxvar-agent", soul="# Original")
from langgraph.runtime import Runtime
config = _assemble_config(
body_context={"agent_name": "ctxvar-agent"},
request_user_id=auth_uid,
thread_id="thread-update-3",
)
runtime_ctx = _build_runtime_context("thread-update-3", "run-3", config.get("context"), None)
_install_runtime_context(config, runtime_ctx)
runtime = Runtime(context=runtime_ctx, store=None)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
graph = _build_update_graph(soul_payload="# CtxVar Updated")
with ExitStack() as stack:
for p in _patch_update_agent_dependencies(tmp_path):
stack.enter_context(p)
token = set_current_user(user)
try:
final = graph.invoke(
{"messages": [HumanMessage(content="update ctxvar-agent")]},
config=config,
)
finally:
reset_current_user(token)
# surface the tool's reply for debug if it errored
tool_replies = [m.content for m in final["messages"] if getattr(m, "type", "") == "tool"]
soul = (tmp_path / "users" / auth_uid / "agents" / "ctxvar-agent" / "SOUL.md").read_text()
assert soul == "# CtxVar Updated", f"tool replies: {tool_replies}"