From 525af0da144833327f433b13895ab256155cb3b0 Mon Sep 17 00:00:00 2001
From: Nan Gao
Date: Thu, 18 Jun 2026 05:45:35 +0200
Subject: [PATCH] fix(channels): scope IM files and helper commands to owner (#3579)

* fix(channels): scope IM files and helper commands to owner

* fix(memory): honor bound IM owner for /memory gateway endpoints

The channel manager already attaches X-DeerFlow-Owner-User-Id for /memory and /models, but the memory router resolved user_id solely from get_effective_user_id(), which returns the synthetic internal user (DEFAULT_USER_ID) for channel workers. A bound IM /memory therefore read the default/internal memory instead of the connection owner's.

Resolve the owner via _resolve_memory_user_id(request) across all /api/memory* endpoints: trusted internal callers act for the owner header, browser/API callers fall back to get_effective_user_id(). Mirrors the threads router's get_trusted_internal_owner_user_id pattern, completing acceptance criterion #3 of #3539.

Add end-to-end tests asserting the resolved user_id (not just that the header is sent) and that a spoofed owner header from a browser user is ignored.

Co-Authored-By: Claude Opus 4.8

* fix(channels): align memory bucket and reuse cached storage owner

Address PR #3579 review feedback:

- Memory router now sanitizes the trusted owner header via make_safe_user_id before routing, matching the channel file pipeline (_safe_user_id_for_run/prepare_user_dir_for_raw_id). A bound owner id needing sanitization now resolves to the same bucket as its files/uploads instead of 500ing in _validate_user_id.
- _handle_chat reuses the storage_user_id cached at the top of the method for artifact delivery instead of re-deriving _channel_storage_user_id(msg), so uploads and outputs cannot drift to different buckets if a channel rewrites the InboundMessage in receive_file.
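
The resolution order described in the two bullets above can be sketched roughly as follows. This is an illustrative sketch only, not the router's code: `sanitize_user_id` is a hypothetical stand-in for `make_safe_user_id` (its real normalization rules are not shown in this message), and `trusted_owner` stands in for whatever `get_trusted_internal_owner_user_id(request)` returns.

```python
import re
from typing import Optional


def sanitize_user_id(raw: str) -> str:
    """Hypothetical stand-in for make_safe_user_id: replace anything that is
    not a letter, digit, underscore or hyphen with an underscore."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", raw)


def resolve_memory_user_id(trusted_owner: Optional[str], effective_user_id: str) -> str:
    """Prefer the sanitized trusted owner id; otherwise keep the caller's identity."""
    if trusted_owner:
        # Raw owner id from the trusted internal header: sanitize it so the
        # memory bucket matches the bucket used for files and uploads.
        return sanitize_user_id(trusted_owner)
    # No trusted owner (browser/API caller): use the contextvar-derived user.
    return effective_user_id
```

Under these assumptions a raw id such as `feishu|ou_AbC/123` maps to one sanitized bucket name, while a caller without a trusted owner keeps its own identity.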
Co-Authored-By: Claude Opus 4.8

* fix(channels): stage unbound IM files under the run's user bucket

Address PR #3579 review feedback (#5): _channel_storage_user_id now mirrors _resolve_run_params' identity policy, falling back to safe(msg.user_id) instead of returning None for unbound auth-enabled channels. Previously an unbound msg ran under safe(platform_user_id) but staged uploads under get_effective_user_id() in the dispatcher task (unset contextvar -> "default"), so files landed in users/default/... while the agent read from users/{safe_platform_user_id}/.... Bound and unbound channels now write where the agent reads. Returns None only when no identity is available.

Co-Authored-By: Claude Opus 4.8

* fix(channels): reuse cached storage owner in streaming artifact delivery

Address PR #3579 review feedback (#6): thread the storage_user_id resolved in _handle_chat into _handle_streaming_chat instead of re-deriving _channel_storage_user_id(msg) in the finally block. Avoids re-running _safe_user_id_for_run (and its possible filesystem touch) on the streaming-error path and guarantees artifact delivery targets the same bucket as the uploads.

Co-Authored-By: Claude Opus 4.8

* docs(channels): document owner-scoped IM file storage

Address PR #3579 review feedback (#4): the IM Channels and File Upload sections still described pre-PR default-bucket behaviour. Document that receive_file, _ingest_inbound_files/ensure_uploads_dir/get_uploads_dir, and _resolve_attachments/_prepare_artifact_delivery are owner-scoped via the user_id kwarg, and that the bucket matches the memory bucket from _resolve_memory_user_id.

Co-Authored-By: Claude Opus 4.8

* refactor(channels): unify run identity and storage bucket resolution

Address PR #3579 review feedback (#3): _resolve_run_params no longer duplicates the owner-resolution rule inline.
After the #5 fix the inline block and _channel_storage_user_id computed the identical sanitized-with-platform-fallback value, so the run identity now calls the same helper, making it the single source of truth for run_context["user_id"] and the file/artifact storage bucket. _owner_headers stays deliberately separate: it sends the raw owner id over HTTP for the gateway to re-resolve (no sanitize, no platform fallback), documented on both helpers. test_run_identity_matches_storage_bucket pins the two together so they cannot drift again.

Co-Authored-By: Claude Opus 4.8

---------

Co-authored-by: Claude Opus 4.8
---
 backend/CLAUDE.md                             |   9 +-
 backend/app/channels/base.py                  |   4 +-
 backend/app/channels/feishu.py                |  22 +-
 backend/app/channels/manager.py               |  81 +++++--
 backend/app/gateway/routers/memory.py         |  62 ++++--
 .../harness/deerflow/uploads/manager.py       |   8 +-
 backend/tests/test_channels.py                | 203 +++++++++++++++++-
 backend/tests/test_memory_router.py           | 127 +++++++++++
 8 files changed, 461 insertions(+), 55 deletions(-)

diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md
index f18bc7760..60159bc56 100644
--- a/backend/CLAUDE.md
+++ b/backend/CLAUDE.md
@@ -405,6 +405,12 @@ Bridges external messaging platforms (Feishu, Slack, Telegram, Discord, DingTalk
 10. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API
 11. Outbound → channel callbacks → platform reply
 
+**Owner-scoped file storage**: inbound files, uploads, and output artifacts are staged under the DeerFlow owner's bucket so they land where the agent run reads/writes (`users/{user_id}/threads/{thread_id}/user-data/{uploads,outputs}`).
`ChannelManager._handle_chat` resolves the storage owner once via `_channel_storage_user_id(msg)` (sanitized owner id, falling back to `safe(msg.user_id)` for unbound auth-enabled channels — mirroring `_resolve_run_params`'s run identity; `None` only when no identity is available) and threads it as the `user_id=` kwarg through the file pipeline:
+- `Channel.receive_file(msg, thread_id, user_id=...)` — owner-bound channels persist downloaded files under the owner's bucket instead of the default bucket
+- `_ingest_inbound_files(...)` and the underlying `ensure_uploads_dir` / `get_uploads_dir` — owner-scoped via the same kwarg
+- `_resolve_attachments` / `_prepare_artifact_delivery` — resolve output artifacts from the bound owner's bucket
+The cached value is reused for both the blocking (`runs.wait`) and streaming (`_handle_streaming_chat`) paths, so uploads and artifact delivery always target the same bucket even if a channel returns a rewritten `InboundMessage` from `receive_file`. The bucket id matches the memory bucket resolved by `_resolve_memory_user_id` (both normalize through `make_safe_user_id`).
+
 **Configuration** (`config.yaml` -> `channels`):
 - `langgraph_url` - LangGraph-compatible Gateway API base URL (default: `http://localhost:8001/api`)
 - `gateway_url` - Gateway API URL for auxiliary commands (default: `http://localhost:8001`)
@@ -438,6 +444,7 @@ Bridges external messaging platforms (Feishu, Slack, Telegram, Discord, DingTalk
 - Per-agent per-user memory at `{base_dir}/users/{user_id}/agents/{agent_name}/memory.json`
 - Custom agent definitions (`SOUL.md` + `config.yaml`) are also per-user at `{base_dir}/users/{user_id}/agents/{agent_name}/`.
The legacy shared layout `{base_dir}/agents/{agent_name}/` remains read-only fallback for unmigrated installations
 - `user_id` is resolved via `get_effective_user_id()` from `deerflow.runtime.user_context`
+- The `/api/memory*` endpoints resolve the owner through `_resolve_memory_user_id(request)`: trusted internal callers (IM channel workers carrying the `X-DeerFlow-Owner-User-Id` header, e.g. a bound `/memory` command) act for the connection owner; browser/API callers fall back to `get_effective_user_id()`. The header is only honored after `AuthMiddleware` validated the internal token, mirroring `get_trusted_internal_owner_user_id` used by the threads router
 - In no-auth mode, `user_id` defaults to `"default"` (constant `DEFAULT_USER_ID`)
 - Absolute `storage_path` in config opts out of per-user isolation
 - **Migration**: Run `PYTHONPATH=. python scripts/migrate_user_isolation.py` to move legacy `memory.json`, `threads/`, and `agents/` into per-user layout. Supports `--dry-run` (preview changes) and `--user-id USER_ID` (assign unowned legacy data to a user, defaults to `default`).
@@ -624,7 +631,7 @@ Multi-file upload with automatic document conversion:
 - Supports: PDF, PPT, Excel, Word documents (converted via `markitdown`)
 - Rejects directory inputs before copying so uploads stay all-or-nothing
 - Reuses one conversion worker per request when called from an active event loop
-- Files stored in thread-isolated directories
+- Files stored in thread-isolated directories under the resolving user's bucket (`users/{user_id}/threads/{thread_id}/user-data/uploads`).
For IM channels the owner is threaded explicitly via the `user_id=` kwarg (see IM Channels → Owner-scoped file storage); HTTP/embedded callers resolve it from `get_effective_user_id()` - Duplicate filenames in a single upload request are auto-renamed with `_N` suffixes so later files do not truncate earlier files - Agent receives uploaded file list via `UploadsMiddleware` diff --git a/backend/app/channels/base.py b/backend/app/channels/base.py index 2e9868c21..ecdeda1b2 100644 --- a/backend/app/channels/base.py +++ b/backend/app/channels/base.py @@ -178,7 +178,7 @@ class Channel(ABC): except Exception: logger.exception("[%s] failed to upload file %s", self.name, attachment.filename) - async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage: + async def receive_file(self, msg: InboundMessage, thread_id: str, *, user_id: str | None = None) -> InboundMessage: """ Optionally process and materialize inbound file attachments for this channel. @@ -190,8 +190,10 @@ class Channel(ABC): Args: msg: The inbound message, possibly containing file metadata in msg.files. thread_id: The resolved DeerFlow thread ID for sandbox path context. + user_id: Optional DeerFlow storage user ID for user-scoped channel workers. Returns: The (possibly modified) InboundMessage, with text and/or files updated as needed. """ + del user_id return msg diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index f714207d3..6e235df75 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -311,7 +311,7 @@ class FeishuChannel(Channel): raise RuntimeError(f"Feishu file upload failed: code={response.code}, msg={response.msg}") return response.data.file_key - async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage: + async def receive_file(self, msg: InboundMessage, thread_id: str, *, user_id: str | None = None) -> InboundMessage: """Download a Feishu file into the thread uploads directory. 
Returns the sandbox virtual path when the image is persisted successfully. @@ -326,15 +326,23 @@ class FeishuChannel(Channel): text = msg.text for file in files: if file.get("image_key"): - virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id) + virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id, user_id=user_id) text = text.replace("[image]", virtual_path, 1) elif file.get("file_key"): - virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id) + virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id, user_id=user_id) text = text.replace("[file]", virtual_path, 1) msg.text = text return msg - async def _receive_single_file(self, message_id: str, file_key: str, type: Literal["image", "file"], thread_id: str) -> str: + async def _receive_single_file( + self, + message_id: str, + file_key: str, + type: Literal["image", "file"], + thread_id: str, + *, + user_id: str | None = None, + ) -> str: request = self._GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(type).build() def inner(): @@ -373,9 +381,9 @@ class FeishuChannel(Channel): return f"Failed to obtain the [{type}]" paths = get_paths() - user_id = get_effective_user_id() - paths.ensure_thread_dirs(thread_id, user_id=user_id) - uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=user_id).resolve() + effective_user_id = user_id or get_effective_user_id() + paths.ensure_thread_dirs(thread_id, user_id=effective_user_id) + uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=effective_user_id).resolve() ext = "png" if type == "image" else "bin" raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}" diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index df8ca2b28..d030c7db7 100644 --- a/backend/app/channels/manager.py +++ 
b/backend/app/channels/manager.py @@ -525,6 +525,34 @@ def _safe_user_id_for_run(raw_user_id: str) -> str: return make_safe_user_id(raw_user_id) +def _channel_storage_user_id(msg: InboundMessage) -> str | None: + """Resolve the canonical DeerFlow user id for a channel-triggered message. + + Single source of truth for both the agent **run identity** + (``_resolve_run_params`` → ``run_context["user_id"]``) and the **file/artifact + storage bucket** (``receive_file`` / ``_ingest_inbound_files`` / + ``_prepare_artifact_delivery``), so the bucket the agent reads/writes always + matches where channel files are staged. Prefer the bound DeerFlow owner, + otherwise fall back to the sanitized raw platform user id. Without that + fallback, an unbound auth-enabled channel would run under ``safe(msg.user_id)`` + but stage files under ``get_effective_user_id()`` (the dispatcher task's unset + contextvar → ``"default"``), so uploads would land in ``users/default/...`` + while the agent reads ``users/{safe_platform_user_id}/...``. Returns ``None`` + only when neither identity is available, leaving the caller to fall back to the + contextvar/default user. + + Distinct from :func:`_owner_headers`, which deliberately sends the *raw* owner + id (no sanitize, no platform fallback) over HTTP for gateway to re-resolve; + this helper is the in-process, sanitized, filesystem-facing identity. + """ + owner_user_id = _effective_owner_user_id(msg) + if owner_user_id: + return _safe_user_id_for_run(owner_user_id) + if msg.user_id: + return _safe_user_id_for_run(msg.user_id) + return None + + def _resolve_slash_skill_command( text: str, available_skills: set[str] | None = None, @@ -551,7 +579,7 @@ def _resolve_slash_skill_command( raise SlashSkillCommandResolutionError("Failed to resolve slash skill command. 
Please check the skill configuration.") from exc -def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedAttachment]: +def _resolve_attachments(thread_id: str, artifacts: list[str], *, user_id: str | None = None) -> list[ResolvedAttachment]: """Resolve virtual artifact paths to host filesystem paths with metadata. Only paths under ``/mnt/user-data/outputs/`` are accepted; any other @@ -565,15 +593,15 @@ def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedA attachments: list[ResolvedAttachment] = [] paths = get_paths() - user_id = get_effective_user_id() - outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=user_id).resolve() + effective_user_id = user_id or get_effective_user_id() + outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=effective_user_id).resolve() for virtual_path in artifacts: # Security: only allow files from the agent outputs directory if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX): logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path) continue try: - actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=user_id) + actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=effective_user_id) # Verify the resolved path is actually under the outputs directory # (guards against path-traversal even after prefix check) try: @@ -605,13 +633,15 @@ def _prepare_artifact_delivery( thread_id: str, response_text: str, artifacts: list[str], + *, + user_id: str | None = None, ) -> tuple[str, list[ResolvedAttachment]]: """Resolve attachments and append filename fallbacks to the text response.""" attachments: list[ResolvedAttachment] = [] if not artifacts: return response_text, attachments - attachments = _resolve_attachments(thread_id, artifacts) + attachments = _resolve_attachments(thread_id, artifacts, user_id=user_id) resolved_virtuals = {attachment.virtual_path for attachment in attachments} unresolved = [path for path in artifacts 
if path not in resolved_virtuals] @@ -628,7 +658,7 @@ def _prepare_artifact_delivery( return response_text, attachments -async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dict[str, Any]]: +async def _ingest_inbound_files(thread_id: str, msg: InboundMessage, *, user_id: str | None = None) -> list[dict[str, Any]]: if not msg.files: return [] @@ -643,7 +673,7 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic def _prepare_uploads_dir() -> tuple[Path, set[str]]: # Worker thread: ensure_uploads_dir's mkdir and the iterdir enumeration are # blocking filesystem IO that must stay off the event loop. - target = ensure_uploads_dir(thread_id) + target = ensure_uploads_dir(thread_id, user_id=user_id) existing = {entry.name for entry in target.iterdir() if entry.is_file()} return target, existing @@ -833,11 +863,12 @@ class ChannelManager: # owns the connection. Preserve the raw platform user under # ``channel_user_id`` for platform-facing lookups and audits. run_context_identity: dict[str, Any] = {"thread_id": thread_id} - owner_user_id = _effective_owner_user_id(msg) - if owner_user_id: - run_context_identity["user_id"] = _safe_user_id_for_run(owner_user_id) - elif msg.user_id: - run_context_identity["user_id"] = _safe_user_id_for_run(msg.user_id) + # Single source of truth for the run identity: the same helper that scopes + # inbound files and outbound artifacts, so the bucket the agent reads/writes + # always matches where channel files are staged. + run_user_id = _channel_storage_user_id(msg) + if run_user_id: + run_context_identity["user_id"] = run_user_id if msg.user_id: run_context_identity["channel_user_id"] = msg.user_id @@ -1215,6 +1246,7 @@ class ChannelManager: return client = self._get_client() + storage_user_id = _channel_storage_user_id(msg) # Look up existing DeerFlow thread. # topic_id may be None (e.g. 
Telegram private chats) — the store @@ -1240,12 +1272,12 @@ class ChannelManager: service = get_channel_service() channel = service.get_channel(msg.channel_name) if service else None logger.info("[Manager] preparing receive file context for %d attachments", len(msg.files)) - msg = await channel.receive_file(msg, thread_id) if channel else msg + msg = await channel.receive_file(msg, thread_id, user_id=storage_user_id) if channel else msg if extra_context: run_context.update(extra_context) original_text = msg.text - uploaded = await _ingest_inbound_files(thread_id, msg) + uploaded = await _ingest_inbound_files(thread_id, msg, user_id=storage_user_id) if uploaded: msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip() human_message = _human_input_message(msg.text, original_content=original_text) @@ -1259,6 +1291,7 @@ class ChannelManager: run_config, run_context, human_message, + storage_user_id=storage_user_id, ) return @@ -1296,7 +1329,10 @@ class ChannelManager: len(artifacts), ) - response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts) + # Reuse the storage owner cached at the top of _handle_chat so uploads and + # artifact delivery always resolve to the same bucket, even if a future + # channel.receive_file returns a rewritten InboundMessage. 
+ response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts, user_id=storage_user_id) if not response_text: if attachments: @@ -1328,6 +1364,7 @@ class ChannelManager: run_config: dict[str, Any], run_context: dict[str, Any], human_message: dict[str, Any], + storage_user_id: str | None = None, ) -> None: logger.info("[Manager] invoking runs.stream(thread_id=%s, text_len=%d)", thread_id, len(msg.text or "")) @@ -1400,7 +1437,10 @@ class ChannelManager: response_text = _extract_response_text(result) pending_clarification = _has_current_turn_clarification(result) artifacts = _extract_artifacts(result) - response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts) + # Reuse the storage owner resolved by _handle_chat so artifact delivery + # matches the upload bucket and we avoid re-running _safe_user_id_for_run + # (and its possible filesystem touch) on the streaming-error path. + response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts, user_id=storage_user_id) if not response_text: if attachments: @@ -1481,9 +1521,9 @@ class ChannelManager: thread_id = await self._lookup_thread_id(msg) reply = f"Active thread: {thread_id}" if thread_id else "No active conversation." 
elif reply is None and command == "models": - reply = await self._fetch_gateway("/api/models", "models") + reply = await self._fetch_gateway("/api/models", "models", msg=msg) elif reply is None and command == "memory": - reply = await self._fetch_gateway("/api/memory", "memory") + reply = await self._fetch_gateway("/api/memory", "memory", msg=msg) elif reply is None and command == "help": reply = ( "Available commands:\n" @@ -1526,16 +1566,17 @@ class ChannelManager: ) await self.bus.publish_outbound(outbound) - async def _fetch_gateway(self, path: str, kind: str) -> str: + async def _fetch_gateway(self, path: str, kind: str, *, msg: InboundMessage | None = None) -> str: """Fetch data from the Gateway API for command responses.""" import httpx try: + headers = _owner_headers(msg) if msg is not None else None async with httpx.AsyncClient() as http: resp = await http.get( f"{self._gateway_url}{path}", timeout=10, - headers=create_internal_auth_headers(), + headers=headers or create_internal_auth_headers(), ) resp.raise_for_status() data = resp.json() diff --git a/backend/app/gateway/routers/memory.py b/backend/app/gateway/routers/memory.py index fd413a715..6275303e5 100644 --- a/backend/app/gateway/routers/memory.py +++ b/backend/app/gateway/routers/memory.py @@ -1,8 +1,9 @@ """Memory API router for retrieving and managing global memory data.""" -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field +from app.gateway.internal_auth import get_trusted_internal_owner_user_id from deerflow.agents.memory.updater import ( clear_memory_data, create_memory_fact, @@ -13,11 +14,34 @@ from deerflow.agents.memory.updater import ( update_memory_fact, ) from deerflow.config.memory_config import get_memory_config +from deerflow.config.paths import make_safe_user_id from deerflow.runtime.user_context import get_effective_user_id router = APIRouter(prefix="/api", tags=["memory"]) +def 
_resolve_memory_user_id(request: Request) -> str: + """Resolve the memory owner for this request. + + Honors the trusted internal owner header that channel workers attach when + acting for a connection owner, so an IM ``/memory`` command reads the bound + owner's memory instead of the synthetic internal user. The header is only + honored after ``AuthMiddleware`` validated the internal token (see + ``get_trusted_internal_owner_user_id``). Browser/API callers are never + internal, so this falls back to the normal contextvar-based effective user. + + The trusted owner header carries the *raw* owner id, so sanitize it through + ``make_safe_user_id`` (the same normalization the channel file pipeline applies + via ``_safe_user_id_for_run``/``prepare_user_dir_for_raw_id``). This keeps the + memory bucket aligned with the owner's file/upload bucket and avoids a 500 when + the raw id contains characters ``_validate_user_id`` would reject. + """ + raw_owner = get_trusted_internal_owner_user_id(request) + if raw_owner: + return make_safe_user_id(raw_owner) + return get_effective_user_id() + + class ContextSection(BaseModel): """Model for context sections (user and history).""" @@ -115,7 +139,7 @@ class MemoryStatusResponse(BaseModel): summary="Get Memory Data", description="Retrieve the current global memory data including user context, history, and facts.", ) -async def get_memory() -> MemoryResponse: +async def get_memory(http_request: Request) -> MemoryResponse: """Get the current global memory data. 
Returns: @@ -149,7 +173,7 @@ async def get_memory() -> MemoryResponse: } ``` """ - memory_data = get_memory_data(user_id=get_effective_user_id()) + memory_data = get_memory_data(user_id=_resolve_memory_user_id(http_request)) return MemoryResponse(**memory_data) @@ -160,7 +184,7 @@ async def get_memory() -> MemoryResponse: summary="Reload Memory Data", description="Reload memory data from the storage file, refreshing the in-memory cache.", ) -async def reload_memory() -> MemoryResponse: +async def reload_memory(http_request: Request) -> MemoryResponse: """Reload memory data from file. This forces a reload of the memory data from the storage file, @@ -169,7 +193,7 @@ async def reload_memory() -> MemoryResponse: Returns: The reloaded memory data. """ - memory_data = reload_memory_data(user_id=get_effective_user_id()) + memory_data = reload_memory_data(user_id=_resolve_memory_user_id(http_request)) return MemoryResponse(**memory_data) @@ -180,10 +204,10 @@ async def reload_memory() -> MemoryResponse: summary="Clear All Memory Data", description="Delete all saved memory data and reset the memory structure to an empty state.", ) -async def clear_memory() -> MemoryResponse: +async def clear_memory(http_request: Request) -> MemoryResponse: """Clear all persisted memory data.""" try: - memory_data = clear_memory_data(user_id=get_effective_user_id()) + memory_data = clear_memory_data(user_id=_resolve_memory_user_id(http_request)) except OSError as exc: raise HTTPException(status_code=500, detail="Failed to clear memory data.") from exc @@ -197,14 +221,14 @@ async def clear_memory() -> MemoryResponse: summary="Create Memory Fact", description="Create a single saved memory fact manually.", ) -async def create_memory_fact_endpoint(request: FactCreateRequest) -> MemoryResponse: +async def create_memory_fact_endpoint(request: FactCreateRequest, http_request: Request) -> MemoryResponse: """Create a single fact manually.""" try: memory_data = create_memory_fact( 
content=request.content, category=request.category, confidence=request.confidence, - user_id=get_effective_user_id(), + user_id=_resolve_memory_user_id(http_request), ) except ValueError as exc: raise _map_memory_fact_value_error(exc) from exc @@ -221,10 +245,10 @@ async def create_memory_fact_endpoint(request: FactCreateRequest) -> MemoryRespo summary="Delete Memory Fact", description="Delete a single saved memory fact by its fact id.", ) -async def delete_memory_fact_endpoint(fact_id: str) -> MemoryResponse: +async def delete_memory_fact_endpoint(fact_id: str, http_request: Request) -> MemoryResponse: """Delete a single fact from memory by fact id.""" try: - memory_data = delete_memory_fact(fact_id, user_id=get_effective_user_id()) + memory_data = delete_memory_fact(fact_id, user_id=_resolve_memory_user_id(http_request)) except KeyError as exc: raise HTTPException(status_code=404, detail=f"Memory fact '{fact_id}' not found.") from exc except OSError as exc: @@ -240,7 +264,7 @@ async def delete_memory_fact_endpoint(fact_id: str) -> MemoryResponse: summary="Patch Memory Fact", description="Partially update a single saved memory fact by its fact id while preserving omitted fields.", ) -async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) -> MemoryResponse: +async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest, http_request: Request) -> MemoryResponse: """Partially update a single fact manually.""" try: memory_data = update_memory_fact( @@ -248,7 +272,7 @@ async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) - content=request.content, category=request.category, confidence=request.confidence, - user_id=get_effective_user_id(), + user_id=_resolve_memory_user_id(http_request), ) except ValueError as exc: raise _map_memory_fact_value_error(exc) from exc @@ -267,9 +291,9 @@ async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) - summary="Export Memory Data", 
description="Export the current global memory data as JSON for backup or transfer.", ) -async def export_memory() -> MemoryResponse: +async def export_memory(http_request: Request) -> MemoryResponse: """Export the current memory data.""" - memory_data = get_memory_data(user_id=get_effective_user_id()) + memory_data = get_memory_data(user_id=_resolve_memory_user_id(http_request)) return MemoryResponse(**memory_data) @@ -280,10 +304,10 @@ async def export_memory() -> MemoryResponse: summary="Import Memory Data", description="Import and overwrite the current global memory data from a JSON payload.", ) -async def import_memory(request: MemoryResponse) -> MemoryResponse: +async def import_memory(request: MemoryResponse, http_request: Request) -> MemoryResponse: """Import and persist memory data.""" try: - memory_data = import_memory_data(request.model_dump(), user_id=get_effective_user_id()) + memory_data = import_memory_data(request.model_dump(), user_id=_resolve_memory_user_id(http_request)) except OSError as exc: raise HTTPException(status_code=500, detail="Failed to import memory data.") from exc @@ -336,14 +360,14 @@ async def get_memory_config_endpoint() -> MemoryConfigResponse: summary="Get Memory Status", description="Retrieve both memory configuration and current data in a single request.", ) -async def get_memory_status() -> MemoryStatusResponse: +async def get_memory_status(http_request: Request) -> MemoryStatusResponse: """Get the memory system status including configuration and data. Returns: Combined memory configuration and current data. 
     """
     config = get_memory_config()
-    memory_data = get_memory_data(user_id=get_effective_user_id())
+    memory_data = get_memory_data(user_id=_resolve_memory_user_id(http_request))
     return MemoryStatusResponse(
         config=MemoryConfigResponse(
diff --git a/backend/packages/harness/deerflow/uploads/manager.py b/backend/packages/harness/deerflow/uploads/manager.py
index 3014f65ab..3605cd5b2 100644
--- a/backend/packages/harness/deerflow/uploads/manager.py
+++ b/backend/packages/harness/deerflow/uploads/manager.py
@@ -37,15 +37,15 @@ def validate_thread_id(thread_id: str) -> None:
         raise ValueError(f"Invalid thread_id: {thread_id!r}")
 
 
-def get_uploads_dir(thread_id: str) -> Path:
+def get_uploads_dir(thread_id: str, *, user_id: str | None = None) -> Path:
     """Return the uploads directory path for a thread (no side effects)."""
     validate_thread_id(thread_id)
-    return get_paths().sandbox_uploads_dir(thread_id, user_id=get_effective_user_id())
+    return get_paths().sandbox_uploads_dir(thread_id, user_id=user_id or get_effective_user_id())
 
 
-def ensure_uploads_dir(thread_id: str) -> Path:
+def ensure_uploads_dir(thread_id: str, *, user_id: str | None = None) -> Path:
     """Return the uploads directory for a thread, creating it if needed."""
-    base = get_uploads_dir(thread_id)
+    base = get_uploads_dir(thread_id, user_id=user_id)
     base.mkdir(parents=True, exist_ok=True)
     return base
 
diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py
index fb7053370..ed17b1411 100644
--- a/backend/tests/test_channels.py
+++ b/backend/tests/test_channels.py
@@ -679,6 +679,55 @@ class TestChannelManager:
 
         _run(go())
 
+    def test_fetch_gateway_uses_bound_owner_headers(self, monkeypatch):
+        from app.channels.manager import ChannelManager
+        from app.gateway.internal_auth import INTERNAL_OWNER_USER_ID_HEADER_NAME
+
+        class MockResponse:
+            def raise_for_status(self):
+                return None
+
+            def json(self):
+                return {"facts": [{"text": "owner fact"}]}
+
+        class MockAsyncClient:
+            def __init__(self, *args, **kwargs):
+                return None
+
+            async def __aenter__(self):
+                return self
+
+            async def __aexit__(self, exc_type, exc, tb):
+                return None
+
+            async def get(self, url, **kwargs):
+                calls.append({"url": url, **kwargs})
+                return MockResponse()
+
+        calls = []
+        monkeypatch.setattr("app.channels.manager.httpx.AsyncClient", MockAsyncClient)
+
+        async def go():
+            bus = MessageBus()
+            store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
+            manager = ChannelManager(bus=bus, store=store, gateway_url="http://gateway:8001")
+            msg = InboundMessage(
+                channel_name="slack",
+                chat_id="C123",
+                user_id="U-platform",
+                owner_user_id="deerflow-user-1",
+                connection_id="connection-1",
+                text="/memory",
+                msg_type=InboundMessageType.COMMAND,
+            )
+
+            reply = await manager._fetch_gateway("/api/memory", "memory", msg=msg)
+
+            assert reply == "Memory contains 1 fact(s)."
+            assert calls[0]["headers"][INTERNAL_OWNER_USER_ID_HEADER_NAME] == "deerflow-user-1"
+
+        _run(go())
+
     def test_handle_chat_calls_channel_receive_file_for_inbound_files(self, monkeypatch):
         from app.channels.manager import ChannelManager
 
@@ -716,7 +765,9 @@ class TestChannelManager:
             inbound = InboundMessage(
                 channel_name="test",
                 chat_id="chat1",
-                user_id="user1",
+                user_id="platform-user",
+                owner_user_id="owner-1",
+                connection_id="connection-1",
                 text="hi [image]",
                 files=[{"image_key": "img_1"}],
             )
@@ -729,6 +780,7 @@ class TestChannelManager:
             assert called_msg.text == "hi [image]"
             assert isinstance(called_thread_id, str)
             assert called_thread_id
+            assert mock_channel.receive_file.await_args.kwargs["user_id"] == "owner-1"
 
             mock_client.runs.wait.assert_called_once()
             run_call_args = mock_client.runs.wait.call_args
@@ -736,6 +788,70 @@ class TestChannelManager:
 
         _run(go())
 
+    def test_ingest_inbound_files_uses_explicit_owner_bucket(self, tmp_path, monkeypatch):
+        from app.channels.manager import INBOUND_FILE_READERS, _ingest_inbound_files
+        from deerflow.config.paths import Paths
+
+        paths = Paths(tmp_path)
+        monkeypatch.setattr("deerflow.uploads.manager.get_paths", lambda: paths)
+
+        async def read_file(file_info, client):
+            del file_info, client
+            return b"owner data"
+
+        INBOUND_FILE_READERS["owner-test"] = read_file
+
+        async def go():
+            try:
+                created = await _ingest_inbound_files(
+                    "thread-owner",
+                    InboundMessage(
+                        channel_name="owner-test",
+                        chat_id="C123",
+                        user_id="U-platform",
+                        text="file",
+                        files=[{"filename": "report.txt", "type": "file"}],
+                    ),
+                    user_id="owner-1",
+                )
+            finally:
+                INBOUND_FILE_READERS.pop("owner-test", None)
+
+            assert created == [
+                {
+                    "filename": "report.txt",
+                    "size": len(b"owner data"),
+                    "path": "/mnt/user-data/uploads/report.txt",
+                    "is_image": False,
+                }
+            ]
+            assert (paths.sandbox_uploads_dir("thread-owner", user_id="owner-1") / "report.txt").read_bytes() == b"owner data"
+            assert not paths.sandbox_uploads_dir("thread-owner").exists()
+
+        _run(go())
+
+    def test_channel_storage_user_id_falls_back_to_platform_user(self, monkeypatch):
+        """Unbound auth-enabled channels stage files under the same bucket the run uses.
+
+        ``_resolve_run_params`` runs an unbound msg under ``safe(msg.user_id)``, so
+        ``_channel_storage_user_id`` must resolve to the same value instead of
+        ``None`` (which would fall back to ``"default"`` in the dispatcher task and
+        cross buckets — the agent would read uploads the channel never wrote there).
+        """
+        from app.channels.manager import _channel_storage_user_id, _safe_user_id_for_run
+
+        # Auth enabled (no auth-disabled owner), unbound (no owner_user_id).
+        monkeypatch.setattr("app.channels.manager._auth_disabled_owner_user_id", lambda: None)
+
+        unbound = InboundMessage(channel_name="slack", chat_id="C1", user_id="U-platform", text="hi")
+        assert _channel_storage_user_id(unbound) == _safe_user_id_for_run("U-platform")
+
+        bound = InboundMessage(channel_name="slack", chat_id="C1", user_id="U-platform", text="hi", owner_user_id="owner-1")
+        assert _channel_storage_user_id(bound) == _safe_user_id_for_run("owner-1")
+
+        anonymous = InboundMessage(channel_name="slack", chat_id="C1", user_id="", text="hi")
+        assert _channel_storage_user_id(anonymous) is None
+
     def test_handle_chat_creates_thread(self):
         from app.channels.manager import ChannelManager
 
@@ -1862,7 +1978,8 @@ class TestChannelManager:
     def test_handle_command_slash_skill_with_attachment_preserves_original_content(self, monkeypatch, tmp_path):
         from app.channels.manager import ChannelManager
 
-        async def fake_ingest(thread_id, msg):
+        async def fake_ingest(thread_id, msg, *, user_id=None):
+            del user_id
             return [
                 {
                     "filename": "report.pdf",
@@ -1916,7 +2033,8 @@ class TestChannelManager:
     def test_streaming_slash_skill_with_attachment_preserves_original_content(self, monkeypatch, tmp_path):
         from app.channels.manager import ChannelManager
 
-        async def fake_ingest(thread_id, msg):
+        async def fake_ingest(thread_id, msg, *, user_id=None):
+            del user_id
             return [
                 {
                     "filename": "report.pdf",
@@ -2658,6 +2776,31 @@ class TestResolveRunParamsUserId:
         assert run_context["user_id"] == "123456"
         assert run_context["channel_user_id"] == "123456"
 
+    @pytest.mark.parametrize(
+        "kwargs",
+        [
+            {"user_id": "U-platform", "owner_user_id": "deerflow-user-1"},  # bound
+            {"user_id": "U-platform"},  # unbound auth-enabled
+            {"user_id": "feishu|ou_AbC/123"},  # unbound needing sanitization
+        ],
+    )
+    def test_run_identity_matches_storage_bucket(self, kwargs, monkeypatch):
+        """The run user_id and the file/artifact storage bucket share one resolver.
+
+        Pins #2 and #3 to a single source of truth so they cannot drift: whatever
+        _resolve_run_params puts in run_context["user_id"] is exactly what
+        _channel_storage_user_id scopes uploads/artifacts to.
+        """
+        from app.channels.manager import _channel_storage_user_id
+
+        manager = self._manager()
+        monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
+        msg = InboundMessage(channel_name="slack", chat_id="C123", text="hi", **kwargs)
+
+        _, _, run_context = manager._resolve_run_params(msg, "thread-1")
+
+        assert run_context["user_id"] == _channel_storage_user_id(msg)
+
     def test_connection_owner_user_id_takes_precedence_over_platform_user_id(self, monkeypatch):
         manager = self._manager()
         monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
@@ -3429,6 +3572,60 @@ class TestFormatArtifactText:
 
 
 class TestHandleChatWithArtifacts:
+    def test_bound_owner_artifacts_resolve_from_owner_outputs_bucket(self, tmp_path, monkeypatch):
+        from app.channels.manager import ChannelManager
+        from deerflow.config.paths import Paths
+
+        paths = Paths(tmp_path)
+        monkeypatch.setattr("deerflow.config.paths.get_paths", lambda: paths)
+        outputs_dir = paths.sandbox_outputs_dir("test-thread-123", user_id="owner-1")
+        outputs_dir.mkdir(parents=True)
+        (outputs_dir / "report.md").write_text("owner report", encoding="utf-8")
+
+        async def go():
+            bus = MessageBus()
+            store = ChannelStore(path=tmp_path / "store.json")
+            manager = ChannelManager(bus=bus, store=store)
+
+            run_result = {
+                "messages": [
+                    {"type": "human", "content": "generate report"},
+                    {
+                        "type": "ai",
+                        "content": "Here is your report.",
+                        "tool_calls": [
+                            {"name": "present_files", "args": {"filepaths": ["/mnt/user-data/outputs/report.md"]}},
+                        ],
+                    },
+                    {"type": "tool", "name": "present_files", "content": "ok"},
+                ],
+            }
+            mock_client = _make_mock_langgraph_client(run_result=run_result)
+            manager._client = mock_client
+
+            outbound_received = []
+            bus.subscribe_outbound(lambda msg: outbound_received.append(msg))
+            await manager.start()
+
+            await bus.publish_inbound(
+                InboundMessage(
+                    channel_name="test",
+                    chat_id="c1",
+                    user_id="U-platform",
+                    owner_user_id="owner-1",
+                    connection_id="connection-1",
+                    text="generate report",
+                )
+            )
+            await _wait_for(lambda: len(outbound_received) >= 1)
+            await manager.stop()
+
+            assert len(outbound_received) == 1
+            assert len(outbound_received[0].attachments) == 1
+            assert outbound_received[0].attachments[0].actual_path == outputs_dir / "report.md"
+
+        _run(go())
+
     def test_artifacts_appended_to_text(self):
         from app.channels.manager import ChannelManager
 
diff --git a/backend/tests/test_memory_router.py b/backend/tests/test_memory_router.py
index 91fd1d662..2989f6ab5 100644
--- a/backend/tests/test_memory_router.py
+++ b/backend/tests/test_memory_router.py
@@ -1,3 +1,5 @@
+import asyncio
+from types import SimpleNamespace
 from unittest.mock import patch
 
 from fastapi import FastAPI
@@ -303,3 +305,128 @@ def test_update_memory_fact_route_returns_specific_error_for_invalid_confidence(
 
     assert response.status_code == 400
     assert response.json()["detail"] == "Invalid confidence value; must be between 0 and 1."
+
+
+def _internal_owner_request(owner_user_id: str) -> SimpleNamespace:
+    """Build a trusted-internal request carrying the connection owner header.
+
+    Mirrors what ``AuthMiddleware`` stamps for a channel worker that holds the
+    internal token (``request.state.user`` is the synthetic internal user) and
+    what ``ChannelManager._fetch_gateway`` attaches via ``_owner_headers``.
+    """
+    from app.gateway.internal_auth import INTERNAL_OWNER_USER_ID_HEADER_NAME, INTERNAL_SYSTEM_ROLE
+    from deerflow.runtime.user_context import DEFAULT_USER_ID
+
+    return SimpleNamespace(
+        headers={INTERNAL_OWNER_USER_ID_HEADER_NAME: owner_user_id},
+        state=SimpleNamespace(user=SimpleNamespace(id=DEFAULT_USER_ID, system_role=INTERNAL_SYSTEM_ROLE)),
+    )
+
+
+def test_get_memory_honors_bound_owner_header() -> None:
+    """A bound IM ``/memory`` reads the owner's bucket, not the internal user's."""
+    seen: dict[str, str] = {}
+
+    def fake_get_memory_data(*, user_id: str) -> dict:
+        seen["user_id"] = user_id
+        return _sample_memory(facts=[{"id": "f", "content": "owner fact", "category": "context", "confidence": 0.9, "createdAt": "", "source": "owner"}])
+
+    with patch("app.gateway.routers.memory.get_memory_data", side_effect=fake_get_memory_data):
+        response = asyncio.run(memory.get_memory(_internal_owner_request("owner-1")))
+
+    assert seen["user_id"] == "owner-1"
+    assert response.facts[0].content == "owner fact"
+
+
+def test_get_memory_sanitizes_unsafe_owner_header() -> None:
+    """A bound owner id needing sanitization routes to the safe bucket, not a 500.
+
+    The trusted owner header carries the raw owner id. The memory router must
+    normalize it through the same ``make_safe_user_id`` the channel file pipeline
+    applies, so the memory bucket matches the owner's file/upload bucket and the
+    raw id never reaches ``_validate_user_id`` unsanitized.
+    """
+    from deerflow.config.paths import make_safe_user_id
+
+    raw_owner = "feishu|ou_AbC/123"
+    seen: dict[str, str] = {}
+
+    def fake_get_memory_data(*, user_id: str) -> dict:
+        seen["user_id"] = user_id
+        return _sample_memory()
+
+    with patch("app.gateway.routers.memory.get_memory_data", side_effect=fake_get_memory_data):
+        asyncio.run(memory.get_memory(_internal_owner_request(raw_owner)))
+
+    expected = make_safe_user_id(raw_owner)
+    assert seen["user_id"] == expected
+    assert seen["user_id"] != raw_owner
+
+
+def test_get_memory_falls_back_to_effective_user_for_browser_requests() -> None:
+    """Non-internal callers ignore the owner header and use the effective user."""
+    from app.gateway.internal_auth import INTERNAL_OWNER_USER_ID_HEADER_NAME
+
+    seen: dict[str, str] = {}
+
+    def fake_get_memory_data(*, user_id: str) -> dict:
+        seen["user_id"] = user_id
+        return _sample_memory()
+
+    # A real browser user (system_role "user") must never be overridden even if
+    # a spoofed owner header is present — the header is only honored for the
+    # synthetic internal caller after the internal token is validated.
+    browser_request = SimpleNamespace(
+        headers={INTERNAL_OWNER_USER_ID_HEADER_NAME: "owner-1"},
+        state=SimpleNamespace(user=SimpleNamespace(id="real-user", system_role="user")),
+    )
+
+    with patch("app.gateway.routers.memory.get_effective_user_id", return_value="real-user"):
+        with patch("app.gateway.routers.memory.get_memory_data", side_effect=fake_get_memory_data):
+            asyncio.run(memory.get_memory(browser_request))
+
+    assert seen["user_id"] == "real-user"
+
+
+def _browser_request_with_spoofed_owner_header() -> SimpleNamespace:
+    from app.gateway.internal_auth import INTERNAL_OWNER_USER_ID_HEADER_NAME
+
+    return SimpleNamespace(
+        headers={INTERNAL_OWNER_USER_ID_HEADER_NAME: "owner-1"},
+        state=SimpleNamespace(user=SimpleNamespace(id="real-user", system_role="user")),
+    )
+
+
+def test_clear_memory_scopes_destructive_write_to_bound_owner() -> None:
+    """A bound IM caller clears the owner's bucket; a browser user keeps their own."""
+    seen: dict[str, str] = {}
+
+    def fake_clear(*, user_id: str) -> dict:
+        seen["user_id"] = user_id
+        return _sample_memory()
+
+    with patch("app.gateway.routers.memory.clear_memory_data", side_effect=fake_clear):
+        asyncio.run(memory.clear_memory(_internal_owner_request("owner-1")))
+        assert seen["user_id"] == "owner-1"
+
+        with patch("app.gateway.routers.memory.get_effective_user_id", return_value="real-user"):
+            asyncio.run(memory.clear_memory(_browser_request_with_spoofed_owner_header()))
+        assert seen["user_id"] == "real-user"
+
+
+def test_import_memory_scopes_overwrite_to_bound_owner() -> None:
+    """A bound IM caller overwrites the owner's bucket; a spoofed header is ignored."""
+    seen: dict[str, str] = {}
+    payload = memory.MemoryResponse(**_sample_memory())
+
+    def fake_import(_data: dict, *, user_id: str) -> dict:
+        seen["user_id"] = user_id
+        return _sample_memory()
+
+    with patch("app.gateway.routers.memory.import_memory_data", side_effect=fake_import):
+        asyncio.run(memory.import_memory(payload, _internal_owner_request("owner-1")))
+        assert seen["user_id"] == "owner-1"
+
+        with patch("app.gateway.routers.memory.get_effective_user_id", return_value="real-user"):
+            asyncio.run(memory.import_memory(payload, _browser_request_with_spoofed_owner_header()))
+        assert seen["user_id"] == "real-user"