fix(gateway): enforce thread ownership on stateless run endpoints (#3473)

POST /api/runs/stream and /api/runs/wait accept thread_id in the request
body but performed no owner authorization, letting any authenticated user
start runs on -- and read /wait checkpoint channel_values from -- another
user's thread (cross-user IDOR, #3472).

The @require_permission(owner_check=True) decorator resolves ownership from
the thread_id *path* param, so it cannot cover these body-param endpoints.
Enforce ownership inside start_run() before create_or_reject via
ThreadMetaStore.check_access: missing rows (auto-created temp threads) and
NULL-owner rows stay accessible, while a thread owned by another user
returns 404 (matching thread_runs.py). The internal system role (IM
channels acting for platform users) is exempt.

Closes #3472
This commit is contained in:
Xinmin Zeng
2026-06-10 23:03:39 +08:00
committed by GitHub
parent 05ae4467ae
commit ba9cc5e972
2 changed files with 188 additions and 0 deletions
+15
View File
@@ -315,6 +315,21 @@ async def start_run(
detail=f"Model {model_name!r} is not in the configured model allowlist",
)
# Stateless run endpoints carry thread_id in the request *body*, so the
# @require_permission(owner_check=True) decorator -- which resolves ownership
# from the path param -- cannot protect them. Enforce thread ownership here,
# before any run is created, so one user cannot start runs on (or read /wait
# checkpoint state from) another user's thread. Missing rows (auto-created
# temp threads) and NULL-owner rows (shared / pre-auth data) stay accessible
# via check_access; only a thread already owned by another user is rejected
# with 404, matching thread_runs.py's anti-enumeration behaviour. Internal
# channel runs act on behalf of IM users they do not own (see
# inject_authenticated_user_context), so the internal system role is exempt.
user = getattr(request.state, "user", None)
if user is not None and getattr(user, "system_role", None) != INTERNAL_SYSTEM_ROLE:
if not await run_ctx.thread_store.check_access(thread_id, str(user.id)):
raise HTTPException(status_code=404, detail=f"Thread {thread_id} not found")
try:
record = await run_mgr.create_or_reject(
thread_id,
@@ -0,0 +1,173 @@
"""Cross-user isolation for the stateless ``POST /api/runs/stream`` and ``/wait`` endpoints.
These endpoints receive ``thread_id`` in the request body, so the
``@require_permission(owner_check=True)`` decorator — which reads the
``thread_id`` *path* parameter — cannot protect them. The owner check
lives inside ``services.start_run()`` instead; this suite pins it at the
HTTP layer so the gap cannot silently reopen.
Strategy
--------
``app.state.run_manager.create_or_reject`` raises ``ConflictError``, so a
request that *passes* the owner check deterministically short-circuits
with 409 before any agent code runs. The two outcomes:
- 404 + ``create_or_reject`` never awaited -> blocked by the owner check
- 409 + ``create_or_reject`` awaited -> passed the owner check
The thread store is a real ``MemoryThreadMetaStore`` (not a mock) so the
``check_access`` semantics under test — missing row allows, ``user_id``
NULL allows, foreign owner denies — are exercised through real code.
"""
from __future__ import annotations
import asyncio
from contextlib import contextmanager
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
import pytest
from _router_auth_helpers import make_authed_test_app
from fastapi.testclient import TestClient
from langgraph.store.memory import InMemoryStore
from app.gateway.auth.models import User
from app.gateway.routers import runs
from deerflow.config.app_config import AppConfig, reset_app_config, set_app_config
from deerflow.persistence.thread_meta.memory import MemoryThreadMetaStore
from deerflow.runtime import ConflictError
USER_A = User(email="owner-a@example.com", password_hash="x", system_role="user", id=uuid4())
USER_B = User(email="intruder-b@example.com", password_hash="x", system_role="user", id=uuid4())
INTERNAL_USER = SimpleNamespace(id="default", system_role="internal")
THREAD_A = "thread-owned-by-a"
THREAD_SHARED = "thread-shared-null-owner"
@pytest.fixture(autouse=True)
def _stub_app_config():
"""Inject a minimal AppConfig so the allowed path (which builds a
RunContext via ``get_config()``) never reads config.yaml from disk."""
set_app_config(AppConfig.model_validate({"sandbox": {"use": "deerflow.sandbox.local:LocalSandboxProvider"}}))
yield
reset_app_config()
def _make_thread_store() -> MemoryThreadMetaStore:
store = MemoryThreadMetaStore(InMemoryStore())
async def _seed():
await store.create(THREAD_A, user_id=str(USER_A.id))
await store.create(THREAD_SHARED, user_id=None)
asyncio.run(_seed())
return store
@contextmanager
def _client(user):
"""Yield a ``TestClient`` authenticated as ``user`` plus the stubbed
``create_or_reject`` mock, closing the client (and its anyio portal /
background threads) on exit.
``create_or_reject`` raises ``ConflictError`` so a request that passes the
owner check short-circuits to 409 before any agent code runs.
"""
app = make_authed_test_app(user_factory=lambda: user)
app.include_router(runs.router)
app.state.thread_store = _make_thread_store()
app.state.stream_bridge = MagicMock()
app.state.checkpointer = MagicMock()
app.state.store = MagicMock()
app.state.run_events_config = None
app.state.run_event_store = MagicMock()
run_manager = MagicMock()
run_manager.create_or_reject = AsyncMock(side_effect=ConflictError("sentinel: owner check passed"))
app.state.run_manager = run_manager
with TestClient(app) as client:
yield client, run_manager.create_or_reject
def _body(thread_id: str | None = None) -> dict:
if thread_id is None:
return {}
return {"config": {"configurable": {"thread_id": thread_id}}}
# ---------------------------------------------------------------------------
# Denied: another user's thread
# ---------------------------------------------------------------------------
def test_stream_cross_user_returns_404():
"""User B cannot start a run on user A's thread via /api/runs/stream."""
with _client(USER_B) as (client, create_or_reject):
response = client.post("/api/runs/stream", json=_body(THREAD_A))
assert response.status_code == 404
assert response.json()["detail"] == f"Thread {THREAD_A} not found"
create_or_reject.assert_not_awaited()
def test_wait_cross_user_returns_404_without_channel_values():
"""User B cannot read user A's checkpoint state via /api/runs/wait."""
with _client(USER_B) as (client, create_or_reject):
response = client.post("/api/runs/wait", json=_body(THREAD_A))
assert response.status_code == 404
assert response.json() == {"detail": f"Thread {THREAD_A} not found"}
create_or_reject.assert_not_awaited()
# ---------------------------------------------------------------------------
# Allowed: owner, fresh/untracked/shared threads, internal role
# ---------------------------------------------------------------------------
def test_stream_owner_passes_owner_check():
"""User A reaches run creation on their own thread (409 sentinel)."""
with _client(USER_A) as (client, create_or_reject):
response = client.post("/api/runs/stream", json=_body(THREAD_A))
assert response.status_code == 409
create_or_reject.assert_awaited()
def test_wait_owner_passes_owner_check():
with _client(USER_A) as (client, create_or_reject):
response = client.post("/api/runs/wait", json=_body(THREAD_A))
assert response.status_code == 409
create_or_reject.assert_awaited()
def test_stream_without_thread_id_passes_owner_check():
"""Stateless run with no thread_id auto-creates a thread — never blocked."""
with _client(USER_B) as (client, create_or_reject):
response = client.post("/api/runs/stream", json=_body())
assert response.status_code == 409
create_or_reject.assert_awaited()
def test_stream_untracked_thread_passes_owner_check():
"""A thread_id with no thread_meta row (untracked legacy) stays accessible."""
with _client(USER_B) as (client, create_or_reject):
response = client.post("/api/runs/stream", json=_body("never-created-thread"))
assert response.status_code == 409
create_or_reject.assert_awaited()
def test_stream_shared_thread_passes_owner_check():
"""A thread_meta row with user_id NULL (shared / pre-auth data) stays accessible."""
with _client(USER_B) as (client, create_or_reject):
response = client.post("/api/runs/stream", json=_body(THREAD_SHARED))
assert response.status_code == 409
create_or_reject.assert_awaited()
def test_stream_internal_role_bypasses_owner_check():
"""IM channels run with the internal system role on behalf of platform
users whose threads they do not own — the owner check must not break them."""
with _client(INTERNAL_USER) as (client, create_or_reject):
response = client.post("/api/runs/stream", json=_body(THREAD_A))
assert response.status_code == 409
create_or_reject.assert_awaited()