fix(sandbox): scope provisioner PVC data by user (#2973)

* fix(sandbox): scope provisioner PVC data by user

* Address provisioner PVC review feedback
This commit is contained in:
魔力鸟
2026-05-17 15:23:42 +08:00
committed by GitHub
parent c0233cae26
commit e74e126ed3
7 changed files with 92 additions and 29 deletions
@@ -21,6 +21,8 @@ import logging
import requests import requests
from deerflow.runtime.user_context import get_effective_user_id
from .backend import SandboxBackend from .backend import SandboxBackend
from .sandbox_info import SandboxInfo from .sandbox_info import SandboxInfo
@@ -138,6 +140,7 @@ class RemoteSandboxBackend(SandboxBackend):
json={ json={
"sandbox_id": sandbox_id, "sandbox_id": sandbox_id,
"thread_id": thread_id, "thread_id": thread_id,
"user_id": get_effective_user_id(),
}, },
timeout=30, timeout=30,
) )
@@ -1,11 +1,13 @@
"""Tests for AioSandboxProvider mount helpers.""" """Tests for AioSandboxProvider mount helpers."""
import importlib import importlib
from types import SimpleNamespace
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import pytest import pytest
from deerflow.config.paths import Paths, join_host_path from deerflow.config.paths import Paths, join_host_path
from deerflow.runtime.user_context import reset_current_user, set_current_user
# ── ensure_thread_dirs ─────────────────────────────────────────────────────── # ── ensure_thread_dirs ───────────────────────────────────────────────────────
@@ -136,3 +138,36 @@ def test_discover_or_create_only_unlocks_when_lock_succeeds(tmp_path, monkeypatc
provider._discover_or_create_with_lock("thread-5", "sandbox-5") provider._discover_or_create_with_lock("thread-5", "sandbox-5")
assert unlock_calls == [] assert unlock_calls == []
def test_remote_backend_create_forwards_effective_user_id(monkeypatch):
"""Provisioner mode must receive user_id so PVC subPath matches user isolation."""
remote_mod = importlib.import_module("deerflow.community.aio_sandbox.remote_backend")
backend = remote_mod.RemoteSandboxBackend("http://provisioner:8002")
token = set_current_user(SimpleNamespace(id="user-7"))
posted: dict = {}
class _Response:
def raise_for_status(self):
return None
def json(self):
return {"sandbox_url": "http://sandbox.local"}
def _post(url, json, timeout): # noqa: A002 - mirrors requests.post kwarg
posted.update({"url": url, "json": json, "timeout": timeout})
return _Response()
monkeypatch.setattr(remote_mod.requests, "post", _post)
try:
backend.create("thread-42", "sandbox-42")
finally:
reset_current_user(token)
assert posted["url"] == "http://provisioner:8002/api/sandboxes"
assert posted["json"] == {
"sandbox_id": "sandbox-42",
"thread_id": "thread-42",
"user_id": "user-7",
}
+14 -8
View File
@@ -92,12 +92,19 @@ class TestBuildVolumeMounts:
userdata_mount = mounts[1] userdata_mount = mounts[1]
assert userdata_mount.sub_path is None assert userdata_mount.sub_path is None
def test_pvc_sets_subpath(self, provisioner_module): def test_pvc_sets_user_scoped_subpath(self, provisioner_module):
"""PVC mode should set sub_path to threads/{thread_id}/user-data.""" """PVC mode should include user_id in the user-data subPath."""
provisioner_module.USERDATA_PVC_NAME = "my-pvc"
mounts = provisioner_module._build_volume_mounts("thread-42", user_id="user-7")
userdata_mount = mounts[1]
assert userdata_mount.sub_path == "deer-flow/users/user-7/threads/thread-42/user-data"
def test_pvc_defaults_to_default_user_subpath(self, provisioner_module):
"""Older callers should still land under a stable default user namespace."""
provisioner_module.USERDATA_PVC_NAME = "my-pvc" provisioner_module.USERDATA_PVC_NAME = "my-pvc"
mounts = provisioner_module._build_volume_mounts("thread-42") mounts = provisioner_module._build_volume_mounts("thread-42")
userdata_mount = mounts[1] userdata_mount = mounts[1]
assert userdata_mount.sub_path == "threads/thread-42/user-data" assert userdata_mount.sub_path == "deer-flow/users/default/threads/thread-42/user-data"
def test_skills_mount_read_only(self, provisioner_module): def test_skills_mount_read_only(self, provisioner_module):
"""Skills mount should always be read-only.""" """Skills mount should always be read-only."""
@@ -146,13 +153,12 @@ class TestBuildPodVolumes:
pod = provisioner_module._build_pod("sandbox-1", "thread-1") pod = provisioner_module._build_pod("sandbox-1", "thread-1")
assert len(pod.spec.containers[0].volume_mounts) == 2 assert len(pod.spec.containers[0].volume_mounts) == 2
def test_pod_pvc_mode(self, provisioner_module): def test_pod_pvc_mode_uses_user_scoped_subpath(self, provisioner_module):
"""Pod should use PVC volumes when PVC names are configured.""" """Pod should use a user-scoped subPath for PVC user-data."""
provisioner_module.SKILLS_PVC_NAME = "skills-pvc" provisioner_module.SKILLS_PVC_NAME = "skills-pvc"
provisioner_module.USERDATA_PVC_NAME = "userdata-pvc" provisioner_module.USERDATA_PVC_NAME = "userdata-pvc"
pod = provisioner_module._build_pod("sandbox-1", "thread-1") pod = provisioner_module._build_pod("sandbox-1", "thread-1", user_id="user-7")
assert pod.spec.volumes[0].persistent_volume_claim is not None assert pod.spec.volumes[0].persistent_volume_claim is not None
assert pod.spec.volumes[1].persistent_volume_claim is not None assert pod.spec.volumes[1].persistent_volume_claim is not None
# subPath should be set on user-data mount
userdata_mount = pod.spec.containers[0].volume_mounts[1] userdata_mount = pod.spec.containers[0].volume_mounts[1]
assert userdata_mount.sub_path == "threads/thread-1/user-data" assert userdata_mount.sub_path == "deer-flow/users/user-7/threads/thread-1/user-data"
+5 -1
View File
@@ -144,7 +144,11 @@ def test_provisioner_create_returns_sandbox_info(monkeypatch):
def mock_post(url: str, json: dict, timeout: int): def mock_post(url: str, json: dict, timeout: int):
assert url == "http://provisioner:8002/api/sandboxes" assert url == "http://provisioner:8002/api/sandboxes"
assert json == {"sandbox_id": "abc123", "thread_id": "thread-1"} assert json == {
"sandbox_id": "abc123",
"thread_id": "thread-1",
"user_id": "test-user-autouse",
}
assert timeout == 30 assert timeout == 30
return _StubResponse(payload={"sandbox_id": "abc123", "sandbox_url": "http://k3s:31001"}) return _StubResponse(payload={"sandbox_id": "abc123", "sandbox_url": "http://k3s:31001"})
+1 -1
View File
@@ -37,7 +37,7 @@ services:
- THREADS_HOST_PATH=${DEER_FLOW_ROOT}/backend/.deer-flow/threads - THREADS_HOST_PATH=${DEER_FLOW_ROOT}/backend/.deer-flow/threads
# Production: use PVC instead of hostPath to avoid data loss on node failure. # Production: use PVC instead of hostPath to avoid data loss on node failure.
# When set, hostPath vars above are ignored for the corresponding volume. # When set, hostPath vars above are ignored for the corresponding volume.
# USERDATA_PVC_NAME uses subPath (threads/{thread_id}/user-data) automatically. # USERDATA_PVC_NAME uses subPath (deer-flow/users/{user_id}/threads/{thread_id}/user-data) automatically.
# - SKILLS_PVC_NAME=deer-flow-skills-pvc # - SKILLS_PVC_NAME=deer-flow-skills-pvc
# - USERDATA_PVC_NAME=deer-flow-userdata-pvc # - USERDATA_PVC_NAME=deer-flow-userdata-pvc
- KUBECONFIG_PATH=/root/.kube/config - KUBECONFIG_PATH=/root/.kube/config
+21 -4
View File
@@ -20,7 +20,7 @@ The **Sandbox Provisioner** is a FastAPI service that dynamically manages sandbo
### How It Works ### How It Works
1. **Backend Request**: When the backend needs to execute code, it sends a `POST /api/sandboxes` request with a `sandbox_id` and `thread_id`. 1. **Backend Request**: When the backend needs to execute code, it sends a `POST /api/sandboxes` request with a `sandbox_id`, `thread_id`, and optional `user_id`.
2. **Pod Creation**: The provisioner creates a dedicated Pod in the `deer-flow` namespace with: 2. **Pod Creation**: The provisioner creates a dedicated Pod in the `deer-flow` namespace with:
- The sandbox container image (all-in-one-sandbox) - The sandbox container image (all-in-one-sandbox)
@@ -70,10 +70,13 @@ Create a new sandbox Pod + Service.
```json ```json
{ {
"sandbox_id": "abc-123", "sandbox_id": "abc-123",
"thread_id": "thread-456" "thread_id": "thread-456",
"user_id": "user-789"
} }
``` ```
`user_id` is optional for backwards compatibility and defaults to `default`. When `USERDATA_PVC_NAME` is set, the provisioner uses it to isolate PVC-backed user-data directories.
**Response**: **Response**:
```json ```json
{ {
@@ -138,11 +141,25 @@ The provisioner is configured via environment variables (set in [docker-compose-
| `SKILLS_HOST_PATH` | - | **Host machine** path to skills directory (must be absolute) | | `SKILLS_HOST_PATH` | - | **Host machine** path to skills directory (must be absolute) |
| `THREADS_HOST_PATH` | - | **Host machine** path to threads data directory (must be absolute) | | `THREADS_HOST_PATH` | - | **Host machine** path to threads data directory (must be absolute) |
| `SKILLS_PVC_NAME` | empty (use hostPath) | PVC name for skills volume; when set, sandbox Pods use PVC instead of hostPath | | `SKILLS_PVC_NAME` | empty (use hostPath) | PVC name for skills volume; when set, sandbox Pods use PVC instead of hostPath |
| `USERDATA_PVC_NAME` | empty (use hostPath) | PVC name for user-data volume; when set, uses PVC with `subPath: threads/{thread_id}/user-data` | | `USERDATA_PVC_NAME` | empty (use hostPath) | PVC name for user-data volume; when set, uses PVC with `subPath: deer-flow/users/{user_id}/threads/{thread_id}/user-data` |
| `KUBECONFIG_PATH` | `/root/.kube/config` | Path to kubeconfig **inside** the provisioner container | | `KUBECONFIG_PATH` | `/root/.kube/config` | Path to kubeconfig **inside** the provisioner container |
| `NODE_HOST` | `host.docker.internal` | Hostname that backend containers use to reach host NodePorts | | `NODE_HOST` | `host.docker.internal` | Hostname that backend containers use to reach host NodePorts |
| `K8S_API_SERVER` | (from kubeconfig) | Override K8s API server URL (e.g., `https://host.docker.internal:26443`) | | `K8S_API_SERVER` | (from kubeconfig) | Override K8s API server URL (e.g., `https://host.docker.internal:26443`) |
### PVC User-Data Upgrade Note
Older provisioner versions mounted PVC user-data from `threads/{thread_id}/user-data`. The user-scoped layout mounts from `deer-flow/users/{user_id}/threads/{thread_id}/user-data`.
If an existing deployment already has PVC-backed user-data under the legacy layout, migrate the DeerFlow data directory before relying on the new PVC subPath. Mount the same PVC path that the gateway uses as its DeerFlow base directory, then run the existing user-isolation migration script:
```bash
cd backend
PYTHONPATH=. python scripts/migrate_user_isolation.py --dry-run
PYTHONPATH=. python scripts/migrate_user_isolation.py --user-id <target-user-id>
```
This moves legacy `threads/{thread_id}/user-data` data under `users/<target-user-id>/threads/{thread_id}/user-data`, which matches the new provisioner PVC subPath when the gateway base directory is mounted at `deer-flow/` on the PVC. Use `default` as the target user only when the legacy data should remain in the default no-auth user namespace. Run the migration while no gateway or sandbox Pods are writing to those paths.
### Important: K8S_API_SERVER Override ### Important: K8S_API_SERVER Override
If your kubeconfig uses `localhost`, `127.0.0.1`, or `0.0.0.0` as the API server address (common with OrbStack, minikube, kind), the provisioner **cannot** reach it from inside the Docker container. If your kubeconfig uses `localhost`, `127.0.0.1`, or `0.0.0.0` as the API server address (common with OrbStack, minikube, kind), the provisioner **cannot** reach it from inside the Docker container.
@@ -213,7 +230,7 @@ curl http://localhost:8002/health
# Create a sandbox (via provisioner container for internal DNS) # Create a sandbox (via provisioner container for internal DNS)
docker exec deer-flow-provisioner curl -X POST http://localhost:8002/api/sandboxes \ docker exec deer-flow-provisioner curl -X POST http://localhost:8002/api/sandboxes \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-d '{"sandbox_id":"test-001","thread_id":"thread-001"}' -d '{"sandbox_id":"test-001","thread_id":"thread-001","user_id":"user-001"}'
# Check sandbox status # Check sandbox status
docker exec deer-flow-provisioner curl http://localhost:8002/api/sandboxes/test-001 docker exec deer-flow-provisioner curl http://localhost:8002/api/sandboxes/test-001
+13 -15
View File
@@ -63,6 +63,8 @@ THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads")
SKILLS_PVC_NAME = os.environ.get("SKILLS_PVC_NAME", "") SKILLS_PVC_NAME = os.environ.get("SKILLS_PVC_NAME", "")
USERDATA_PVC_NAME = os.environ.get("USERDATA_PVC_NAME", "") USERDATA_PVC_NAME = os.environ.get("USERDATA_PVC_NAME", "")
SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$" SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$"
SAFE_USER_ID_PATTERN = r"^[A-Za-z0-9_\-]+$"
DEFAULT_USER_ID = "default"
# Path to the kubeconfig *inside* the provisioner container. # Path to the kubeconfig *inside* the provisioner container.
# Typically the host's ~/.kube/config is mounted here. # Typically the host's ~/.kube/config is mounted here.
@@ -95,14 +97,6 @@ def join_host_path(base: str, *parts: str) -> str:
return str(result) return str(result)
def _validate_thread_id(thread_id: str) -> str:
if not re.match(SAFE_THREAD_ID_PATTERN, thread_id):
raise ValueError(
"Invalid thread_id: only alphanumeric characters, hyphens, and underscores are allowed."
)
return thread_id
# ── K8s client setup ──────────────────────────────────────────────────── # ── K8s client setup ────────────────────────────────────────────────────
core_v1: k8s_client.CoreV1Api | None = None core_v1: k8s_client.CoreV1Api | None = None
@@ -221,6 +215,7 @@ app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan)
class CreateSandboxRequest(BaseModel): class CreateSandboxRequest(BaseModel):
sandbox_id: str sandbox_id: str
thread_id: str = Field(pattern=SAFE_THREAD_ID_PATTERN) thread_id: str = Field(pattern=SAFE_THREAD_ID_PATTERN)
user_id: str = Field(default=DEFAULT_USER_ID, pattern=SAFE_USER_ID_PATTERN)
class SandboxResponse(BaseModel): class SandboxResponse(BaseModel):
@@ -283,7 +278,7 @@ def _build_volumes(thread_id: str) -> list[k8s_client.V1Volume]:
return [skills_vol, userdata_vol] return [skills_vol, userdata_vol]
def _build_volume_mounts(thread_id: str) -> list[k8s_client.V1VolumeMount]: def _build_volume_mounts(thread_id: str, user_id: str = DEFAULT_USER_ID) -> list[k8s_client.V1VolumeMount]:
"""Build volume mount list, using subPath for PVC user-data.""" """Build volume mount list, using subPath for PVC user-data."""
userdata_mount = k8s_client.V1VolumeMount( userdata_mount = k8s_client.V1VolumeMount(
name="user-data", name="user-data",
@@ -291,7 +286,7 @@ def _build_volume_mounts(thread_id: str) -> list[k8s_client.V1VolumeMount]:
read_only=False, read_only=False,
) )
if USERDATA_PVC_NAME: if USERDATA_PVC_NAME:
userdata_mount.sub_path = f"threads/{thread_id}/user-data" userdata_mount.sub_path = f"deer-flow/users/{user_id}/threads/{thread_id}/user-data"
return [ return [
k8s_client.V1VolumeMount( k8s_client.V1VolumeMount(
@@ -303,9 +298,8 @@ def _build_volume_mounts(thread_id: str) -> list[k8s_client.V1VolumeMount]:
] ]
def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod: def _build_pod(sandbox_id: str, thread_id: str, user_id: str = DEFAULT_USER_ID) -> k8s_client.V1Pod:
"""Construct a Pod manifest for a single sandbox.""" """Construct a Pod manifest for a single sandbox."""
thread_id = _validate_thread_id(thread_id)
return k8s_client.V1Pod( return k8s_client.V1Pod(
metadata=k8s_client.V1ObjectMeta( metadata=k8s_client.V1ObjectMeta(
name=_pod_name(sandbox_id), name=_pod_name(sandbox_id),
@@ -362,7 +356,7 @@ def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
"ephemeral-storage": "500Mi", "ephemeral-storage": "500Mi",
}, },
), ),
volume_mounts=_build_volume_mounts(thread_id), volume_mounts=_build_volume_mounts(thread_id, user_id=user_id),
security_context=k8s_client.V1SecurityContext( security_context=k8s_client.V1SecurityContext(
privileged=False, privileged=False,
allow_privilege_escalation=True, allow_privilege_escalation=True,
@@ -445,9 +439,13 @@ async def create_sandbox(req: CreateSandboxRequest):
""" """
sandbox_id = req.sandbox_id sandbox_id = req.sandbox_id
thread_id = req.thread_id thread_id = req.thread_id
user_id = req.user_id
logger.info( logger.info(
f"Received request to create sandbox '{sandbox_id}' for thread '{thread_id}'" "Received request to create sandbox '%s' for thread '%s' user '%s'",
sandbox_id,
thread_id,
user_id,
) )
# ── Fast path: sandbox already exists ──────────────────────────── # ── Fast path: sandbox already exists ────────────────────────────
@@ -461,7 +459,7 @@ async def create_sandbox(req: CreateSandboxRequest):
# ── Create Pod ─────────────────────────────────────────────────── # ── Create Pod ───────────────────────────────────────────────────
try: try:
core_v1.create_namespaced_pod(K8S_NAMESPACE, _build_pod(sandbox_id, thread_id)) core_v1.create_namespaced_pod(K8S_NAMESPACE, _build_pod(sandbox_id, thread_id, user_id=user_id))
logger.info(f"Created Pod {_pod_name(sandbox_id)}") logger.info(f"Created Pod {_pod_name(sandbox_id)}")
except ApiException as exc: except ApiException as exc:
if exc.status != 409: # 409 = AlreadyExists if exc.status != 409: # 409 = AlreadyExists