fix(channels): scope IM files and helper commands to owner (#3579)

* fix(channels): scope IM files and helper commands to owner

* fix(memory): honor bound IM owner for /memory gateway endpoints

The channel manager already attaches X-DeerFlow-Owner-User-Id for /memory
and /models, but the memory router resolved user_id solely from
get_effective_user_id(), which returns the synthetic internal user
(DEFAULT_USER_ID) for channel workers. A /memory command sent from a bound IM
channel therefore read the default/internal memory instead of the connection
owner's.

Resolve the owner via _resolve_memory_user_id(request) across all
/api/memory* endpoints: trusted internal callers act for the owner named in
the header, and browser/API callers fall back to get_effective_user_id(). This
mirrors the threads router's get_trusted_internal_owner_user_id pattern and
completes acceptance criterion #3 of #3539.
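
The resolution order described above can be sketched in isolation. This is a
minimal illustration, not the router code: the is_trusted_internal flag is a
hypothetical stand-in for the internal-token check, and the function name and
signature are assumptions. Only the header name comes from this change.

```python
from typing import Mapping

# Header name taken from the commit message; everything else is illustrative.
OWNER_HEADER = "X-DeerFlow-Owner-User-Id"


def resolve_memory_user_id_sketch(
    headers: Mapping[str, str],
    *,
    is_trusted_internal: bool,
    effective_user_id: str,
) -> str:
    """Pick the memory owner for a request.

    is_trusted_internal is a hypothetical flag standing in for "the internal
    token was validated"; the real router derives this from the request.
    """
    if is_trusted_internal:
        owner = headers.get(OWNER_HEADER)
        if owner:
            # Trusted channel worker acting for the connection owner.
            return owner
    # Browser/API callers (or internal callers without an owner header)
    # keep the normal contextvar-based identity.
    return effective_user_id
```

A spoofed owner header from a browser user is ignored in this sketch because
the header is only read when the caller is already trusted.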

Add end-to-end tests asserting the resolved user_id (not just that the
header is sent) and that a spoofed owner header from a browser user is
ignored.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): align memory bucket and reuse cached storage owner

Address PR #3579 review feedback:

- Memory router now sanitizes the trusted owner header via make_safe_user_id
  before routing, matching the channel file pipeline
  (_safe_user_id_for_run/prepare_user_dir_for_raw_id). A bound owner id needing
  sanitization now resolves to the same bucket as its files/uploads instead of
  500ing in _validate_user_id.
- _handle_chat reuses the storage_user_id cached at the top of the method for
  artifact delivery instead of re-deriving _channel_storage_user_id(msg), so
  uploads and outputs cannot drift to different buckets if a channel rewrites
  the InboundMessage in receive_file.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): stage unbound IM files under the run's user bucket

Address PR #3579 review feedback (#5): _channel_storage_user_id now mirrors
_resolve_run_params' identity policy, falling back to safe(msg.user_id) instead
of returning None for unbound auth-enabled channels.

Previously an unbound msg ran under safe(platform_user_id) but staged uploads
under get_effective_user_id() in the dispatcher task (unset contextvar ->
"default"), so files landed in users/default/... while the agent read from
users/{safe_platform_user_id}/.... Bound and unbound channels now write where
the agent reads. The helper returns None only when no identity is available.
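
The fallback order can be illustrated with a small self-contained sketch. The
Msg dataclass is a hypothetical stand-in for InboundMessage, owner_user_id
stands in for whatever the bound-owner lookup returns, and the sanitizing rule
in safe() is an assumption rather than the project's make_safe_user_id.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Msg:
    """Hypothetical stand-in for InboundMessage (only the fields used here)."""

    user_id: Optional[str] = None        # raw platform user id
    owner_user_id: Optional[str] = None  # bound DeerFlow owner, if any


def safe(raw: str) -> str:
    # Assumed sanitizing rule for illustration only: anything outside
    # [A-Za-z0-9_-] becomes an underscore.
    return re.sub(r"[^A-Za-z0-9_-]", "_", raw)


def storage_user_id_sketch(msg: Msg) -> Optional[str]:
    """Prefer the bound owner, else the sanitized platform id, else None."""
    if msg.owner_user_id:
        return safe(msg.owner_user_id)
    if msg.user_id:
        return safe(msg.user_id)
    return None
```

With this policy an unbound message such as Msg(user_id="ou:123") stages files
under the same sanitized id the run uses, instead of the default bucket.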

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): reuse cached storage owner in streaming artifact delivery

Address PR #3579 review feedback (#6): thread the storage_user_id resolved in
_handle_chat into _handle_streaming_chat instead of re-deriving
_channel_storage_user_id(msg) in the finally block. Avoids re-running
_safe_user_id_for_run (and its possible filesystem touch) on the streaming-error
path and guarantees artifact delivery targets the same bucket as the uploads.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* docs(channels): document owner-scoped IM file storage

Address PR #3579 review feedback (#4): the IM Channels and File Upload sections
still described pre-PR default-bucket behaviour. Document that receive_file,
_ingest_inbound_files/ensure_uploads_dir/get_uploads_dir, and
_resolve_attachments/_prepare_artifact_delivery are owner-scoped via the user_id
kwarg, and that the bucket matches the memory bucket from _resolve_memory_user_id.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor(channels): unify run identity and storage bucket resolution

Address PR #3579 review feedback (#3): _resolve_run_params no longer duplicates
the owner-resolution rule inline. After the #5 fix the inline block and
_channel_storage_user_id computed the identical sanitized-with-platform-fallback
value, so the run identity now calls the same helper, making it the single
source of truth for run_context["user_id"] and the file/artifact storage bucket.

_owner_headers stays deliberately separate: it sends the raw owner id over HTTP
for the gateway to re-resolve (no sanitize, no platform fallback), documented on
both helpers. test_run_identity_matches_storage_bucket pins the two together so
they cannot drift again.
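
A rough sketch of why the two helpers can differ in shape yet agree on the
bucket for a bound owner: the raw id travels over HTTP and is sanitized on the
gateway side, while the in-process helper sanitizes directly. All function
names and the sanitizing rule here are assumptions for illustration; only the
header name comes from this change.

```python
import re
from typing import Dict, Optional

OWNER_HEADER = "X-DeerFlow-Owner-User-Id"


def sanitize(raw: str) -> str:
    # Assumed rule for illustration; the project's make_safe_user_id may differ.
    return re.sub(r"[^A-Za-z0-9_-]", "_", raw)


def owner_headers_sketch(owner_raw: Optional[str]) -> Dict[str, str]:
    # The raw owner id is sent as-is: no sanitize, no platform fallback.
    return {OWNER_HEADER: owner_raw} if owner_raw else {}


def gateway_bucket_sketch(headers: Dict[str, str], fallback: str = "default") -> str:
    # Gateway side: re-resolve and sanitize the raw owner id from the header.
    raw = headers.get(OWNER_HEADER)
    return sanitize(raw) if raw else fallback


def storage_bucket_sketch(owner_raw: Optional[str], platform_raw: Optional[str]) -> Optional[str]:
    # In-process side: sanitized, with fallback to the platform user id.
    raw = owner_raw or platform_raw
    return sanitize(raw) if raw else None
```

For a bound owner both paths land on the same bucket. For an unbound message
the header is simply absent, so the gateway keeps its own fallback while the
in-process helper uses the sanitized platform id.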

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Nan Gao
2026-06-18 05:45:35 +02:00
committed by GitHub
parent 2b301e8211
commit 525af0da14
8 changed files with 461 additions and 55 deletions
+3 -1
@@ -178,7 +178,7 @@ class Channel(ABC):
except Exception:
logger.exception("[%s] failed to upload file %s", self.name, attachment.filename)
async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage:
async def receive_file(self, msg: InboundMessage, thread_id: str, *, user_id: str | None = None) -> InboundMessage:
"""
Optionally process and materialize inbound file attachments for this channel.
@@ -190,8 +190,10 @@ class Channel(ABC):
Args:
msg: The inbound message, possibly containing file metadata in msg.files.
thread_id: The resolved DeerFlow thread ID for sandbox path context.
user_id: Optional DeerFlow storage user ID for user-scoped channel workers.
Returns:
The (possibly modified) InboundMessage, with text and/or files updated as needed.
"""
del user_id
return msg
+15 -7
@@ -311,7 +311,7 @@ class FeishuChannel(Channel):
raise RuntimeError(f"Feishu file upload failed: code={response.code}, msg={response.msg}")
return response.data.file_key
async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage:
async def receive_file(self, msg: InboundMessage, thread_id: str, *, user_id: str | None = None) -> InboundMessage:
"""Download a Feishu file into the thread uploads directory.
Returns the sandbox virtual path when the image is persisted successfully.
@@ -326,15 +326,23 @@ class FeishuChannel(Channel):
text = msg.text
for file in files:
if file.get("image_key"):
virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id)
virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id, user_id=user_id)
text = text.replace("[image]", virtual_path, 1)
elif file.get("file_key"):
virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id)
virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id, user_id=user_id)
text = text.replace("[file]", virtual_path, 1)
msg.text = text
return msg
async def _receive_single_file(self, message_id: str, file_key: str, type: Literal["image", "file"], thread_id: str) -> str:
async def _receive_single_file(
self,
message_id: str,
file_key: str,
type: Literal["image", "file"],
thread_id: str,
*,
user_id: str | None = None,
) -> str:
request = self._GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(type).build()
def inner():
@@ -373,9 +381,9 @@ class FeishuChannel(Channel):
return f"Failed to obtain the [{type}]"
paths = get_paths()
user_id = get_effective_user_id()
paths.ensure_thread_dirs(thread_id, user_id=user_id)
uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=user_id).resolve()
effective_user_id = user_id or get_effective_user_id()
paths.ensure_thread_dirs(thread_id, user_id=effective_user_id)
uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=effective_user_id).resolve()
ext = "png" if type == "image" else "bin"
raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}"
+61 -20
@@ -525,6 +525,34 @@ def _safe_user_id_for_run(raw_user_id: str) -> str:
return make_safe_user_id(raw_user_id)
def _channel_storage_user_id(msg: InboundMessage) -> str | None:
"""Resolve the canonical DeerFlow user id for a channel-triggered message.
Single source of truth for both the agent **run identity**
(``_resolve_run_params`` → ``run_context["user_id"]``) and the **file/artifact
storage bucket** (``receive_file`` / ``_ingest_inbound_files`` /
``_prepare_artifact_delivery``), so the bucket the agent reads/writes always
matches where channel files are staged. Prefer the bound DeerFlow owner,
otherwise fall back to the sanitized raw platform user id. Without that
fallback, an unbound auth-enabled channel would run under ``safe(msg.user_id)``
but stage files under ``get_effective_user_id()`` (the dispatcher task's unset
contextvar → ``"default"``), so uploads would land in ``users/default/...``
while the agent reads ``users/{safe_platform_user_id}/...``. Returns ``None``
only when neither identity is available, leaving the caller to fall back to the
contextvar/default user.
Distinct from :func:`_owner_headers`, which deliberately sends the *raw* owner
id (no sanitize, no platform fallback) over HTTP for gateway to re-resolve;
this helper is the in-process, sanitized, filesystem-facing identity.
"""
owner_user_id = _effective_owner_user_id(msg)
if owner_user_id:
return _safe_user_id_for_run(owner_user_id)
if msg.user_id:
return _safe_user_id_for_run(msg.user_id)
return None
def _resolve_slash_skill_command(
text: str,
available_skills: set[str] | None = None,
@@ -551,7 +579,7 @@ def _resolve_slash_skill_command(
raise SlashSkillCommandResolutionError("Failed to resolve slash skill command. Please check the skill configuration.") from exc
def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedAttachment]:
def _resolve_attachments(thread_id: str, artifacts: list[str], *, user_id: str | None = None) -> list[ResolvedAttachment]:
"""Resolve virtual artifact paths to host filesystem paths with metadata.
Only paths under ``/mnt/user-data/outputs/`` are accepted; any other
@@ -565,15 +593,15 @@ def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedA
attachments: list[ResolvedAttachment] = []
paths = get_paths()
user_id = get_effective_user_id()
outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=user_id).resolve()
effective_user_id = user_id or get_effective_user_id()
outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=effective_user_id).resolve()
for virtual_path in artifacts:
# Security: only allow files from the agent outputs directory
if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX):
logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path)
continue
try:
actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=user_id)
actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=effective_user_id)
# Verify the resolved path is actually under the outputs directory
# (guards against path-traversal even after prefix check)
try:
@@ -605,13 +633,15 @@ def _prepare_artifact_delivery(
thread_id: str,
response_text: str,
artifacts: list[str],
*,
user_id: str | None = None,
) -> tuple[str, list[ResolvedAttachment]]:
"""Resolve attachments and append filename fallbacks to the text response."""
attachments: list[ResolvedAttachment] = []
if not artifacts:
return response_text, attachments
attachments = _resolve_attachments(thread_id, artifacts)
attachments = _resolve_attachments(thread_id, artifacts, user_id=user_id)
resolved_virtuals = {attachment.virtual_path for attachment in attachments}
unresolved = [path for path in artifacts if path not in resolved_virtuals]
@@ -628,7 +658,7 @@ def _prepare_artifact_delivery(
return response_text, attachments
async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dict[str, Any]]:
async def _ingest_inbound_files(thread_id: str, msg: InboundMessage, *, user_id: str | None = None) -> list[dict[str, Any]]:
if not msg.files:
return []
@@ -643,7 +673,7 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic
def _prepare_uploads_dir() -> tuple[Path, set[str]]:
# Worker thread: ensure_uploads_dir's mkdir and the iterdir enumeration are
# blocking filesystem IO that must stay off the event loop.
target = ensure_uploads_dir(thread_id)
target = ensure_uploads_dir(thread_id, user_id=user_id)
existing = {entry.name for entry in target.iterdir() if entry.is_file()}
return target, existing
@@ -833,11 +863,12 @@ class ChannelManager:
# owns the connection. Preserve the raw platform user under
# ``channel_user_id`` for platform-facing lookups and audits.
run_context_identity: dict[str, Any] = {"thread_id": thread_id}
owner_user_id = _effective_owner_user_id(msg)
if owner_user_id:
run_context_identity["user_id"] = _safe_user_id_for_run(owner_user_id)
elif msg.user_id:
run_context_identity["user_id"] = _safe_user_id_for_run(msg.user_id)
# Single source of truth for the run identity: the same helper that scopes
# inbound files and outbound artifacts, so the bucket the agent reads/writes
# always matches where channel files are staged.
run_user_id = _channel_storage_user_id(msg)
if run_user_id:
run_context_identity["user_id"] = run_user_id
if msg.user_id:
run_context_identity["channel_user_id"] = msg.user_id
@@ -1215,6 +1246,7 @@ class ChannelManager:
return
client = self._get_client()
storage_user_id = _channel_storage_user_id(msg)
# Look up existing DeerFlow thread.
# topic_id may be None (e.g. Telegram private chats) — the store
@@ -1240,12 +1272,12 @@ class ChannelManager:
service = get_channel_service()
channel = service.get_channel(msg.channel_name) if service else None
logger.info("[Manager] preparing receive file context for %d attachments", len(msg.files))
msg = await channel.receive_file(msg, thread_id) if channel else msg
msg = await channel.receive_file(msg, thread_id, user_id=storage_user_id) if channel else msg
if extra_context:
run_context.update(extra_context)
original_text = msg.text
uploaded = await _ingest_inbound_files(thread_id, msg)
uploaded = await _ingest_inbound_files(thread_id, msg, user_id=storage_user_id)
if uploaded:
msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip()
human_message = _human_input_message(msg.text, original_content=original_text)
@@ -1259,6 +1291,7 @@ class ChannelManager:
run_config,
run_context,
human_message,
storage_user_id=storage_user_id,
)
return
@@ -1296,7 +1329,10 @@ class ChannelManager:
len(artifacts),
)
response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts)
# Reuse the storage owner cached at the top of _handle_chat so uploads and
# artifact delivery always resolve to the same bucket, even if a future
# channel.receive_file returns a rewritten InboundMessage.
response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts, user_id=storage_user_id)
if not response_text:
if attachments:
@@ -1328,6 +1364,7 @@ class ChannelManager:
run_config: dict[str, Any],
run_context: dict[str, Any],
human_message: dict[str, Any],
storage_user_id: str | None = None,
) -> None:
logger.info("[Manager] invoking runs.stream(thread_id=%s, text_len=%d)", thread_id, len(msg.text or ""))
@@ -1400,7 +1437,10 @@ class ChannelManager:
response_text = _extract_response_text(result)
pending_clarification = _has_current_turn_clarification(result)
artifacts = _extract_artifacts(result)
response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts)
# Reuse the storage owner resolved by _handle_chat so artifact delivery
# matches the upload bucket and we avoid re-running _safe_user_id_for_run
# (and its possible filesystem touch) on the streaming-error path.
response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts, user_id=storage_user_id)
if not response_text:
if attachments:
@@ -1481,9 +1521,9 @@ class ChannelManager:
thread_id = await self._lookup_thread_id(msg)
reply = f"Active thread: {thread_id}" if thread_id else "No active conversation."
elif reply is None and command == "models":
reply = await self._fetch_gateway("/api/models", "models")
reply = await self._fetch_gateway("/api/models", "models", msg=msg)
elif reply is None and command == "memory":
reply = await self._fetch_gateway("/api/memory", "memory")
reply = await self._fetch_gateway("/api/memory", "memory", msg=msg)
elif reply is None and command == "help":
reply = (
"Available commands:\n"
@@ -1526,16 +1566,17 @@ class ChannelManager:
)
await self.bus.publish_outbound(outbound)
async def _fetch_gateway(self, path: str, kind: str) -> str:
async def _fetch_gateway(self, path: str, kind: str, *, msg: InboundMessage | None = None) -> str:
"""Fetch data from the Gateway API for command responses."""
import httpx
try:
headers = _owner_headers(msg) if msg is not None else None
async with httpx.AsyncClient() as http:
resp = await http.get(
f"{self._gateway_url}{path}",
timeout=10,
headers=create_internal_auth_headers(),
headers=headers or create_internal_auth_headers(),
)
resp.raise_for_status()
data = resp.json()
+43 -19
@@ -1,8 +1,9 @@
"""Memory API router for retrieving and managing global memory data."""
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from app.gateway.internal_auth import get_trusted_internal_owner_user_id
from deerflow.agents.memory.updater import (
clear_memory_data,
create_memory_fact,
@@ -13,11 +14,34 @@ from deerflow.agents.memory.updater import (
update_memory_fact,
)
from deerflow.config.memory_config import get_memory_config
from deerflow.config.paths import make_safe_user_id
from deerflow.runtime.user_context import get_effective_user_id
router = APIRouter(prefix="/api", tags=["memory"])
def _resolve_memory_user_id(request: Request) -> str:
"""Resolve the memory owner for this request.
Honors the trusted internal owner header that channel workers attach when
acting for a connection owner, so an IM ``/memory`` command reads the bound
owner's memory instead of the synthetic internal user. The header is only
honored after ``AuthMiddleware`` validated the internal token (see
``get_trusted_internal_owner_user_id``). Browser/API callers are never
internal, so this falls back to the normal contextvar-based effective user.
The trusted owner header carries the *raw* owner id, so sanitize it through
``make_safe_user_id`` (the same normalization the channel file pipeline applies
via ``_safe_user_id_for_run``/``prepare_user_dir_for_raw_id``). This keeps the
memory bucket aligned with the owner's file/upload bucket and avoids a 500 when
the raw id contains characters ``_validate_user_id`` would reject.
"""
raw_owner = get_trusted_internal_owner_user_id(request)
if raw_owner:
return make_safe_user_id(raw_owner)
return get_effective_user_id()
class ContextSection(BaseModel):
"""Model for context sections (user and history)."""
@@ -115,7 +139,7 @@ class MemoryStatusResponse(BaseModel):
summary="Get Memory Data",
description="Retrieve the current global memory data including user context, history, and facts.",
)
async def get_memory() -> MemoryResponse:
async def get_memory(http_request: Request) -> MemoryResponse:
"""Get the current global memory data.
Returns:
@@ -149,7 +173,7 @@ async def get_memory() -> MemoryResponse:
}
```
"""
memory_data = get_memory_data(user_id=get_effective_user_id())
memory_data = get_memory_data(user_id=_resolve_memory_user_id(http_request))
return MemoryResponse(**memory_data)
@@ -160,7 +184,7 @@ async def get_memory() -> MemoryResponse:
summary="Reload Memory Data",
description="Reload memory data from the storage file, refreshing the in-memory cache.",
)
async def reload_memory() -> MemoryResponse:
async def reload_memory(http_request: Request) -> MemoryResponse:
"""Reload memory data from file.
This forces a reload of the memory data from the storage file,
@@ -169,7 +193,7 @@ async def reload_memory() -> MemoryResponse:
Returns:
The reloaded memory data.
"""
memory_data = reload_memory_data(user_id=get_effective_user_id())
memory_data = reload_memory_data(user_id=_resolve_memory_user_id(http_request))
return MemoryResponse(**memory_data)
@@ -180,10 +204,10 @@ async def reload_memory() -> MemoryResponse:
summary="Clear All Memory Data",
description="Delete all saved memory data and reset the memory structure to an empty state.",
)
async def clear_memory() -> MemoryResponse:
async def clear_memory(http_request: Request) -> MemoryResponse:
"""Clear all persisted memory data."""
try:
memory_data = clear_memory_data(user_id=get_effective_user_id())
memory_data = clear_memory_data(user_id=_resolve_memory_user_id(http_request))
except OSError as exc:
raise HTTPException(status_code=500, detail="Failed to clear memory data.") from exc
@@ -197,14 +221,14 @@ async def clear_memory() -> MemoryResponse:
summary="Create Memory Fact",
description="Create a single saved memory fact manually.",
)
async def create_memory_fact_endpoint(request: FactCreateRequest) -> MemoryResponse:
async def create_memory_fact_endpoint(request: FactCreateRequest, http_request: Request) -> MemoryResponse:
"""Create a single fact manually."""
try:
memory_data = create_memory_fact(
content=request.content,
category=request.category,
confidence=request.confidence,
user_id=get_effective_user_id(),
user_id=_resolve_memory_user_id(http_request),
)
except ValueError as exc:
raise _map_memory_fact_value_error(exc) from exc
@@ -221,10 +245,10 @@ async def create_memory_fact_endpoint(request: FactCreateRequest) -> MemoryRespo
summary="Delete Memory Fact",
description="Delete a single saved memory fact by its fact id.",
)
async def delete_memory_fact_endpoint(fact_id: str) -> MemoryResponse:
async def delete_memory_fact_endpoint(fact_id: str, http_request: Request) -> MemoryResponse:
"""Delete a single fact from memory by fact id."""
try:
memory_data = delete_memory_fact(fact_id, user_id=get_effective_user_id())
memory_data = delete_memory_fact(fact_id, user_id=_resolve_memory_user_id(http_request))
except KeyError as exc:
raise HTTPException(status_code=404, detail=f"Memory fact '{fact_id}' not found.") from exc
except OSError as exc:
@@ -240,7 +264,7 @@ async def delete_memory_fact_endpoint(fact_id: str) -> MemoryResponse:
summary="Patch Memory Fact",
description="Partially update a single saved memory fact by its fact id while preserving omitted fields.",
)
async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) -> MemoryResponse:
async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest, http_request: Request) -> MemoryResponse:
"""Partially update a single fact manually."""
try:
memory_data = update_memory_fact(
@@ -248,7 +272,7 @@ async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) -
content=request.content,
category=request.category,
confidence=request.confidence,
user_id=get_effective_user_id(),
user_id=_resolve_memory_user_id(http_request),
)
except ValueError as exc:
raise _map_memory_fact_value_error(exc) from exc
@@ -267,9 +291,9 @@ async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) -
summary="Export Memory Data",
description="Export the current global memory data as JSON for backup or transfer.",
)
async def export_memory() -> MemoryResponse:
async def export_memory(http_request: Request) -> MemoryResponse:
"""Export the current memory data."""
memory_data = get_memory_data(user_id=get_effective_user_id())
memory_data = get_memory_data(user_id=_resolve_memory_user_id(http_request))
return MemoryResponse(**memory_data)
@@ -280,10 +304,10 @@ async def export_memory() -> MemoryResponse:
summary="Import Memory Data",
description="Import and overwrite the current global memory data from a JSON payload.",
)
async def import_memory(request: MemoryResponse) -> MemoryResponse:
async def import_memory(request: MemoryResponse, http_request: Request) -> MemoryResponse:
"""Import and persist memory data."""
try:
memory_data = import_memory_data(request.model_dump(), user_id=get_effective_user_id())
memory_data = import_memory_data(request.model_dump(), user_id=_resolve_memory_user_id(http_request))
except OSError as exc:
raise HTTPException(status_code=500, detail="Failed to import memory data.") from exc
@@ -336,14 +360,14 @@ async def get_memory_config_endpoint() -> MemoryConfigResponse:
summary="Get Memory Status",
description="Retrieve both memory configuration and current data in a single request.",
)
async def get_memory_status() -> MemoryStatusResponse:
async def get_memory_status(http_request: Request) -> MemoryStatusResponse:
"""Get the memory system status including configuration and data.
Returns:
Combined memory configuration and current data.
"""
config = get_memory_config()
memory_data = get_memory_data(user_id=get_effective_user_id())
memory_data = get_memory_data(user_id=_resolve_memory_user_id(http_request))
return MemoryStatusResponse(
config=MemoryConfigResponse(